You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/21 03:34:33 UTC
incubator-kylin git commit: KYLIN-775 simplify the cubing reducer flow
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 35e0b9d56 -> e01982aa5
KYLIN-775 simplify the cubing reducer flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e01982aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e01982aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e01982aa
Branch: refs/heads/0.8.0
Commit: e01982aa53722f427a64e3ac66c8121d489ab2b8
Parents: 35e0b9d
Author: shaofengshi <sh...@apache.org>
Authored: Wed May 20 18:37:11 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed May 20 20:29:08 2015 +0800
----------------------------------------------------------------------
.../job/hadoop/cubev2/InMemCuboidReducer.java | 41 ++++-------
.../job/hadoop/cubev2/InMemKeyValueCreator.java | 77 ++++++++++++++++++++
.../hadoop/cubev2/MapContextGTRecordWriter.java | 22 +-----
.../apache/kylin/storage/cube/CubeScanner.java | 5 +-
.../storage/cube/CuboidToGridTableMapping.java | 6 +-
5 files changed, 98 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
index 7e024bc..cd4c877 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
@@ -7,13 +7,11 @@ import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinReducer;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
import org.apache.kylin.metadata.measure.MeasureAggregators;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -21,7 +19,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
/**
@@ -37,12 +34,8 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
private Object[] input;
private Object[] result;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- List<KeyValueCreator> keyValueCreators;
- private boolean simpleFullCopy = false;
+ List<InMemKeyValueCreator> keyValueCreators;
private int nColumns = 0;
-// private Text keyText = new Text();
@Override
protected void setup(Context context) throws IOException {
@@ -52,7 +45,15 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
- List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
+
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
codec = new MeasureCodec(measuresDescs);
aggs = new MeasureAggregators(measuresDescs);
@@ -62,20 +63,20 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
keyValueCreators = Lists.newArrayList();
+ int startPosition = 0;
for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+ startPosition += colDesc.getMeasures().length;
}
}
- simpleFullCopy = (keyValueCreators.size() == 1 && keyValueCreators.get(0).isFullCopy);
nColumns = keyValueCreators.size();
}
@Override
public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-// keyText.set(key.get());
aggs.reset();
for (Text value : values) {
@@ -86,21 +87,9 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
KeyValue outputValue;
- if (simpleFullCopy) { // shortcut for
- // simple full copy
-
- valueBuf.clear();
- codec.encode(result, valueBuf);
- outputValue = keyValueCreators.get(0).create(key.get(), 0, key.getLength(), valueBuf.array(), 0, valueBuf.position());
+ for (int i = 0; i < nColumns; i++) {
+ outputValue = keyValueCreators.get(i).create(key.get(), 0, key.getLength(), result);
context.write(key, outputValue);
-
- } else { // normal (complex) case that distributes measures to multiple
- // HBase columns
-
- for (int i = 0; i < nColumns; i++) {
- outputValue = keyValueCreators.get(i).create(key.get(), 0, key.getLength(), result);
- context.write(key, outputValue);
- }
}
counter++;
if (counter % BatchConstants.COUNTER_MAX == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
new file mode 100644
index 0000000..bc0eba0
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
@@ -0,0 +1,77 @@
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Builds HBase KeyValue cells for one HBase column: slices that column's measures out of the
+ * full measure array (starting at startPosition), encodes them, and tags family + qualifier. */
+public class InMemKeyValueCreator {
+ byte[] cfBytes; // column family name as bytes
+ byte[] qBytes; // column qualifier as bytes
+ long timestamp; // captured once in the constructor and reused for every KeyValue created
+
+
+ MeasureCodec codec; // built from this column's measures only
+ Object[] colValues; // scratch array holding this column's slice of the measure values
+ ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); // reused; cleared before each encode
+
+ int startPosition = 0; // index of this column's first measure within the full measure array
+
+ public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
+ // colDesc supplies family name, qualifier and measures; startPosition is this column's offset into measureValues
+ cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+ qBytes = Bytes.toBytes(colDesc.getQualifier());
+ timestamp = System.currentTimeMillis();
+ // the codec and the scratch array are sized to this column's measures only
+ List<MeasureDesc> measures = Lists.newArrayList();
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measures.add(measure);
+ }
+ codec = new MeasureCodec(measures);
+ colValues = new Object[measures.size()];
+
+ this.startPosition = startPosition;
+
+ }
+ // Convenience overload: uses the Text key's bytes from offset 0 up to key.getLength().
+ public KeyValue create(Text key, Object[] measureValues) {
+ return create(key.getBytes(), 0, key.getLength(), measureValues);
+ }
+ // Copies colValues.length entries of measureValues starting at startPosition, encodes them, and builds the KeyValue.
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
+ for (int i = 0; i < colValues.length; i++) {
+ colValues[i] = measureValues[startPosition + i];
+ }
+ // clear the shared buffer first; its position after encode is passed below as the value length
+ valueBuf.clear();
+ codec.encode(colValues, valueBuf);
+ // NOTE(review): valueBuf's backing array is reused across calls; presumably KeyValue copies the bytes - confirm
+ return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
+ }
+
+ // Builds a Put-type KeyValue from the given key and value byte ranges, using this column's family, qualifier and timestamp.
+ public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
+ return new KeyValue(keyBytes, keyOffset, keyLength, //
+ cfBytes, 0, cfBytes.length, //
+ qBytes, 0, qBytes.length, //
+ timestamp, KeyValue.Type.Put, //
+ value, voffset, vlen);
+ }
+ // Convenience overload taking the row key as Text (bytes from offset 0 up to key.getLength()).
+ public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
+ return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 008819e..b8e1ffe 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -41,32 +41,12 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
private Text outputValue = new Text();
private long cuboidRowCount = 0;
- private int[] hbaseMeasureRefIndex;
public MapContextGTRecordWriter(MapContext<?, ?, ImmutableBytesWritable, Text> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.mapContext = mapContext;
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
this.measureCount = cubeDesc.getMeasures().size();
- hbaseMeasureRefIndex = new int[measureCount];
-
- List<MeasureDesc> hbaseMeasureList = Lists.newArrayList();
- for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
- hbaseMeasureList.add(measure);
- }
- }
- }
-
- for (int i = 0; i < measureCount; i++) {
- for (int j = 0; j < measureCount; j++) {
- if (cubeDesc.getMeasures().get(i).equals(hbaseMeasureList.get(j))) {
- hbaseMeasureRefIndex[i] = j;
- break;
- }
- }
- }
}
@@ -113,7 +93,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
measureColumnsIndex = new int[measureCount];
for (int i = 0; i < measureCount; i++) {
- measureColumnsIndex[i] = dimensions + hbaseMeasureRefIndex[i];
+ measureColumnsIndex[i] = dimensions + i;
}
System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index 5782b30..9751135 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -62,9 +62,8 @@ public class CubeScanner implements IGTScanner {
BitSet result = new BitSet();
for (TblColRef dim : dimensions) {
int idx = mapping.getIndexOf(dim);
- if (idx < 0)
- throw new IllegalStateException(dim + " not found in " + mapping);
- result.set(idx);
+ if (idx >= 0)
+ result.set(idx);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
index bf5aa68..51079c4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
@@ -62,10 +62,10 @@ public class CuboidToGridTableMapping {
BitSet colBlock = new BitSet();
for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
// count distinct & holistic count distinct are equals() but different
- // assert the holistic version if exists always comes later
+ // assert the holistic version if exists always comes earlier
FunctionDesc func = measure.getFunction();
if (func.isHolisticCountDistinct()) {
- if (metrics2gt.get(func).size() != 1)
+ if (metrics2gt.get(func).size() > 0 )
throw new IllegalStateException();
}
gtDataTypes.add(func.getReturnDataType());
@@ -118,7 +118,7 @@ public class CuboidToGridTableMapping {
// count distinct & its holistic version
else if (list.size() == 2) {
assert metric.isCountDistinct();
- return metric.isHolisticCountDistinct() ? list.get(1) : list.get(0);
+ return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1);
}
// unexpected
else