You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/01 03:40:22 UTC

[kylin] branch 2.5.x updated: KYLIN-3520 Deal with NULL values of measures for inmem cubing

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.5.x by this push:
     new 493dc25  KYLIN-3520 Deal with NULL values of measures for inmem cubing
493dc25 is described below

commit 493dc25cee5dc9ac2083e8fcdb4ba95e9ac372c2
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Aug 29 17:31:27 2018 +0800

    KYLIN-3520 Deal with NULL values of measures for inmem cubing
---
 .../inmemcubing/InputConverterUnitForRawData.java  |  76 ++-------------
 .../apache/kylin/cube/util/KeyValueBuilder.java    | 106 +++++++++++++++++++++
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |  93 ++++--------------
 3 files changed, 132 insertions(+), 143 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index 2ff7ee0..f3e45bc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -17,25 +17,20 @@
 */
 package org.apache.kylin.cube.inmemcubing;
 
-import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.util.KeyValueBuilder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  */
 public class InputConverterUnitForRawData implements InputConverterUnit<String[]> {
@@ -51,7 +46,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
     private final MeasureIngester<?>[] measureIngesters;
     private final int measureCount;
     private final Map<TblColRef, Dictionary<String>> dictionaryMap;
-    protected List<byte[]> nullBytes;
+    private final KeyValueBuilder kvBuilder;
 
     public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc,
             Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -60,12 +55,12 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
         this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
         this.dictionaryMap = dictionaryMap;
-        initNullBytes(cubeDesc);
+        this.kvBuilder = new KeyValueBuilder(this.flatDesc);
     }
 
     @Override
     public final void convert(String[] row, GTRecord record) {
-        Object[] dimensions = buildKey(row);
+        Object[] dimensions = kvBuilder.buildKey(row);
         Object[] metricsValues = buildValue(row);
         Object[] recordValues = new Object[dimensions.length + metricsValues.length];
         System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
@@ -93,20 +88,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
         return CUT_ROW;
     }
 
-    private Object[] buildKey(String[] row) {
-        int keySize = flatDesc.getRowKeyColumnIndexes().length;
-        Object[] key = new Object[keySize];
-
-        for (int i = 0; i < keySize; i++) {
-            key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]];
-            if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
-                key[i] = null;
-            }
-        }
-
-        return key;
-    }
-
     @Override
     public boolean ifChange() {
         return true;
@@ -115,53 +96,10 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
     private Object[] buildValue(String[] row) {
         Object[] values = new Object[measureCount];
         for (int i = 0; i < measureCount; i++) {
-            values[i] = buildValueOf(i, row);
+            String[] colValues = kvBuilder.buildValueOf(i, row);
+            MeasureDesc measure = measureDescs[i];
+            values[i] = measureIngesters[i].valueOf(colValues, measure, dictionaryMap);
         }
         return values;
     }
-
-    private Object buildValueOf(int idxOfMeasure, String[] row) {
-        MeasureDesc measure = measureDescs[idxOfMeasure];
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int paramColIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = row[colIdxOnFlatTable[paramColIdx++]];
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private void initNullBytes(CubeDesc cubeDesc) {
-        nullBytes = Lists.newArrayList();
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullBytes.add(Bytes.toBytes(s));
-            }
-        }
-    }
-
-    private boolean isNull(byte[] v) {
-        for (byte[] nullByte : nullBytes) {
-            if (Bytes.equals(v, nullByte))
-                return true;
-        }
-        return false;
-    }
-
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java
new file mode 100644
index 0000000..0ba4fd8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.util;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KeyValueBuilder implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Hive's default text representation of NULL; always treated as null, in addition to cube null strings. */
+    public static final String HIVE_NULL = "\\N";
+
+    // Cell values that denote NULL: HIVE_NULL plus any null strings configured on the cube.
+    private final Set<String> nullStrs = Sets.newHashSet(HIVE_NULL);
+    private final CubeJoinedFlatTableEnrich flatDesc;
+    private final CubeDesc cubeDesc;
+
+    /** Creates a builder that reads cells from flat-table rows laid out as {@code intermediateTableDesc}. */
+    public KeyValueBuilder(CubeJoinedFlatTableEnrich intermediateTableDesc) {
+        flatDesc = intermediateTableDesc;
+        cubeDesc = flatDesc.getCubeDesc();
+        initNullStrings();
+    }
+
+    /** Adds the cube's configured null strings, if any, to the set of null markers. */
+    private void initNullStrings() {
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullStrs.add(s);
+            }
+        }
+    }
+
+    /** Returns true if {@code v} is one of the recognized null markers. */
+    protected boolean isNull(String v) {
+        return nullStrs.contains(v);
+    }
+
+    /** Returns {@code flatRow[i]}, or null when that cell holds a null marker. */
+    private String getCell(int i, String[] flatRow) {
+        String cell = flatRow[i];
+        return isNull(cell) ? null : cell;
+    }
+
+    /** Returns the row-key cells of {@code row}, in row-key column order, with null markers mapped to null. */
+    public String[] buildKey(String[] row) {
+        int[] rowKeyColIdx = flatDesc.getRowKeyColumnIndexes(); // read once instead of per iteration
+        String[] key = new String[rowKeyColIdx.length];
+        for (int i = 0; i < key.length; i++) {
+            key[i] = getCell(rowKeyColIdx[i], row);
+        }
+        return key;
+    }
+
+    /**
+     * Returns the inputs for the measure at {@code idxOfMeasure}: "1" for every parameter of a COUNT, the
+     * null-mapped cell for column parameters, and the literal value for constant parameters.
+     */
+    public String[] buildValueOf(int idxOfMeasure, String[] row) {
+        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
+        int paramCount = function.getParameterCount();
+        List<String> inputToMeasure = Lists.newArrayListWithExpectedSize(paramCount);
+        ParameterDesc param = function.getParameter();
+        int colParamIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            if (function.isCount()) {
+                inputToMeasure.add("1");
+            } else if (param.isColumnType()) {
+                inputToMeasure.add(getCell(colIdxOnFlatTable[colParamIdx++], row));
+            } else {
+                inputToMeasure.add(param.getValue());
+            }
+        }
+        return inputToMeasure.toArray(new String[inputToMeasure.size()]);
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 13bc688..9322162 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
@@ -30,35 +29,31 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.util.KeyValueBuilder;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 /**
  */
 @SuppressWarnings("serial")
 public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
-    protected String cubeName;
-    protected Cuboid baseCuboid;
+    protected KylinConfig kylinConfig;
     protected CubeDesc cubeDesc;
     protected CubeSegment cubeSegment;
-    protected Set<String> nullStrs;
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected MeasureIngester<?>[] aggrIngesters;
     protected Map<TblColRef, Dictionary<String>> dictionaryMap;
     protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected List<MeasureDesc> measureDescList;
     protected BufferedMeasureCodec measureCodec;
+    protected KeyValueBuilder kvBuilder;
 
-    protected KylinConfig kylinConfig;
 
     public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc,
                              AbstractRowKeyEncoder rowKeyEncoder, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -66,12 +61,14 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.intermediateTableDesc = intermediateTableDesc;
+        this.dictionaryMap = dictionaryMap;
         this.rowKeyEncoder = rowKeyEncoder;
         this.aggrIngesters = aggrIngesters;
-        this.dictionaryMap = dictionaryMap;
 
-        init();
-        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+        measureDescList = cubeDesc.getMeasures();
+        measureCodec = new BufferedMeasureCodec(measureDescList);
+
+        kvBuilder = new KeyValueBuilder(intermediateTableDesc);
     }
 
     public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
@@ -82,39 +79,19 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         this.intermediateTableDesc = intermediateTableDesc;
         this.dictionaryMap = dictionaryMap;
 
-        init();
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-    }
 
-    private void init() {
-        baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
-        initNullBytes();
-    }
+        measureDescList = cubeDesc.getMeasures();
+        aggrIngesters = MeasureIngester.create(measureDescList);
+        measureCodec = new BufferedMeasureCodec(measureDescList);
 
-    private void initNullBytes() {
-        nullStrs = Sets.newHashSet();
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullStrs.add(s);
-            }
-        }
-    }
-
-    protected boolean isNull(String v) {
-        return nullStrs.contains(v);
+        kvBuilder = new KeyValueBuilder(intermediateTableDesc);
     }
 
     public byte[] buildKey(String[] flatRow) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        List<TblColRef> columns = baseCuboid.getColumns();
-        String[] colValues = new String[columns.size()];
-        for (int i = 0; i < columns.size(); i++) {
-            colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
-        }
-        return rowKeyEncoder.encode(colValues);
+        String[] colKeys = kvBuilder.buildKey(flatRow);
+        return rowKeyEncoder.encode(colKeys);
     }
 
     public ByteBuffer buildValue(String[] flatRow) {
@@ -124,7 +101,9 @@ public class BaseCuboidBuilder implements java.io.Serializable {
     public Object[] buildValueObjects(String[] flatRow) {
         Object[] measures = new Object[cubeDesc.getMeasures().size()];
         for (int i = 0; i < measures.length; i++) {
-            measures[i] = buildValueOf(i, flatRow);
+            String[] colValues = kvBuilder.buildValueOf(i, flatRow);
+            MeasureDesc measure = measureDescList.get(i);
+            measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap);
         }
 
         return measures;
@@ -135,38 +114,4 @@ public class BaseCuboidBuilder implements java.io.Serializable {
             aggrIngesters[i].reset();
         }
     }
-
-    private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
-        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int colParamIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private String getCell(int i, String[] flatRow) {
-        if (isNull(flatRow[i]))
-            return null;
-        else
-            return flatRow[i];
-    }
-
 }