You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/11 05:34:48 UTC
[1/3] incubator-kylin git commit: KYLIN-1007 Use
CubeDesc.getMeasures() as the order of measures on GridTable,
as well as on MR KV
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 0739cc227 -> acd8c0dfb
KYLIN-1007 Use CubeDesc.getMeasures() as the order of measures on
GridTable, as well as on MR KV
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fe9e02cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fe9e02cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fe9e02cd
Branch: refs/heads/2.x-staging
Commit: fe9e02cd3b18d7e25232ca9d50d37637d5721ae8
Parents: 0739cc2
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Sep 7 18:00:07 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:01 2015 +0800
----------------------------------------------------------------------
.../gridtable/CuboidToGridTableMapping.java | 66 ++++++++++--------
.../cube/inmemcubing/DoggedCubeBuilder.java | 7 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 33 ++-------
.../org/apache/kylin/cube/model/CubeDesc.java | 17 +++--
.../kylin/cube/model/HBaseColumnDesc.java | 37 +++++++---
.../org/apache/kylin/engine/mr/IMROutput.java | 8 +--
.../org/apache/kylin/engine/mr/IMROutput2.java | 22 ++++++
.../engine/mr/steps/InMemCuboidReducer.java | 15 +----
.../mr/steps/MergeCuboidFromStorageMapper.java | 21 ++----
.../engine/mr/steps/MergeCuboidMapper.java | 3 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 4 +-
.../hbase/cube/v1/CubeTupleConverter.java | 2 +-
.../storage/hbase/steps/CubeHFileMapper.java | 3 +-
.../storage/hbase/steps/HBaseCuboidWriter.java | 18 ++---
.../storage/hbase/steps/HBaseMROutput2.java | 19 ++----
.../hbase/steps/InMemKeyValueCreator.java | 71 --------------------
.../storage/hbase/steps/KeyValueCreator.java | 32 ++-------
.../storage/hbase/steps/RowValueDecoder.java | 38 +++++++----
.../hbase/steps/RowValueDecoderTest.java | 6 +-
19 files changed, 167 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index ab4efca..12eb5f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -1,5 +1,6 @@
package org.apache.kylin.cube.gridtable;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
@@ -59,29 +60,48 @@ public class CuboidToGridTableMapping {
nDimensions = gtColIdx;
assert nDimensions == cuboid.getColumns().size();
+ // column blocks of metrics
+ ArrayList<BitSet> metricsColBlocks = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+ for (int i = 0; i < familyDesc.getColumns().length; i++) {
+ metricsColBlocks.add(new BitSet());
+ }
+ }
+
// metrics
metrics2gt = LinkedListMultimap.create();
- for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- BitSet colBlock = new BitSet();
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- // Count distinct & holistic count distinct are equals() but different.
- // Ensure the holistic version if exists is always the first.
- FunctionDesc func = measure.getFunction();
- if (func.isHolisticCountDistinct()) {
- List<Integer> existing = metrics2gt.removeAll(func);
- metrics2gt.put(func, gtColIdx);
- metrics2gt.putAll(func, existing);
- } else {
- metrics2gt.put(func, gtColIdx);
+ for (MeasureDesc measure :cuboid.getCube().getMeasures()) {
+ // Count distinct & holistic count distinct are equals() but different.
+ // Ensure the holistic version if exists is always the first.
+ FunctionDesc func = measure.getFunction();
+ if (func.isHolisticCountDistinct()) {
+ List<Integer> existing = metrics2gt.removeAll(func);
+ metrics2gt.put(func, gtColIdx);
+ metrics2gt.putAll(func, existing);
+ } else {
+ metrics2gt.put(func, gtColIdx);
+ }
+
+ gtDataTypes.add(func.getReturnDataType());
+
+ // map to column block
+ int cbIdx = 0;
+ for (HBaseColumnFamilyDesc familyDesc : cuboid.getCube().getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ if (hbaseColDesc.containsMeasure(measure.getName())) {
+ metricsColBlocks.get(cbIdx).set(gtColIdx);
}
- gtDataTypes.add(func.getReturnDataType());
- colBlock.set(gtColIdx);
- gtColIdx++;
+ cbIdx++;
}
- gtColBlocks.add(new ImmutableBitSet(colBlock));
}
+
+ gtColIdx++;
}
+
+ for (BitSet set : metricsColBlocks) {
+ gtColBlocks.add(new ImmutableBitSet(set));
+ }
+
nMetrics = gtColIdx - nDimensions;
assert nMetrics == cuboid.getCube().getMeasures().size();
}
@@ -157,16 +177,4 @@ public class CuboidToGridTableMapping {
return result.isEmpty() ? Collections.<Integer, Integer> emptyMap() : result;
}
- public static MeasureDesc[] getMeasureSequenceOnGridTable(CubeDesc cube) {
- MeasureDesc[] result = new MeasureDesc[cube.getMeasures().size()];
- int i = 0;
- for (HBaseColumnFamilyDesc familyDesc : cube.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc m : hbaseColDesc.getMeasures()) {
- result[i++] = m;
- }
- }
- }
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 30e1e95..d565ab5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -32,14 +32,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -286,9 +284,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
ImmutableBitSet lastMetricsColumns;
Merger() {
- MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
- reuseAggrs = new MeasureAggregators(measures);
- reuseMetricsArray = new Object[measures.length];
+ reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+ reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
}
public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 465dd99..662416e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
@@ -75,7 +73,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
- private final int[] hbaseMeasureRefIndex;
private final MeasureDesc[] measureDescs;
private final int measureCount;
@@ -97,35 +94,18 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+ this.measureCount = cubeDesc.getMeasures().size();
+ this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+
Map<String, Integer> measureIndexMap = Maps.newHashMap();
List<String> metricsAggrFuncsList = Lists.newArrayList();
- measureCount = cubeDesc.getMeasures().size();
-
- List<MeasureDesc> measureDescsList = Lists.newArrayList();
- hbaseMeasureRefIndex = new int[measureCount];
- int measureRef = 0;
- for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- for (int j = 0; j < measureCount; j++) {
- if (cubeDesc.getMeasures().get(j).equals(measure)) {
- measureDescsList.add(measure);
- hbaseMeasureRefIndex[measureRef] = j;
- break;
- }
- }
- measureRef++;
- }
- }
- }
for (int i = 0; i < measureCount; i++) {
- MeasureDesc measureDesc = measureDescsList.get(i);
+ MeasureDesc measureDesc = measureDescs[i];
metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
measureIndexMap.put(measureDesc.getName(), i);
}
this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
- this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
}
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
@@ -605,8 +585,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
Object[] values = new Object[measureCount];
MeasureDesc measureDesc = null;
- for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
- int i = hbaseMeasureRefIndex[position];
+ for (int i = 0; i < measureCount; i++) {
measureDesc = measureDescs[i];
Object value = null;
@@ -635,7 +614,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
value = measureCodec.getSerializer(i).valueOf(result);
}
- values[position] = value;
+ values[i] = value;
}
return values;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 84cf842..0060e09 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -651,17 +651,24 @@ public class CubeDesc extends RootPersistentEntity {
if (measures == null || measures.size() == 0)
return;
- Map<String, MeasureDesc> measureCache = new HashMap<String, MeasureDesc>();
+ Map<String, MeasureDesc> measureLookup = new HashMap<String, MeasureDesc>();
for (MeasureDesc m : measures)
- measureCache.put(m.getName(), m);
+ measureLookup.put(m.getName(), m);
+ Map<String, Integer> measureIndexLookup = new HashMap<String, Integer>();
+ for (int i = 0; i < measures.size(); i++)
+ measureIndexLookup.put(measures.get(i).getName(), i);
for (HBaseColumnFamilyDesc cf : getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc c : cf.getColumns()) {
- MeasureDesc[] measureDescs = new MeasureDesc[c.getMeasureRefs().length];
- for (int i = 0; i < c.getMeasureRefs().length; i++) {
- measureDescs[i] = measureCache.get(c.getMeasureRefs()[i]);
+ String[] colMeasureRefs = c.getMeasureRefs();
+ MeasureDesc[] measureDescs = new MeasureDesc[colMeasureRefs.length];
+ int[] measureIndex = new int[colMeasureRefs.length];
+ for (int i = 0; i < colMeasureRefs.length; i++) {
+ measureDescs[i] = measureLookup.get(colMeasureRefs[i]);
+ measureIndex[i] = measureIndexLookup.get(colMeasureRefs[i]);
}
c.setMeasures(measureDescs);
+ c.setMeasureIndex(measureIndex);
c.setColumnFamilyName(cf.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
index 9aa6727..e82ad53 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
@@ -37,8 +37,9 @@ public class HBaseColumnDesc {
@JsonProperty("measure_refs")
private String[] measureRefs;
- // these two will be assemble at runtime
+ // these three will be assembled at runtime
private MeasureDesc[] measures;
+ private int[] measureIndex; // the index on CubeDesc.getMeasures()
private String columnFamilyName;
public String getQualifier() {
@@ -56,20 +57,19 @@ public class HBaseColumnDesc {
public void setMeasureRefs(String[] measureRefs) {
this.measureRefs = measureRefs;
}
+
+ public int[] getMeasureIndex() {
+ return measureIndex;
+ }
+
+ public void setMeasureIndex(int[] index) {
+ this.measureIndex = index;
+ }
public MeasureDesc[] getMeasures() {
return measures;
}
- public int findMeasureIndex(FunctionDesc function) {
- for (int i = 0; i < measures.length; i++) {
- if (measures[i].getFunction().equals(function)) {
- return i;
- }
- }
- return -1;
- }
-
public void setMeasures(MeasureDesc[] measures) {
this.measures = measures;
}
@@ -82,6 +82,23 @@ public class HBaseColumnDesc {
this.columnFamilyName = columnFamilyName;
}
+ public int findMeasure(FunctionDesc function) {
+ for (int i = 0; i < measures.length; i++) {
+ if (measures[i].getFunction().equals(function)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public boolean containsMeasure(String refName) {
+ for (String ref : measureRefs) {
+ if (ref.equals(refName))
+ return true;
+ }
+ return false;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index e6282f7..577a836 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -40,8 +40,8 @@ public interface IMROutput {
/**
* Add step that saves cuboid output from HDFS to storage.
*
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
* dictionary encoding; Mx is measure value serialization form.
*/
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
@@ -66,8 +66,8 @@ public interface IMROutput {
/**
* Add step that saves cuboid output from HDFS to storage.
*
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
* dictionary encoding; Mx is measure value serialization form.
*/
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 456df82..b4aff5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -27,6 +27,7 @@ public interface IMROutput2 {
*/
public interface IMRBatchCubingOutputSide2 {
+ /** Return an output format for Phase 3: Build Cube MR */
public IMRStorageOutputFormat getStorageOutputFormat();
/** Add step that executes after build dictionary and before build cube. */
@@ -45,13 +46,22 @@ public interface IMROutput2 {
public IMRStorageInputFormat getStorageInputFormat();
}
+ /** Read in a cube as input of merge. Configure the input file format of mapper. */
@SuppressWarnings("rawtypes")
public interface IMRStorageInputFormat {
+ /** Configure MR mapper class and input file format. */
public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+ /** Given a mapper context, figure out which segment the mapper reads from. */
public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+ /**
+ * Read in a cuboid row. Given the input KV, deserialize the cuboid ID, dimensions, and measures.
+ *
+ * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
+ * <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+ */
public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
}
@@ -67,6 +77,7 @@ public interface IMROutput2 {
*/
public interface IMRBatchMergeOutputSide2 {
+ /** Return an output format for Phase 2: Merge Cube MR */
public IMRStorageOutputFormat getStorageOutputFormat();
/** Add step that executes after merge dictionary and before merge cube. */
@@ -79,10 +90,21 @@ public interface IMROutput2 {
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
}
+ /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
@SuppressWarnings("rawtypes")
public interface IMRStorageOutputFormat {
+
+ /** Configure MR reducer class and output file format. */
public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+ /**
+ * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
+ * way and output to reducer context.
+ *
+ * @param key The cuboid ID (8 bytes) + dimension values in dictionary encoding
+ * @param value The measure values in order of <code>CubeDesc.getMeasures()</code>
+ * @param context The reducer context output goes to
+ */
public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 443ecd8..db254f6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -8,8 +8,6 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.KylinReducer;
@@ -23,8 +21,6 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
/**
*/
public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
@@ -56,18 +52,9 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
else
storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
-
+ List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
codec = new MeasureCodec(measuresDescs);
aggs = new MeasureAggregators(measuresDescs);
-
input = new Object[measuresDescs.size()];
result = new Object[measuresDescs.size()];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b57e484..fa575ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
@@ -35,8 +34,6 @@ import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -46,14 +43,12 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
/**
* @author shaoshi
@@ -111,22 +106,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
- newKeyBuf = new byte[256];// size will auto-grow
+ newKeyBuf = new byte[256]; // size will auto-grow
sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
- logger.info(sourceCubeSegment.toString());
+ logger.info("Source cube segment: " + sourceCubeSegment);
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
- codec = new MeasureCodec(measuresDescs);
+ codec = new MeasureCodec(cubeDesc.getMeasures());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5327dc4..a09ffe1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -57,8 +57,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private CubeInstance cube;
private CubeDesc cubeDesc;
private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
- // life cycle
+ private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
private Text outputKey = new Text();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 0514b45..836f142 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -373,7 +373,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
int bestIndex = -1;
for (HBaseColumnDesc hbCol : hbCols) {
bestHBCol = hbCol;
- bestIndex = hbCol.findMeasureIndex(aggrFunc);
+ bestIndex = hbCol.findMeasure(aggrFunc);
MeasureDesc measure = hbCol.getMeasures()[bestIndex];
// criteria for holistic measure: Exact Aggregation && Exact Cuboid
if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
@@ -387,7 +387,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
codec = new RowValueDecoder(bestHBCol);
codecMap.put(bestHBCol, codec);
}
- codec.setIndex(bestIndex);
+ codec.setProjectIndex(bestIndex);
}
return new ArrayList<RowValueDecoder>(codecMap.values());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index 27f4f10..e569cbd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -118,7 +118,7 @@ public class CubeTupleConverter {
// measures
for (int i = 0; i < rowValueDecoders.size(); i++) {
RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
- rowValueDecoder.decode(hbaseRow);
+ rowValueDecoder.decodeAndConvertJavaObj(hbaseRow);
Object[] measureValues = rowValueDecoder.getValues();
int[] measureIdx = metricsMeasureIdx[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 65b56b9..9f97b0e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -79,8 +79,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
KeyValue outputValue;
int n = keyValueCreators.size();
- if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
- // simple full copy
+ if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy
outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
context.write(outputKey, outputValue);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index d313233..1271070 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -64,26 +63,26 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
private static final int BATCH_PUT_THRESHOLD = 10000;
- private final List<InMemKeyValueCreator> keyValueCreators;
+ private final List<KeyValueCreator> keyValueCreators;
private final int nColumns;
private final HTableInterface hTable;
private final ByteBuffer byteBuffer;
private final CubeDesc cubeDesc;
+ private final Object[] measureValues;
private List<Put> puts = Lists.newArrayList();
public HBaseCuboidWriter(CubeDesc cubeDesc, HTableInterface hTable) {
this.keyValueCreators = Lists.newArrayList();
this.cubeDesc = cubeDesc;
- int startPosition = 0;
for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
- startPosition += colDesc.getMeasures().length;
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
}
}
this.nColumns = keyValueCreators.size();
this.hTable = hTable;
this.byteBuffer = ByteBuffer.allocate(1 << 20);
+ this.measureValues = new Object[cubeDesc.getMeasures().size()];
}
private byte[] copy(byte[] array, int offset, int length) {
@@ -106,10 +105,13 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
@Override
public void write(long cuboidId, GTRecord record) throws IOException {
final ByteBuffer key = createKey(cuboidId, record);
- final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
- final ImmutableBitSet bitSet = new ImmutableBitSet(mapping.getDimensionCount(), mapping.getColumnCount());
+ final Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+ final int nDims = cuboid.getColumns().size();
+ final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
+
for (int i = 0; i < nColumns; i++) {
- final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()]));
+ final Object[] values = record.getValues(bitSet, measureValues);
+ final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), values);
final Put put = new Put(copy(key.array(), 0, key.position()));
byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 6cbcec3..1e414be 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -58,9 +58,6 @@ import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.storage.hbase.steps.HBaseMRSteps;
-import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import com.google.common.collect.Lists;
@@ -196,14 +193,10 @@ public class HBaseMROutput2 implements IMROutput2 {
ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
parsedKey.set(key.get(), key.getOffset(), key.getLength());
- Result value = (Result) inValue;
- int position = 0;
+ Result hbaseRow = (Result) inValue;
for (int i = 0; i < rowValueDecoders.length; i++) {
- rowValueDecoders[i].decode(value, false);
- Object[] measureValues = rowValueDecoders[i].getValues();
- for (int j = 0; j < measureValues.length; j++) {
- parsedValue[position++] = measureValues[j];
- }
+ rowValueDecoders[i].decode(hbaseRow);
+ rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
}
return parsedPair;
@@ -214,7 +207,7 @@ public class HBaseMROutput2 implements IMROutput2 {
private static class HBaseOutputFormat implements IMRStorageOutputFormat {
final CubeSegment seg;
- final List<InMemKeyValueCreator> keyValueCreators = Lists.newArrayList();
+ final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
public HBaseOutputFormat(CubeSegment seg) {
@@ -243,11 +236,9 @@ public class HBaseMROutput2 implements IMROutput2 {
@Override
public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
if (keyValueCreators.size() == 0) {
- int startPosition = 0;
for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
- startPosition += colDesc.getMeasures().length;
+ keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
deleted file mode 100644
index d96dfcc..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.kylin.storage.hbase.steps;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-public class InMemKeyValueCreator {
- byte[] cfBytes;
- byte[] qBytes;
- long timestamp;
-
- MeasureCodec codec;
- Object[] colValues;
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- int startPosition = 0;
-
- public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
-
- cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
- qBytes = Bytes.toBytes(colDesc.getQualifier());
- timestamp = System.currentTimeMillis();
-
- List<MeasureDesc> measures = Lists.newArrayList();
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measures.add(measure);
- }
- codec = new MeasureCodec(measures);
- colValues = new Object[measures.size()];
-
- this.startPosition = startPosition;
-
- }
-
- public KeyValue create(Text key, Object[] measureValues) {
- return create(key.getBytes(), 0, key.getLength(), measureValues);
- }
-
- public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
- for (int i = 0; i < colValues.length; i++) {
- colValues[i] = measureValues[startPosition + i];
- }
-
- valueBuf.clear();
- codec.encode(colValues, valueBuf);
-
- return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
- }
-
- public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
- return new KeyValue(keyBytes, keyOffset, keyLength, //
- cfBytes, 0, cfBytes.length, //
- qBytes, 0, qBytes.length, //
- timestamp, KeyValue.Type.Put, //
- value, voffset, vlen);
- }
-
- public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
- return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index a24fe39..7f40259 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -1,7 +1,6 @@
package org.apache.kylin.storage.hbase.steps;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
@@ -36,21 +35,14 @@ public class KeyValueCreator {
qBytes = Bytes.toBytes(colDesc.getQualifier());
timestamp = System.currentTimeMillis();
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] measureNames = getMeasureNames(cubeDesc);
- String[] refs = colDesc.getMeasureRefs();
-
- refIndex = new int[refs.length];
- refMeasures = new MeasureDesc[refs.length];
- for (int i = 0; i < refs.length; i++) {
- refIndex[i] = indexOf(measureNames, refs[i]);
- refMeasures[i] = measures.get(refIndex[i]);
- }
+ refIndex = colDesc.getMeasureIndex();
+ refMeasures = colDesc.getMeasures();
codec = new MeasureCodec(refMeasures);
- colValues = new Object[refs.length];
+ colValues = new Object[refMeasures.length];
isFullCopy = true;
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
for (int i = 0; i < measures.size(); i++) {
if (refIndex.length <= i || refIndex[i] != i)
isFullCopy = false;
@@ -84,20 +76,4 @@ public class KeyValueCreator {
return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
}
- private int indexOf(String[] measureNames, String ref) {
- for (int i = 0; i < measureNames.length; i++)
- if (measureNames[i].equalsIgnoreCase(ref))
- return i;
-
- throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
- }
-
- private String[] getMeasureNames(CubeDesc cubeDesc) {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] result = new String[measures.size()];
- for (int i = 0; i < measures.size(); i++)
- result[i] = measures.get(i).getName();
- return result;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 9fd4e22..7c1a19f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -42,7 +42,7 @@ public class RowValueDecoder implements Cloneable {
private final MeasureCodec codec;
private final BitSet projectionIndex;
private final MeasureDesc[] measures;
- private Object[] values;
+ private final Object[] values;
public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
this.hbaseColumn = hbaseColumn;
@@ -54,22 +54,26 @@ public class RowValueDecoder implements Cloneable {
this.values = new Object[measures.length];
}
- public void decode(Result hbaseRow) {
+ public void decodeAndConvertJavaObj(Result hbaseRow) {
decode(hbaseRow, true);
}
+
+ public void decode(Result hbaseRow) {
+ decode(hbaseRow, false);
+ }
- public void decode(Result hbaseRow, boolean convertToJavaObject) {
+ private void decode(Result hbaseRow, boolean convertToJavaObject) {
decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
}
- public void decode(byte[] bytes) {
- decode(bytes, true);
+ public void decodeAndConvertJavaObj(byte[] bytes) {
+ decode(ByteBuffer.wrap(bytes), true);
}
- public void decode(byte[] bytes, boolean convertToJavaObject) {
- decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+ public void decode(byte[] bytes) {
+ decode(ByteBuffer.wrap(bytes), false);
}
-
+
private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
codec.decode(buffer, values);
if (convertToJavaObject) {
@@ -90,18 +94,18 @@ public class RowValueDecoder implements Cloneable {
}
}
- public void setIndex(int bitIndex) {
+ public void setProjectIndex(int bitIndex) {
projectionIndex.set(bitIndex);
}
- public HBaseColumnDesc getHBaseColumn() {
- return hbaseColumn;
- }
-
public BitSet getProjectionIndex() {
return projectionIndex;
}
+ public HBaseColumnDesc getHBaseColumn() {
+ return hbaseColumn;
+ }
+
public Object[] getValues() {
return values;
}
@@ -109,6 +113,14 @@ public class RowValueDecoder implements Cloneable {
public MeasureDesc[] getMeasures() {
return measures;
}
+
+ // result is in order of <code>CubeDesc.getMeasures()</code>
+ public void loadCubeMeasureArray(Object result[]) {
+ int[] measureIndex = hbaseColumn.getMeasureIndex();
+ for (int i = 0; i < measureIndex.length; i++) {
+ result[measureIndex[i]] = values[i];
+ }
+ }
public boolean hasMemHungryCountDistinct() {
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fe9e02cd/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
index d62da80..ba79305 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -73,11 +73,11 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase {
RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
for (MeasureDesc measure : cubeDesc.getMeasures()) {
FunctionDesc aggrFunc = measure.getFunction();
- int index = hbaseCol.findMeasureIndex(aggrFunc);
- rowValueDecoder.setIndex(index);
+ int index = hbaseCol.findMeasure(aggrFunc);
+ rowValueDecoder.setProjectIndex(index);
}
- rowValueDecoder.decode(valueBytes);
+ rowValueDecoder.decodeAndConvertJavaObj(valueBytes);
Object[] measureValues = rowValueDecoder.getValues();
//BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
assertEquals("[333.1235, 333.1111, 333.2000, 2, 100]", Arrays.toString(measureValues));
[2/3] incubator-kylin git commit: KYLIN-1007 Generates cuboid file in
build and merge
Posted by li...@apache.org.
KYLIN-1007 Generates cuboid file in build and merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f156d138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f156d138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f156d138
Branch: refs/heads/2.x-staging
Commit: f156d138d34b58e5c620c0d1c8678029b5aa41a0
Parents: fe9e02c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Sep 10 14:31:03 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:04 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/CubingJob.java | 24 +-
.../kylin/storage/hbase/HBaseStorage.java | 3 +-
.../hbase/steps/HBaseMROutput2Transition.java | 266 +++++++++++++++++++
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 12 +-
4 files changed, 297 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index b2f0a06..ed5c036 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -21,6 +21,8 @@ package org.apache.kylin.engine.mr;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
@@ -179,11 +181,26 @@ public class CubingJob extends DefaultChainedExecutable {
}
public long findCubeSizeBytes() {
- return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+ // look for the info BACKWARD, let the last step that claims the cube size win
+ return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
+ }
+
+ public String findExtraInfo(String key, String dft) {
+ return findExtraInfo(key, dft, false);
}
- private String findExtraInfo(String key, String dft) {
- for (AbstractExecutable child : getTasks()) {
+ public String findExtraInfoBackward(String key, String dft) {
+ return findExtraInfo(key, dft, true);
+ }
+
+ private String findExtraInfo(String key, String dft, boolean backward) {
+ ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+
+ if (backward) {
+ Collections.reverse(tasks);
+ }
+
+ for (AbstractExecutable child : tasks) {
Output output = executableManager.getOutput(child.getId());
String value = output.getExtra().get(key);
if (value != null)
@@ -191,4 +208,5 @@ public class CubingJob extends DefaultChainedExecutable {
}
return dft;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 1d8c8c8..421f648 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -36,6 +36,7 @@ import org.apache.kylin.storage.cache.CacheFledgedDynamicQuery;
import org.apache.kylin.storage.cache.CacheFledgedStaticQuery;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput2;
+import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.apache.kylin.storage.hybrid.HybridStorageQuery;
@@ -105,7 +106,7 @@ public class HBaseStorage implements IStorage {
if (engineInterface == IMROutput.class) {
return (I) new HBaseMROutput();
} else if (engineInterface == IMROutput2.class) {
- return (I) new HBaseMROutput2();
+ return (I) new HBaseMROutput2Transition();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
new file mode 100644
index 0000000..047315b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This "Transition" MR output generates cuboid files and then converts them to HFiles.
+ * The additional step slows down the build process slightly, but the gain is that merge
+ * can read from HDFS instead of going through the HBase region servers. See KYLIN-1007.
+ *
+ * This is transitional because eventually we want to merge from HTable snapshots.
+ * However, MR input with multiple snapshots is only supported by HBase 1.x.
+ * Until most users upgrade to the latest HBase, this transitional cuboid file
+ * solution is the only option available to them.
+ */
+public class HBaseMROutput2Transition implements IMROutput2 {
+
+ @Override
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+ return new IMRBatchCubingOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public IMRStorageOutputFormat getStorageOutputFormat() {
+ return new HBaseOutputFormat(seg);
+ }
+
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
+ return new IMRBatchMergeInputSide2() {
+ @Override
+ public IMRStorageInputFormat getStorageInputFormat() {
+ return new HBaseInputFormat(seg);
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+ return new IMRBatchMergeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public IMRStorageOutputFormat getStorageOutputFormat() {
+ return new HBaseOutputFormat(seg);
+ }
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+ String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createMergeGCStep());
+ }
+ };
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static class HBaseInputFormat implements IMRStorageInputFormat {
+ final Iterable<String> htables;
+
+ final RowValueDecoder[] rowValueDecoders;
+ final ByteArrayWritable parsedKey;
+ final Object[] parsedValue;
+ final Pair<ByteArrayWritable, Object[]> parsedPair;
+
+ public HBaseInputFormat(CubeSegment seg) {
+ this.htables = new HBaseMRSteps(seg).getMergingHTables();
+
+ List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ valueDecoderList.add(new RowValueDecoder(colDesc));
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+
+ this.parsedKey = new ByteArrayWritable();
+ this.parsedValue = new Object[measuresDescs.size()];
+ this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ }
+
+ @Override
+ public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ List<Scan> scans = new ArrayList<Scan>();
+ for (String htable : htables) {
+ Scan scan = new Scan();
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+ scans.add(scan);
+ }
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+ }
+
+ @Override
+ public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+ TableSplit currentSplit = (TableSplit) context.getInputSplit();
+ byte[] tableName = currentSplit.getTableName();
+ String htableName = Bytes.toString(tableName);
+
+ // decide which source segment
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String segmentHtable = segment.getStorageLocationIdentifier();
+ if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+ return segment;
+ }
+ }
+ throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
+ }
+
+ @Override
+ public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+ ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
+ parsedKey.set(key.get(), key.getOffset(), key.getLength());
+
+ Result hbaseRow = (Result) inValue;
+ for (int i = 0; i < rowValueDecoders.length; i++) {
+ rowValueDecoders[i].decode(hbaseRow);
+ rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
+ }
+
+ return parsedPair;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static class HBaseOutputFormat implements IMRStorageOutputFormat {
+ final CubeSegment seg;
+
+ Text outputKey;
+ Text outputValue;
+ ByteBuffer valueBuf;
+ MeasureCodec codec;
+
+ public HBaseOutputFormat(CubeSegment seg) {
+ this.seg = seg;
+ }
+
+ @Override
+ public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
+ job.setReducerClass(reducer);
+
+ // the cuboid file and KV classes must be compatible with version 0.7 for a smooth upgrade
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
+ FileOutputFormat.setOutputPath(job, output);
+
+ HadoopUtil.deletePath(job.getConfiguration(), output);
+ }
+
+ @Override
+ public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
+ // lazy init
+ if (outputKey == null) {
+ outputKey = new Text();
+ outputValue = new Text();
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+ }
+
+ outputKey.set(key.array(), key.offset(), key.length());
+
+ valueBuf.clear();
+ codec.encode(value, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 64374e6..03b4361 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,16 +24,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
String jobId = jobFlow.getId();
// calculate key distribution
- jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+ jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath, jobId));
// create htable step
jobFlow.addTask(createCreateHTableStep(jobId));
// generate hfiles step
- jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+ jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath, jobId));
// bulk load step
jobFlow.addTask(createBulkLoadStep(jobId));
}
- public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+ public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
StringBuilder cmd = new StringBuilder();
@@ -73,7 +75,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
return createHtableStep;
}
- public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+ public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
MapReduceExecutable createHFilesStep = new MapReduceExecutable();
createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
StringBuilder cmd = new StringBuilder();
[3/3] incubator-kylin git commit: KYLIN-1007 Merge from cuboid files
if available, and from HTable if otherwise
Posted by li...@apache.org.
KYLIN-1007 Merge from cuboid files if available, and from HTable if otherwise
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/acd8c0df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/acd8c0df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/acd8c0df
Branch: refs/heads/2.x-staging
Commit: acd8c0dfb6a3e7a422b00e943bf0c66d7c9de2b8
Parents: f156d13
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Sep 11 11:27:00 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:07 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/IMROutput2.java | 5 +-
.../engine/mr/common/AbstractHadoopJob.java | 8 +-
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 62 +++--
.../storage/hbase/steps/HBaseMROutput2.java | 19 +-
.../hbase/steps/HBaseMROutput2Transition.java | 237 ++++++++++++++-----
6 files changed, 230 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index b4aff5d..3ad51c5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -51,10 +50,10 @@ public interface IMROutput2 {
public interface IMRStorageInputFormat {
/** Configure MR mapper class and input file format. */
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
/** Given a mapper context, figure out which segment the mapper reads from. */
- public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+ public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
/**
* Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 50b0cc1..cd9393a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -200,8 +200,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return classpath;
}
- public void addInputDirs(String input, Job job) throws IOException {
- for (String inp : StringSplitter.split(input, ",")) {
+ public static void addInputDirs(String input, Job job) throws IOException {
+ addInputDirs(StringSplitter.split(input, ","), job);
+ }
+
+ public static void addInputDirs(String[] inputs, Job job) throws IOException {
+ for (String inp : inputs) {
inp = inp.trim();
if (inp.endsWith("/*")) {
inp = inp.substring(0, inp.length() - 2);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index fa575ca..4598673 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -108,7 +108,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
newKeyBuf = new byte[256]; // size will auto-grow
- sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+ sourceCubeSegment = storageInputFormat.findSourceSegment(context);
logger.info("Source cube segment: " + sourceCubeSegment);
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index a09ffe1..45f0d32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -25,7 +25,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
@@ -66,8 +65,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
- private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
private Boolean checkNeedMerging(TblColRef col) throws IOException {
Boolean ret = dictsNeedMerging.get(col);
if (ret != null)
@@ -83,28 +80,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
}
}
- private String extractJobIDFromPath(String path) {
- Matcher matcher = JOB_NAME_PATTERN.matcher(path);
- // check the first occurance
- if (matcher.find()) {
- return matcher.group(1);
- } else {
- throw new IllegalStateException("Can not extract job ID from file path : " + path);
- }
- }
-
- private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String lastBuildJobID = segment.getLastBuildJobID();
- if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
- return segment;
- }
- }
-
- throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
- }
-
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.bindCurrentConfiguration(context.getConfiguration());
@@ -123,15 +98,38 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
newKeyBuf = new byte[256];// size will auto-grow
// decide which source segment
- InputSplit inputSplit = context.getInputSplit();
- String filePath = ((FileSplit) inputSplit).getPath().toString();
- System.out.println("filePath:" + filePath);
+ FileSplit fileSplit = (FileSplit) context.getInputSplit();
+ sourceCubeSegment = findSourceSegment(fileSplit, cube);
+
+ rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+ }
+
+ private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+ public static CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+ String filePath = fileSplit.getPath().toString();
String jobID = extractJobIDFromPath(filePath);
- System.out.println("jobID:" + jobID);
- sourceCubeSegment = findSegmentWithUuid(jobID, cube);
- System.out.println(sourceCubeSegment);
+ return findSegmentWithUuid(jobID, cube);
+ }
+    /**
+     * Extracts the job ID embedded in a cuboid file path, i.e. the UUID that
+     * JOB_NAME_PATTERN captures after the "kylin-" prefix.
+     *
+     * @param path the input file path of the current split
+     * @return the first job ID found in path
+     * @throws IllegalStateException if path contains no match for JOB_NAME_PATTERN
+     */
+    private static String extractJobIDFromPath(String path) {
+        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+        // only the first match is used, should the path contain more than one job ID
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            throw new IllegalStateException("Can not extract job ID from file path : " + path);
+        }
+    }
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+    private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) { // returns the first segment (in getSegments() order) whose last build job ID equals jobID, ignoring case
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String lastBuildJobID = segment.getLastBuildJobID();
+            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) { // segments with no recorded build job are skipped
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID); // no segment of the cube matched jobID
     }
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 1e414be..b468009 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
@@ -129,16 +128,16 @@ public class HBaseMROutput2 implements IMROutput2 {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static class HBaseInputFormat implements IMRStorageInputFormat {
- final Iterable<String> htables;
-
+ final CubeSegment seg;
+
final RowValueDecoder[] rowValueDecoders;
final ByteArrayWritable parsedKey;
final Object[] parsedValue;
final Pair<ByteArrayWritable, Object[]> parsedPair;
public HBaseInputFormat(CubeSegment seg) {
- this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
+ this.seg = seg;
+
List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
List<MeasureDesc> measuresDescs = Lists.newArrayList();
for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
@@ -157,29 +156,29 @@ public class HBaseMROutput2 implements IMROutput2 {
}
@Override
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
List<Scan> scans = new ArrayList<Scan>();
- for (String htable : htables) {
+ for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
scans.add(scan);
}
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
}
@Override
- public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+ public CubeSegment findSourceSegment(Context context) throws IOException {
TableSplit currentSplit = (TableSplit) context.getInputSplit();
byte[] tableName = currentSplit.getTableName();
String htableName = Bytes.toString(tableName);
// decide which source segment
- for (CubeSegment segment : cubeInstance.getSegments()) {
+ for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
String segmentHtable = segment.getStorageLocationIdentifier();
if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
return segment;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/acd8c0df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 047315b..237f0c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
@@ -40,10 +41,11 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -51,24 +53,30 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
- * This "Transition" MR output generates cuboid files and then convert to HFile.
- * The additional step slows down build process slightly, but the gains is merge
+ * This "Transition" impl generates cuboid files and then converts them to HFiles.
+ * The additional step slows down the build process, but the gain is that merge
* can read from HDFS instead of over HBase region server. See KYLIN-1007.
*
* This is transitional because finally we want to merge from HTable snapshot.
- * However MR input with multiple snapshots is only supported by HBase 1.x.
+ * However, using multiple snapshots as MR input is only supported by HBase 1.x.
* Before most users upgrade to latest HBase, they can only use this transitional
* cuboid file solution.
*/
public class HBaseMROutput2Transition implements IMROutput2 {
+ private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
+
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
return new IMRBatchCubingOutputSide2() {
@@ -87,7 +95,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -127,7 +135,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+
jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -141,67 +149,139 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static class HBaseInputFormat implements IMRStorageInputFormat {
- final Iterable<String> htables;
-
- final RowValueDecoder[] rowValueDecoders;
- final ByteArrayWritable parsedKey;
- final Object[] parsedValue;
- final Pair<ByteArrayWritable, Object[]> parsedPair;
+ final CubeSegment seg;
public HBaseInputFormat(CubeSegment seg) {
- this.htables = new HBaseMRSteps(seg).getMergingHTables();
-
- List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- valueDecoderList.add(new RowValueDecoder(colDesc));
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
+ this.seg = seg;
+ }
+
+ @Override
+ public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(job.getConfiguration())) {
+ logger.info("Merge from cuboid files for " + seg);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ addCuboidInputDirs(job);
+
+ job.setMapperClass(mapperClz);
+ job.setMapOutputKeyClass(outputKeyClz);
+ job.setMapOutputValueClass(outputValueClz);
+ }
+ // merge from HTable scan
+ else {
+ logger.info("Merge from HTables for " + seg);
+
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ List<Scan> scans = new ArrayList<Scan>();
+ for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
+ Scan scan = new Scan();
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+ scans.add(scan);
}
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
}
- this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
- this.parsedKey = new ByteArrayWritable();
- this.parsedValue = new Object[measuresDescs.size()];
- this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
}
- @Override
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
- List<Scan> scans = new ArrayList<Scan>();
- for (String htable : htables) {
- Scan scan = new Scan();
- scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
- scan.setCacheBlocks(false); // don't set to true for MR jobs
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
- scans.add(scan);
+ private void addCuboidInputDirs(Job job) throws IOException {
+ List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ String[] inputs = new String[mergingSegments.size()];
+ for (int i = 0; i < mergingSegments.size(); i++) {
+ CubeSegment mergingSeg = mergingSegments.get(i);
+ String cuboidPath = steps.getCuboidRootPath(mergingSeg);
+ inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
}
- TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+
+ AbstractHadoopJob.addInputDirs(inputs, job);
}
@Override
- public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
- TableSplit currentSplit = (TableSplit) context.getInputSplit();
- byte[] tableName = currentSplit.getTableName();
- String htableName = Bytes.toString(tableName);
-
- // decide which source segment
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String segmentHtable = segment.getStorageLocationIdentifier();
- if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
- return segment;
+ public CubeSegment findSourceSegment(Context context) throws IOException {
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(context.getConfiguration())) {
+ FileSplit fileSplit = (FileSplit) context.getInputSplit();
+ return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
+ }
+ // merge from HTable scan
+ else {
+ TableSplit currentSplit = (TableSplit) context.getInputSplit();
+ byte[] tableName = currentSplit.getTableName();
+ String htableName = Bytes.toString(tableName);
+
+ // decide which source segment
+ for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
+ String segmentHtable = segment.getStorageLocationIdentifier();
+ if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+ return segment;
+ }
}
+ throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
}
- throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
}
+ transient ByteArrayWritable parsedKey;
+ transient Object[] parsedValue;
+ transient Pair<ByteArrayWritable, Object[]> parsedPair;
+
+ transient MeasureCodec measureCodec;
+ transient RowValueDecoder[] rowValueDecoders;
+
@Override
public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+ // lazy init
+ if (parsedPair == null) {
+ parsedKey = new ByteArrayWritable();
+ parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
+ parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ }
+
+ // merge by cuboid files
+ if (isMergeFromCuboidFiles(null)) {
+ return parseMapperInputFromCuboidFile(inKey, inValue);
+ }
+ // merge from HTable scan
+ else {
+ return parseMapperInputFromHTable(inKey, inValue);
+ }
+ }
+        /**
+         * Decodes one record read from a cuboid file into the shared parsedPair. The Text key
+         * is set into parsedKey. The Text value is decoded by measureCodec, which is built over
+         * seg.getCubeDesc().getMeasures(), into parsedValue.
+         * <p>
+         * The same pair object is returned on every call, so its contents change with the next
+         * record. Expects parseMapperInput to have already initialized parsedKey, parsedValue
+         * and parsedPair.
+         */
+        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
+            // lazy init: build the codec on first use
+            if (measureCodec == null) {
+                measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+            }
+            // Text.getBytes() may expose a backing array longer than the content, so bound reads by getLength()
+            Text key = (Text) inKey;
+            parsedKey.set(key.getBytes(), 0, key.getLength());
+
+            Text value = (Text) inValue;
+            measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
+
+            return parsedPair;
+        }
+
+ private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
+ // lazy init
+ if (rowValueDecoders == null) {
+ List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ valueDecoderList.add(new RowValueDecoder(colDesc));
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+ }
+
ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
parsedKey.set(key.get(), key.getOffset(), key.getLength());
@@ -213,6 +293,53 @@ public class HBaseMROutput2Transition implements IMROutput2 {
return parsedPair;
}
+        /** Cached result of isMergeFromCuboidFiles(Configuration); null until the first call resolves it. */
+        transient Boolean isMergeFromCuboidFiles;
+
+        /**
+         * Decides whether this job merges from the merging segments' cuboid files (true) or by
+         * scanning their HTables (false). Cuboid files are preferred because reading them puts no
+         * workload on the HBase region servers.
+         * <p>
+         * Resolution order:
+         * <ol>
+         * <li>the value already cached in this object;</li>
+         * <li>the "kylin.isMergeFromCuboidFiles" entry of {@code jobConf}, when jobConf is non-null
+         * (a value found there is cached in this object);</li>
+         * <li>otherwise true only if the cuboid root path of every merging segment exists.</li>
+         * </ol>
+         * A freshly computed result is cached in this object and, when jobConf is non-null, written
+         * to it.
+         * <p>
+         * NOTE(review): parseMapperInput passes null. It therefore relies on an earlier call on the
+         * same instance (e.g. findSourceSegment with the mapper's configuration) to reuse the
+         * job-level decision; otherwise it probes the file system again.
+         *
+         * @param jobConf job configuration used as a shared cache; may be null
+         * @return true to merge from cuboid files, false to merge from HTables
+         * @throws RuntimeException wrapping any IOException raised while probing the file system
+         */
+        private boolean isMergeFromCuboidFiles(Configuration jobConf) {
+            // 1. already decided by an earlier call on this instance?
+            if (isMergeFromCuboidFiles != null)
+                return isMergeFromCuboidFiles.booleanValue();
+
+            final String confKey = "kylin.isMergeFromCuboidFiles";
+
+            // 2. already decided and recorded in the job configuration?
+            if (jobConf != null) {
+                String result = jobConf.get(confKey);
+                if (result != null) {
+                    isMergeFromCuboidFiles = Boolean.valueOf(result);
+                    return isMergeFromCuboidFiles.booleanValue();
+                }
+            }
+            // 3. probe the file system; fall back to HTables as soon as one merging segment lacks its cuboid root path
+            boolean result = true;
+
+            try {
+                List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+                HBaseMRSteps steps = new HBaseMRSteps(seg);
+                for (CubeSegment mergingSeg : mergingSegments) {
+                    String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
+
+                    boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
+                    if (cuboidFileExist == false) {
+                        logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
+                        result = false;
+                        break;
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            // cache the decision in this object and, when available, in the job configuration
+            isMergeFromCuboidFiles = Boolean.valueOf(result);
+            if (jobConf != null) {
+                jobConf.set(confKey, "" + result);
+            }
+            return result;
+        }
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -236,10 +363,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
-
+
Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
FileOutputFormat.setOutputPath(job, output);
-
+
HadoopUtil.deletePath(job.getConfiguration(), output);
}
@@ -254,11 +381,11 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
outputKey.set(key.array(), key.offset(), key.length());
-
+
valueBuf.clear();
codec.encode(value, valueBuf);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
+
context.write(outputKey, outputValue);
}
}