You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/01 03:40:22 UTC
[kylin] branch 2.5.x updated: KYLIN-3520 Deal with NULL values of measures for inmem cubing
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.5.x by this push:
new 493dc25 KYLIN-3520 Deal with NULL values of measures for inmem cubing
493dc25 is described below
commit 493dc25cee5dc9ac2083e8fcdb4ba95e9ac372c2
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Aug 29 17:31:27 2018 +0800
KYLIN-3520 Deal with NULL values of measures for inmem cubing
---
.../inmemcubing/InputConverterUnitForRawData.java | 76 ++-------------
.../apache/kylin/cube/util/KeyValueBuilder.java | 106 +++++++++++++++++++++
.../kylin/engine/mr/common/BaseCuboidBuilder.java | 93 ++++--------------
3 files changed, 132 insertions(+), 143 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index 2ff7ee0..f3e45bc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -17,25 +17,20 @@
*/
package org.apache.kylin.cube.inmemcubing;
-import java.util.List;
import java.util.Map;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.util.KeyValueBuilder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
/**
*/
public class InputConverterUnitForRawData implements InputConverterUnit<String[]> {
@@ -51,7 +46,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
private final MeasureIngester<?>[] measureIngesters;
private final int measureCount;
private final Map<TblColRef, Dictionary<String>> dictionaryMap;
- protected List<byte[]> nullBytes;
+ private final KeyValueBuilder kvBuilder;
public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc,
Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -60,12 +55,12 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
this.dictionaryMap = dictionaryMap;
- initNullBytes(cubeDesc);
+ this.kvBuilder = new KeyValueBuilder(this.flatDesc);
}
@Override
public final void convert(String[] row, GTRecord record) {
- Object[] dimensions = buildKey(row);
+ Object[] dimensions = kvBuilder.buildKey(row);
Object[] metricsValues = buildValue(row);
Object[] recordValues = new Object[dimensions.length + metricsValues.length];
System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
@@ -93,20 +88,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
return CUT_ROW;
}
- private Object[] buildKey(String[] row) {
- int keySize = flatDesc.getRowKeyColumnIndexes().length;
- Object[] key = new Object[keySize];
-
- for (int i = 0; i < keySize; i++) {
- key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]];
- if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
- key[i] = null;
- }
- }
-
- return key;
- }
-
@Override
public boolean ifChange() {
return true;
@@ -115,53 +96,10 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
private Object[] buildValue(String[] row) {
Object[] values = new Object[measureCount];
for (int i = 0; i < measureCount; i++) {
- values[i] = buildValueOf(i, row);
+ String[] colValues = kvBuilder.buildValueOf(i, row);
+ MeasureDesc measure = measureDescs[i];
+ values[i] = measureIngesters[i].valueOf(colValues, measure, dictionaryMap);
}
return values;
}
-
- private Object buildValueOf(int idxOfMeasure, String[] row) {
- MeasureDesc measure = measureDescs[idxOfMeasure];
- FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
- int paramCount = function.getParameterCount();
- String[] inputToMeasure = new String[paramCount];
-
- // pick up parameter values
- ParameterDesc param = function.getParameter();
- int paramColIdx = 0; // index among parameters of column type
- for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
- String value;
- if (function.isCount()) {
- value = "1";
- } else if (param.isColumnType()) {
- value = row[colIdxOnFlatTable[paramColIdx++]];
- } else {
- value = param.getValue();
- }
- inputToMeasure[i] = value;
- }
-
- return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
- }
-
- private void initNullBytes(CubeDesc cubeDesc) {
- nullBytes = Lists.newArrayList();
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullBytes.add(Bytes.toBytes(s));
- }
- }
- }
-
- private boolean isNull(byte[] v) {
- for (byte[] nullByte : nullBytes) {
- if (Bytes.equals(v, nullByte))
- return true;
- }
- return false;
- }
-
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java
new file mode 100644
index 0000000..0ba4fd8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.util;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KeyValueBuilder implements Serializable {
+
+ public static final String HIVE_NULL = "\\N";
+
+ private Set<String> nullStrs;
+ private CubeJoinedFlatTableEnrich flatDesc;
+ private CubeDesc cubeDesc;
+
+ public KeyValueBuilder(CubeJoinedFlatTableEnrich intermediateTableDesc) {
+ flatDesc = intermediateTableDesc;
+ cubeDesc = flatDesc.getCubeDesc();
+ initNullStrings();
+ }
+
+ private void initNullStrings() {
+ nullStrs = Sets.newHashSet();
+ nullStrs.add(HIVE_NULL);
+ String[] nullStrings = cubeDesc.getNullStrings();
+ if (nullStrings != null) {
+ for (String s : nullStrings) {
+ nullStrs.add(s);
+ }
+ }
+ }
+
+ protected boolean isNull(String v) {
+ return nullStrs.contains(v);
+ }
+
+ private String getCell(int i, String[] flatRow) {
+ if (isNull(flatRow[i]))
+ return null;
+ else
+ return flatRow[i];
+ }
+
+ public String[] buildKey(String[] row) {
+ int keySize = flatDesc.getRowKeyColumnIndexes().length;
+ String[] key = new String[keySize];
+
+ for (int i = 0; i < keySize; i++) {
+ key[i] = getCell(flatDesc.getRowKeyColumnIndexes()[i], row);
+ }
+
+ return key;
+ }
+
+ public String[] buildValueOf(int idxOfMeasure, String[] row) {
+ MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ List<String> inputToMeasure = Lists.newArrayListWithExpectedSize(paramCount);
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int colParamIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount()) {
+ value = "1";
+ } else if (param.isColumnType()) {
+ value = getCell(colIdxOnFlatTable[colParamIdx++], row);
+ } else {
+ value = param.getValue();
+ }
+ inputToMeasure.add(value);
+ }
+
+ return inputToMeasure.toArray(new String[inputToMeasure.size()]);
+ }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 13bc688..9322162 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
@@ -30,35 +29,31 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.util.KeyValueBuilder;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
/**
*/
@SuppressWarnings("serial")
public class BaseCuboidBuilder implements java.io.Serializable {
protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
- protected String cubeName;
- protected Cuboid baseCuboid;
+ protected KylinConfig kylinConfig;
protected CubeDesc cubeDesc;
protected CubeSegment cubeSegment;
- protected Set<String> nullStrs;
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected MeasureIngester<?>[] aggrIngesters;
protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected AbstractRowKeyEncoder rowKeyEncoder;
+ protected List<MeasureDesc> measureDescList;
protected BufferedMeasureCodec measureCodec;
+ protected KeyValueBuilder kvBuilder;
- protected KylinConfig kylinConfig;
public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc,
AbstractRowKeyEncoder rowKeyEncoder, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -66,12 +61,14 @@ public class BaseCuboidBuilder implements java.io.Serializable {
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
this.intermediateTableDesc = intermediateTableDesc;
+ this.dictionaryMap = dictionaryMap;
this.rowKeyEncoder = rowKeyEncoder;
this.aggrIngesters = aggrIngesters;
- this.dictionaryMap = dictionaryMap;
- init();
- measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ measureDescList = cubeDesc.getMeasures();
+ measureCodec = new BufferedMeasureCodec(measureDescList);
+
+ kvBuilder = new KeyValueBuilder(intermediateTableDesc);
}
public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
@@ -82,39 +79,19 @@ public class BaseCuboidBuilder implements java.io.Serializable {
this.intermediateTableDesc = intermediateTableDesc;
this.dictionaryMap = dictionaryMap;
- init();
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
- measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
- aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
- }
- private void init() {
- baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
- initNullBytes();
- }
+ measureDescList = cubeDesc.getMeasures();
+ aggrIngesters = MeasureIngester.create(measureDescList);
+ measureCodec = new BufferedMeasureCodec(measureDescList);
- private void initNullBytes() {
- nullStrs = Sets.newHashSet();
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullStrs.add(s);
- }
- }
- }
-
- protected boolean isNull(String v) {
- return nullStrs.contains(v);
+ kvBuilder = new KeyValueBuilder(intermediateTableDesc);
}
public byte[] buildKey(String[] flatRow) {
- int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- List<TblColRef> columns = baseCuboid.getColumns();
- String[] colValues = new String[columns.size()];
- for (int i = 0; i < columns.size(); i++) {
- colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
- }
- return rowKeyEncoder.encode(colValues);
+ String[] colKeys = kvBuilder.buildKey(flatRow);
+ return rowKeyEncoder.encode(colKeys);
}
public ByteBuffer buildValue(String[] flatRow) {
@@ -124,7 +101,9 @@ public class BaseCuboidBuilder implements java.io.Serializable {
public Object[] buildValueObjects(String[] flatRow) {
Object[] measures = new Object[cubeDesc.getMeasures().size()];
for (int i = 0; i < measures.length; i++) {
- measures[i] = buildValueOf(i, flatRow);
+ String[] colValues = kvBuilder.buildValueOf(i, flatRow);
+ MeasureDesc measure = measureDescList.get(i);
+ measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap);
}
return measures;
@@ -135,38 +114,4 @@ public class BaseCuboidBuilder implements java.io.Serializable {
aggrIngesters[i].reset();
}
}
-
- private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
- MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
- FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
- int paramCount = function.getParameterCount();
- String[] inputToMeasure = new String[paramCount];
-
- // pick up parameter values
- ParameterDesc param = function.getParameter();
- int colParamIdx = 0; // index among parameters of column type
- for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
- String value;
- if (function.isCount()) {
- value = "1";
- } else if (param.isColumnType()) {
- value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
- } else {
- value = param.getValue();
- }
- inputToMeasure[i] = value;
- }
-
- return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
- }
-
- private String getCell(int i, String[] flatRow) {
- if (isNull(flatRow[i]))
- return null;
- else
- return flatRow[i];
- }
-
}