You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:39 UTC
[02/50] incubator-kylin git commit: KYLIN-630 add distinct column mapper for II storage
KYLIN-630 add distinct column mapper for II storage
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1b52438e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1b52438e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1b52438e
Branch: refs/heads/streaming-localdict
Commit: 1b52438e2eec3dd271b66b6a6352ccf1bc0278d3
Parents: 8e0695b
Author: honma <ho...@ebay.com>
Authored: Thu Mar 26 16:03:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Mar 26 16:03:30 2015 +0800
----------------------------------------------------------------------
.../model/IIJoinedFlatTableDesc.java | 12 +-
.../apache/kylin/invertedindex/model/IIRow.java | 13 ++
.../org/apache/kylin/job/JoinedFlatTable.java | 1 -
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../kylin/job/hadoop/cube/CubeHFileMapper.java | 2 +-
.../kylin/job/hadoop/cube/CuboidReducer.java | 2 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 2 +-
.../hadoop/cube/FactDistinctColumnsMapper.java | 129 -------------------
.../cube/FactDistinctColumnsMapperBase.java | 2 +-
.../hadoop/cube/FactDistinctColumnsReducer.java | 2 +-
.../cube/FactDistinctHiveColumnsMapper.java | 129 +++++++++++++++++++
.../cube/FactDistinctIIColumnsMapper.java | 129 +++++++++++++++++++
.../job/hadoop/cube/MergeCuboidMapper.java | 2 +-
.../kylin/job/hadoop/cube/NDCuboidMapper.java | 2 +-
.../job/hadoop/cube/NewBaseCuboidMapper.java | 2 +-
.../job/hadoop/cubev2/InMemCuboidMapper.java | 2 +-
.../job/hadoop/cubev2/InMemCuboidReducer.java | 2 +-
.../invertedindex/InvertedIndexMapper.java | 2 +-
.../invertedindex/InvertedIndexPartitioner.java | 2 +-
.../invertedindex/InvertedIndexReducer.java | 2 +-
.../metadata/model/IJoinedFlatTableDesc.java | 2 -
.../metadata/model/IntermediateColumnDesc.java | 4 +
.../endpoint/HbaseServerKVIterator.java | 9 +-
23 files changed, 296 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
index 44114da..14934dc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
@@ -19,13 +19,13 @@
package org.apache.kylin.invertedindex.model;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import com.google.common.collect.Lists;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import com.google.common.collect.Lists;
/**
* Created by Hongbin Ma(Binmahone) on 12/30/14.
@@ -35,7 +35,6 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private IIDesc iiDesc;
private String tableName;
private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
- private Map<String, String> tableAliasMap;
public IIJoinedFlatTableDesc(IIDesc iiDesc) {
this.iiDesc = iiDesc;
@@ -57,6 +56,7 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return tableName + "_" + jobUUID.replace("-", "_");
}
+ @Override
public List<IntermediateColumnDesc> getColumnList() {
return columnList;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
index aba4fff..f3d398a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
@@ -34,7 +34,9 @@
package org.apache.kylin.invertedindex.model;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.BytesUtil;
/**
* Created by qianzhou on 3/10/15.
@@ -50,6 +52,7 @@ public final class IIRow {
this.value = value;
this.dictionary = dictionary;
}
+
public IIRow() {
this(new ImmutableBytesWritable(), new ImmutableBytesWritable(), new ImmutableBytesWritable());
}
@@ -61,7 +64,17 @@ public final class IIRow {
public ImmutableBytesWritable getValue() {
return value;
}
+
public ImmutableBytesWritable getDictionary() {
return dictionary;
}
+
+    /**
+     * Copies the content of an HBase cell into this row according to the cell's qualifier.
+     * The qualifier is compared (via BytesUtil.compareBytes, over the length of the constant)
+     * against IIDesc.HBASE_QUALIFIER_BYTES first: on a match the key is set from the cell's row
+     * bytes and the value from the cell's value bytes. Otherwise it is compared against
+     * IIDesc.HBASE_DICTIONARY_BYTES: on a match the dictionary is set from the cell's value bytes.
+     * A cell matching neither constant leaves this row untouched.
+     */
+    public void updateWith(Cell c) {
+        final byte[] qualifierBytes = c.getQualifierArray();
+        final int qualifierStart = c.getQualifierOffset();
+
+        boolean carriesKeyAndValue = BytesUtil.compareBytes(IIDesc.HBASE_QUALIFIER_BYTES, 0, qualifierBytes, qualifierStart, IIDesc.HBASE_QUALIFIER_BYTES.length) == 0;
+        if (carriesKeyAndValue) {
+            this.getKey().set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+            this.getValue().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+            return;
+        }
+
+        boolean carriesDictionary = BytesUtil.compareBytes(IIDesc.HBASE_DICTIONARY_BYTES, 0, qualifierBytes, qualifierStart, IIDesc.HBASE_DICTIONARY_BYTES.length) == 0;
+        if (carriesDictionary) {
+            this.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index cc3dc1b..100fbca 100644
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -27,7 +27,6 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
-import org.apache.kylin.cube.model.DimensionDesc;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 038fe2f..9f73488 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -330,7 +330,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return input.getSplits(job).size();
}
- public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
+ public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
File metaDir = new File("meta");
System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
index 17dc24e..1236f8c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
@@ -62,7 +62,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
super.publishConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeManager cubeMgr = CubeManager.getInstance(config);
cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
index 7181fa1..b747dff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
@@ -64,7 +64,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
super.publishConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
measuresDescs = cubeDesc.getMeasures();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 094014e..17c5e9b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
dbTableNames[1]);
job.setInputFormatClass(HCatInputFormat.class);
- job.setMapperClass(FactDistinctColumnsMapper.class);
+ job.setMapperClass(FactDistinctHiveColumnsMapper.class);
job.setCombinerClass(FactDistinctColumnsCombiner.class);
job.setMapOutputKeyClass(ShortWritable.class);
job.setMapOutputValueClass(Text.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 3a50249..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.constant.BatchConstants;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
-
- private HCatSchema schema = null;
- private CubeJoinedFlatTableDesc intermediateTableDesc;
-
- protected boolean collectStatistics = false;
- protected CuboidScheduler cuboidScheduler = null;
- protected List<String> rowKeyValues = null;
- protected HyperLogLogPlusCounter hll;
- protected int nRowKey;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.setup(context);
-
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
-
- collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
- if (collectStatistics) {
- cuboidScheduler = new CuboidScheduler(cubeDesc);
- hll = new HyperLogLogPlusCounter(16);
- rowKeyValues = Lists.newArrayList();
- nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
- }
- }
-
- @Override
- public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
- try {
- int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- HCatFieldSchema fieldSchema;
- for (int i : factDictCols) {
- outputKey.set((short) i);
- fieldSchema = schema.get(flatTableIndexes[i]);
- Object fieldValue = record.get(fieldSchema.getName(), schema);
- if (fieldValue == null)
- continue;
- byte[] bytes = Bytes.toBytes(fieldValue.toString());
- outputValue.set(bytes, 0, bytes.length);
- context.write(outputKey, outputValue);
- }
- } catch (Exception ex) {
- handleErrorRecord(record, ex);
- }
-
- if (collectStatistics) {
- String[] row = HiveTableReader.getRowAsStringArray(record);
- putRowKeyToHLL(row, baseCuboidId);
- }
- }
-
- private void putRowKeyToHLL(String[] row, long cuboidId) {
- rowKeyValues.clear();
- long mask = Long.highestOneBit(baseCuboidId);
- for (int i = 0; i < nRowKey; i++) {
- if ((mask & cuboidId) == 1) {
- rowKeyValues.add(row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
- }
- mask = mask >> 1;
- }
-
- String key = StringUtils.join(rowKeyValues, ",");
- hll.add(key);
-
- Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
- for (Long childId : children) {
- putRowKeyToHLL(row, childId);
- }
-
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- if (collectStatistics) {
- // output hll to reducer, key is -1
- // keyBuf = Bytes.toBytes(-1);
- outputKey.set((short) -1);
- ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
- hll.writeRegisters(hllBuf);
- outputValue.set(hllBuf.array());
- context.write(outputKey, outputValue);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index 603277c..c0455ff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -40,7 +40,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
publishConfiguration(conf);
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
cube = CubeManager.getInstance(config).getCube(cubeName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 383def4..2052d08 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -61,7 +61,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
super.publishConfiguration(context.getConfiguration());
Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeDesc cubeDesc = cube.getDescriptor();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
new file mode 100644
index 0000000..64ae353
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.constant.BatchConstants;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper over HCatalog records. For every record it emits, per fact-table dictionary column,
+ * the column index as key and the field's string form (as bytes) as value. When statistics
+ * collection is enabled through BatchConstants.CFG_STATISTICS_ENABLED it additionally folds the
+ * record's row key values into a HyperLogLog counter, which is emitted once from cleanup()
+ * under the key -1.
+ *
+ * @author yangli9
+ */
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
+
+    private HCatSchema schema = null;
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+
+    // The fields below are only initialised when collectStatistics is true (see setup).
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected List<String> rowKeyValues = null;
+    protected HyperLogLogPlusCounter hll;
+    protected int nRowKey;
+
+    /**
+     * Reads the HCatalog table schema and the statistics flag from the job configuration and,
+     * if statistics are requested, prepares the cuboid scheduler and the HLL counter.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            hll = new HyperLogLogPlusCounter(16);
+            rowKeyValues = Lists.newArrayList();
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+        }
+    }
+
+    /**
+     * Emits one (column index, value bytes) pair per non-null dictionary column of the record.
+     * Any exception raised while doing so is passed to handleErrorRecord. Statistics, when
+     * enabled, are collected afterwards and outside of that error handling.
+     */
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+        try {
+            int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+            HCatFieldSchema fieldSchema;
+            for (int i : factDictCols) {
+                outputKey.set((short) i);
+                fieldSchema = schema.get(flatTableIndexes[i]);
+                Object fieldValue = record.get(fieldSchema.getName(), schema);
+                if (fieldValue == null)
+                    continue;
+                byte[] bytes = Bytes.toBytes(fieldValue.toString());
+                outputValue.set(bytes, 0, bytes.length);
+                context.write(outputKey, outputValue);
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(record, ex);
+        }
+
+        if (collectStatistics) {
+            String[] row = HiveTableReader.getRowAsStringArray(record);
+            putRowKeyToHLL(row, baseCuboidId);
+        }
+    }
+
+    /**
+     * Adds to the HLL counter the comma-joined row key values selected by the bits of
+     * {@code cuboidId}, then recurses into every cuboid returned by
+     * cuboidScheduler.getSpanningCuboid(cuboidId).
+     *
+     * @param row      the record's fields as strings, indexed by flat-table column position
+     * @param cuboidId bit set of row key columns; the highest bit of baseCuboidId stands for
+     *                 row key column 0, the next lower bit for column 1, and so on
+     */
+    private void putRowKeyToHLL(String[] row, long cuboidId) {
+        rowKeyValues.clear();
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        long mask = Long.highestOneBit(baseCuboidId);
+        for (int i = 0; i < nRowKey; i++) {
+            // (mask & cuboidId) is either 0 or mask itself, so the bit must be tested against 0;
+            // comparing with 1 would only ever match the lowest bit and drop all other columns.
+            if ((mask & cuboidId) != 0) {
+                rowKeyValues.add(row[rowKeyColumnIndexes[i]]);
+            }
+            // unsigned shift so a mask in the sign bit does not smear ones into lower positions
+            mask = mask >>> 1;
+        }
+
+        String key = StringUtils.join(rowKeyValues, ",");
+        hll.add(key);
+
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            putRowKeyToHLL(row, childId);
+        }
+
+    }
+
+    /**
+     * Emits the accumulated HLL registers under key -1 when statistics collection is enabled.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            // output hll to reducer, key is -1
+            outputKey.set((short) -1);
+            ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
+            hll.writeRegisters(hllBuf);
+            // NOTE(review): this hands over the whole 64 KB backing array, not only the bytes
+            // written by writeRegisters - confirm the reducer tolerates the trailing padding.
+            outputValue.set(hllBuf.array());
+            context.write(outputKey, outputValue);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
new file mode 100644
index 0000000..75e127e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.*;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper over HBase rows of an inverted index (II). Each incoming Result is turned into an
+ * IIRow and appended to a buffer that was handed to a key-value codec in setup(); whenever the
+ * codec's slice iterable has a slice available, every record of that slice is decoded and, per
+ * fact-table dictionary column, the column index (key) and the dictionary-decoded value bytes
+ * (value) are emitted.
+ *
+ * @author yangli9
+ */
+public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
+
+    private IIJoinedFlatTableDesc intermediateTableDesc;
+    // Shared with the codec: the same list instance is passed to decodeKeyValue in setup().
+    private ArrayList<IIRow> buffer = Lists.newArrayList();
+    private Iterable<Slice> slices;
+
+    private String iiName;
+    private IIInstance ii;
+    private IIDesc iiDesc;
+
+    // For position i in factDictCols: the index of that column in the II flat table column list.
+    private int[] baseCuboidCol2FlattenTableCol;
+
+    /**
+     * Loads the II descriptor named by BatchConstants.CFG_II_NAME, wires the row buffer into the
+     * codec and precomputes, for each dictionary column, its position in the II flat table.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        iiName = conf.get(BatchConstants.CFG_II_NAME);
+        ii = IIManager.getInstance(config).getII(iiName);
+        iiDesc = ii.getDescriptor();
+
+        intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
+        slices = codec.decodeKeyValue(buffer);
+
+        baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
+        for (int i = 0; i < factDictCols.size(); ++i) {
+            int index = findTblCol(intermediateTableDesc.getColumnList(), columns.get(factDictCols.get(i)));
+            baseCuboidCol2FlattenTableCol[i] = index;
+        }
+    }
+
+    /**
+     * Returns the position of the first entry of {@code columns} whose column reference equals
+     * {@code col}, or -1 if there is none (the contract of Iterators.indexOf).
+     */
+    private int findTblCol(List<IntermediateColumnDesc> columns, final TblColRef col) {
+        return Iterators.indexOf(columns.iterator(), new Predicate<IntermediateColumnDesc>() {
+            @Override
+            public boolean apply(IntermediateColumnDesc input) {
+                return input.getColRef().equals(col);
+            }
+        });
+    }
+
+    /**
+     * Buffers the incoming HBase row and, if the codec can produce a slice, emits the decoded
+     * value of every dictionary column for every record of that slice. Null value IDs are skipped.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
+        IIRow iiRow = new IIRow();
+        for (Cell c : cells.rawCells()) {
+            iiRow.updateWith(c);
+        }
+        buffer.add(iiRow);
+
+        // NOTE(review): presumably the codec decodes lazily from the shared buffer, so a slice
+        // only becomes available once enough rows were appended - confirm against the codec.
+        if (slices.iterator().hasNext()) {
+            byte[] vBytesBuffer = null;
+            Slice slice = slices.iterator().next();
+
+            for (RawTableRecord record : slice) {
+                for (int i = 0; i < factDictCols.size(); ++i) {
+                    int baseCuboidIndex = factDictCols.get(i);
+                    outputKey.set((short) baseCuboidIndex);
+                    int indexInRecord = baseCuboidCol2FlattenTableCol[i];
+
+                    Dictionary<?> dictionary = slice.getLocalDictionaries().get(indexInRecord);
+                    // grow the scratch buffer (with headroom) when this dictionary's values do not fit
+                    if (vBytesBuffer == null || dictionary.getSizeOfValue() > vBytesBuffer.length) {
+                        vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
+                    }
+
+                    // The value ID must be read at the same record position as the dictionary
+                    // used to decode it; the base cuboid index is only the output key.
+                    int vid = record.getValueID(indexInRecord);
+                    if (vid == dictionary.nullId()) {
+                        continue;
+                    }
+                    int vBytesSize = dictionary.getValueBytesFromId(vid, vBytesBuffer, 0);
+
+                    outputValue.set(vBytesBuffer, 0, vBytesSize);
+                    context.write(outputKey, outputValue);
+                }
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
index 431f2b7..417e996 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
@@ -114,7 +114,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
- config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeManager = CubeManager.getInstance(config);
cube = cubeManager.getCube(cubeName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
index dc65baa..e476bd7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
@@ -68,7 +68,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index e75457e..79c334c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -125,7 +125,7 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
metadataManager = MetadataManager.getInstance(config);
cube = CubeManager.getInstance(config).getCube(cubeName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index a58369f..5a3565a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -81,7 +81,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Tex
Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
cube = CubeManager.getInstance(config).getCube(cubeName);
cubeDesc = cube.getDescriptor();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
index de2539c..48fe3a1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
@@ -44,7 +44,7 @@ public class InMemCuboidReducer extends KylinReducer<Text, Text, Text, Text> {
super.publishConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
measuresDescs = cubeDesc.getMeasures();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
index 735a945..0344043 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -58,7 +58,7 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, L
Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
IIManager mgr = IIManager.getInstance(config);
IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
index 141565f..fa4dccf 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -54,7 +54,7 @@ public class InvertedIndexPartitioner extends Partitioner<LongWritable, Immutabl
public void setConf(Configuration conf) {
this.conf = conf;
try {
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
IIManager mgr = IIManager.getInstance(config);
IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index a1c0811..9f238b0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -53,7 +53,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
super.publishConfiguration(context.getConfiguration());
Configuration conf = context.getConfiguration();
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
IIManager mgr = IIManager.getInstance(config);
IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 917e12b..abf87b7 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -20,8 +20,6 @@ package org.apache.kylin.metadata.model;
import java.util.List;
-import org.apache.kylin.metadata.model.DataModelDesc;
-
/**
* Created by Hongbin Ma(Binmahone) on 12/30/14.
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
index 860773c..1b931a0 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
@@ -36,6 +36,10 @@ public class IntermediateColumnDesc {
return id;
}
+ public TblColRef getColRef() {
+ return this.colRef;
+ }
+
public String getColumnName() {
return colRef.getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index a7b67d8..5d9f633 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -63,7 +63,6 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
this.regionScanner = innerScanner;
}
-
@Override
public boolean hasNext() {
return hasMore;
@@ -81,12 +80,7 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
throw new IllegalStateException("Hbase row contains less than 1 cell");
}
for (Cell c : results) {
- if (BytesUtil.compareBytes(IIDesc.HBASE_QUALIFIER_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_QUALIFIER_BYTES.length) == 0) {
- row.getKey().set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
- row.getValue().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- } else if (BytesUtil.compareBytes(IIDesc.HBASE_DICTIONARY_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_DICTIONARY_BYTES.length) == 0) {
- row.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- }
+ row.updateWith(c);
}
return row;
}
@@ -97,7 +91,6 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
}
}
-
@Override
public Iterator<IIRow> iterator() {
return new IIRowIterator(innerScanner);