You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/21 03:34:33 UTC

incubator-kylin git commit: KYLIN-775 simplify the cubing reducer flow

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 35e0b9d56 -> e01982aa5


KYLIN-775 simplify the cubing reducer flow

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e01982aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e01982aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e01982aa

Branch: refs/heads/0.8.0
Commit: e01982aa53722f427a64e3ac66c8121d489ab2b8
Parents: 35e0b9d
Author: shaofengshi <sh...@apache.org>
Authored: Wed May 20 18:37:11 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed May 20 20:29:08 2015 +0800

----------------------------------------------------------------------
 .../job/hadoop/cubev2/InMemCuboidReducer.java   | 41 ++++-------
 .../job/hadoop/cubev2/InMemKeyValueCreator.java | 77 ++++++++++++++++++++
 .../hadoop/cubev2/MapContextGTRecordWriter.java | 22 +-----
 .../apache/kylin/storage/cube/CubeScanner.java  |  5 +-
 .../storage/cube/CuboidToGridTableMapping.java  |  6 +-
 5 files changed, 98 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
index 7e024bc..cd4c877 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
@@ -7,13 +7,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinReducer;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
 import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -21,7 +19,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 
 /**
@@ -37,12 +34,8 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
     private Object[] input;
     private Object[] result;
 
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    List<KeyValueCreator> keyValueCreators;
-    private boolean simpleFullCopy = false;
+    List<InMemKeyValueCreator> keyValueCreators;
     private int nColumns = 0;
-//    private Text keyText = new Text();
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -52,7 +45,15 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-        List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
+
+        List<MeasureDesc> measuresDescs = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                    measuresDescs.add(measure);
+                }
+            }
+        }
 
         codec = new MeasureCodec(measuresDescs);
         aggs = new MeasureAggregators(measuresDescs);
@@ -62,20 +63,20 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
 
         keyValueCreators = Lists.newArrayList();
 
+        int startPosition = 0;
         for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
             for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+                keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition));
+                startPosition += colDesc.getMeasures().length;
             }
         }
 
-        simpleFullCopy = (keyValueCreators.size() == 1 && keyValueCreators.get(0).isFullCopy);
         nColumns = keyValueCreators.size();
     }
 
     @Override
     public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
-//        keyText.set(key.get());
         aggs.reset();
 
         for (Text value : values) {
@@ -86,21 +87,9 @@ public class InMemCuboidReducer extends KylinReducer<ImmutableBytesWritable, Tex
 
         KeyValue outputValue;
 
-        if (simpleFullCopy) { // shortcut for
-            // simple full copy
-
-            valueBuf.clear();
-            codec.encode(result, valueBuf);
-            outputValue = keyValueCreators.get(0).create(key.get(), 0, key.getLength(), valueBuf.array(), 0, valueBuf.position());
+        for (int i = 0; i < nColumns; i++) {
+            outputValue = keyValueCreators.get(i).create(key.get(), 0, key.getLength(), result);
             context.write(key, outputValue);
-
-        } else { // normal (complex) case that distributes measures to multiple
-            // HBase columns
-
-            for (int i = 0; i < nColumns; i++) {
-                outputValue = keyValueCreators.get(i).create(key.get(), 0, key.getLength(), result);
-                context.write(key, outputValue);
-            }
         }
         counter++;
         if (counter % BatchConstants.COUNTER_MAX == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
new file mode 100644
index 0000000..bc0eba0
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemKeyValueCreator.java
@@ -0,0 +1,77 @@
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/** Creates HBase KeyValues for one HBase column, encoding only the slice of the measure values that belongs to that column.
+ * Each instance reuses {@code colValues} and {@code valueBuf} across calls, so concurrent calls on one instance would overwrite each other's data. */
+public class InMemKeyValueCreator {
+    byte[] cfBytes; // column family name of this column, as bytes
+    byte[] qBytes; // qualifier of this column, as bytes
+    long timestamp; // captured once in the constructor and stamped on every KeyValue this instance creates
+
+
+    MeasureCodec codec; // codec built from this column's measures only
+    Object[] colValues; // scratch array holding this column's slice of the measure values
+    ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); // encode buffer, cleared and refilled on every encoding create() call
+
+    int startPosition = 0; // index in the measureValues array where this column's measures begin
+
+    public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) { // startPosition: offset of colDesc's first measure within the array later passed to create()
+
+        cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+        qBytes = Bytes.toBytes(colDesc.getQualifier());
+        timestamp = System.currentTimeMillis();
+
+        List<MeasureDesc> measures = Lists.newArrayList(); // copy the column's measures into a List for the MeasureCodec constructor
+        for (MeasureDesc measure : colDesc.getMeasures()) {
+            measures.add(measure);
+        }
+        codec = new MeasureCodec(measures);
+        colValues = new Object[measures.size()];
+
+        this.startPosition = startPosition;
+
+    }
+
+    public KeyValue create(Text key, Object[] measureValues) { // convenience overload: row key is the Text's bytes [0, getLength())
+        return create(key.getBytes(), 0, key.getLength(), measureValues);
+    }
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) { // encodes this column's measures and wraps them in a KeyValue
+        for (int i = 0; i < colValues.length; i++) { // pick this column's values out of the full measure array
+            colValues[i] = measureValues[startPosition + i];
+        }
+
+        valueBuf.clear();
+        codec.encode(colValues, valueBuf);
+
+        return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position()); // NOTE(review): valueBuf is overwritten by the next call; presumably KeyValue copies the bytes - confirm
+    }
+
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) { // builds a Put KeyValue using this column's family, qualifier and timestamp
+        return new KeyValue(keyBytes, keyOffset, keyLength, //
+                cfBytes, 0, cfBytes.length, //
+                qBytes, 0, qBytes.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, voffset, vlen);
+    }
+
+    public KeyValue create(Text key, byte[] value, int voffset, int vlen) { // convenience overload for a Text row key with an already-encoded value
+        return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 008819e..b8e1ffe 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -41,32 +41,12 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
     private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
     private Text outputValue = new Text();
     private long cuboidRowCount = 0;
-    private int[] hbaseMeasureRefIndex;
 
     public MapContextGTRecordWriter(MapContext<?, ?, ImmutableBytesWritable, Text> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.mapContext = mapContext;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.measureCount = cubeDesc.getMeasures().size();
-        hbaseMeasureRefIndex = new int[measureCount];
-
-        List<MeasureDesc> hbaseMeasureList = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    hbaseMeasureList.add(measure);
-                }
-            }
-        }
-
-        for (int i = 0; i < measureCount; i++) {
-            for (int j = 0; j < measureCount; j++) {
-                if (cubeDesc.getMeasures().get(i).equals(hbaseMeasureList.get(j))) {
-                    hbaseMeasureRefIndex[i] = j;
-                    break;
-                }
-            }
-        }
 
     }
 
@@ -113,7 +93,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
         dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
         measureColumnsIndex = new int[measureCount];
         for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + hbaseMeasureRefIndex[i];
+            measureColumnsIndex[i] = dimensions + i;
         }
 
         System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index 5782b30..9751135 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -62,9 +62,8 @@ public class CubeScanner implements IGTScanner {
         BitSet result = new BitSet();
         for (TblColRef dim : dimensions) {
             int idx = mapping.getIndexOf(dim);
-            if (idx < 0)
-                throw new IllegalStateException(dim + " not found in " + mapping);
-            result.set(idx);
+            if (idx >= 0)
+                result.set(idx);
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e01982aa/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
index bf5aa68..51079c4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
@@ -62,10 +62,10 @@ public class CuboidToGridTableMapping {
                 BitSet colBlock = new BitSet();
                 for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
                     // count distinct & holistic count distinct are equals() but different
-                    // assert the holistic version if exists always comes later
+                    // assert the holistic version if exists always comes earlier
                     FunctionDesc func = measure.getFunction();
                     if (func.isHolisticCountDistinct()) {
-                        if (metrics2gt.get(func).size() != 1)
+                        if (metrics2gt.get(func).size() > 0 )
                             throw new IllegalStateException();
                     }
                     gtDataTypes.add(func.getReturnDataType());
@@ -118,7 +118,7 @@ public class CuboidToGridTableMapping {
         // count distinct & its holistic version
         else if (list.size() == 2) {
             assert metric.isCountDistinct();
-            return metric.isHolisticCountDistinct() ? list.get(1) : list.get(0);
+            return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1);
         }
         // unexpected
         else