You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/14 03:19:03 UTC
[08/11] kylin git commit: KYLIN-2442 calculate raw data size using custom counter
KYLIN-2442 calculate raw data size using custom counter
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/405dee26
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/405dee26
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/405dee26
Branch: refs/heads/KYLIN-2428
Commit: 405dee26d7fe463d15bf5f1d7690359c9e83f678
Parents: 43c0566
Author: Li Yang <li...@apache.org>
Authored: Fri Feb 10 16:56:21 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Mon Feb 13 10:20:54 2017 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/BatchCubingJobBuilder.java | 1 -
.../kylin/engine/mr/common/HadoopCmdOutput.java | 9 +-
.../engine/mr/common/MapReduceExecutable.java | 8 +-
.../mr/steps/FactDistinctColumnPartitioner.java | 4 +-
.../engine/mr/steps/FactDistinctColumnsJob.java | 2 +-
.../mr/steps/FactDistinctColumnsMapper.java | 262 +++++++++++++++++++
.../mr/steps/FactDistinctHiveColumnsMapper.java | 230 ----------------
7 files changed, 272 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 36c12a1..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 9d016cc..2a480e6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HadoopCmdOutput {
private String mapInputRecords;
private String hdfsBytesWritten;
- private String hdfsBytesRead;
+ private String rawInputBytesRead;
public String getMapInputRecords() {
return mapInputRecords;
@@ -77,8 +78,8 @@ public class HadoopCmdOutput {
return hdfsBytesWritten;
}
- public String getHdfsBytesRead() {
- return hdfsBytesRead;
+ public String getRawInputBytesRead() {
+ return rawInputBytesRead;
}
public void updateJobCounter() {
@@ -95,7 +96,7 @@ public class HadoopCmdOutput {
mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
- hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+ rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
output.append(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 6de07ca..2e7a289 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable {
private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
hadoopCmdOutput.updateJobCounter();
info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
String saveAs = getParam(KEY_COUNTER_SAVEAS);
if (saveAs != null) {
String[] saveAsNames = saveAs.split(",");
saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+ saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info);
saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
}
}
@@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable {
setParam(KEY_PARAMS, param);
}
- public String getCounterSaveAs() {
- return getParam(KEY_COUNTER_SAVEAS);
- }
-
public void setCounterSaveAs(String value) {
setParam(KEY_COUNTER_SAVEAS, value);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index e8817a5..5fcfe42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -29,10 +29,10 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
@Override
public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
Text key = skey.getText();
- if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
+ if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
// the last reducer is for merging hll
return numReduceTasks - 1;
- } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
+ } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
// the last but one reducer is for partition col
return numReduceTasks - 2;
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index aded600..ee0989a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -140,7 +140,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
- job.setMapperClass(FactDistinctHiveColumnsMapper.class);
+ job.setMapperClass(FactDistinctColumnsMapper.class);
job.setCombinerClass(FactDistinctColumnsCombiner.class);
job.setMapOutputKeyClass(SelfDefineSortableKey.class);
job.setMapOutputValueClass(Text.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
new file mode 100644
index 0000000..9d0ff10
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ */
+public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
+
+ private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsMapper.class);
+
+ public static enum RawDataCounter { BYTES };
+
+ protected boolean collectStatistics = false;
+ protected CuboidScheduler cuboidScheduler = null;
+ protected int nRowKey;
+ private Integer[][] allCuboidsBitSet = null;
+ private HLLCounter[] allCuboidsHLL = null;
+ private Long[] cuboidIds;
+ private HashFunction hf = null;
+ private int rowCount = 0;
+ private int samplingPercentage;
+ private ByteArray[] row_hashcodes = null;
+ private ByteBuffer tmpbuf;
+ private static final Text EMPTY_TEXT = new Text();
+ public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
+ public static final byte MARK_FOR_HLL = (byte) 0xFF;
+
+ private int partitionColumnIndex = -1;
+ private boolean needFetchPartitionCol = true;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.setup(context);
+ tmpbuf = ByteBuffer.allocate(4096);
+ collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+ if (collectStatistics) {
+ samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+ cuboidScheduler = new CuboidScheduler(cubeDesc);
+ nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+ List<Long> cuboidIdList = Lists.newArrayList();
+ List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+ addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+ allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+ cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+ allCuboidsHLL = new HLLCounter[cuboidIds.length];
+ for (int i = 0; i < cuboidIds.length; i++) {
+ allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+ }
+
+ hf = Hashing.murmur3_32();
+ row_hashcodes = new ByteArray[nRowKey];
+ for (int i = 0; i < nRowKey; i++) {
+ row_hashcodes[i] = new ByteArray();
+ }
+
+ TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ if (partitionColRef != null) {
+ partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+ }
+
+ // check whether need fetch the partition col values
+ if (partitionColumnIndex < 0) {
+ // if partition col not on cube, no need
+ needFetchPartitionCol = false;
+ } else {
+ needFetchPartitionCol = true;
+ }
+ }
+ }
+
+ private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+ allCuboids.add(cuboidId);
+ Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < nRowKey; i++) {
+ if ((mask & cuboidId) > 0) {
+ indice[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+
+ allCuboidsBitSet.add(indice);
+ Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+ for (Long childId : children) {
+ addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+ }
+ }
+
+ @Override
+ public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+ String[] row = flatTableInputFormat.parseMapperInput(record);
+
+ context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
+
+ for (int i = 0; i < factDictCols.size(); i++) {
+ String fieldValue = row[dictionaryColumnIndex[i]];
+ if (fieldValue == null)
+ continue;
+
+ int reducerIndex;
+ if (uhcIndex[i] == 0) {
+ //for the normal dictionary column
+ reducerIndex = columnIndexToReducerBeginId.get(i);
+ } else {
+ //for the uhc
+ reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+ }
+
+ tmpbuf.clear();
+ tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+ tmpbuf.put(Bytes.toBytes(fieldValue));
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.setText(outputKey);
+ //judge type
+ sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
+ context.write(sortableKey, EMPTY_TEXT);
+
+ // log a few rows for troubleshooting
+ if (rowCount < 10) {
+ logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+ }
+ }
+
+ if (collectStatistics) {
+ if (rowCount % 100 < samplingPercentage) {
+ putRowKeyToHLL(row);
+ }
+
+ if (needFetchPartitionCol == true) {
+ String fieldValue = row[partitionColumnIndex];
+ if (fieldValue != null) {
+ tmpbuf.clear();
+ tmpbuf.put(MARK_FOR_PARTITION_COL);
+ tmpbuf.put(Bytes.toBytes(fieldValue));
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ sortableKey.setText(outputKey);
+ sortableKey.setTypeId((byte) 0);
+ context.write(sortableKey, EMPTY_TEXT);
+ }
+ }
+ }
+ rowCount++;
+ }
+
+ private long countSizeInBytes(String[] row) {
+ int size = 0;
+ for (String s : row) {
+ size += s == null ? 1 : utf8Length(s);
+ size++; // delimiter
+ }
+ return size;
+ }
+
+ // calculating length in UTF-8 of Java String without actually encoding it
+ public static int utf8Length(CharSequence sequence) {
+ int count = 0;
+ for (int i = 0, len = sequence.length(); i < len; i++) {
+ char ch = sequence.charAt(i);
+ if (ch <= 0x7F) {
+ count++;
+ } else if (ch <= 0x7FF) {
+ count += 2;
+ } else if (Character.isHighSurrogate(ch)) {
+ count += 4;
+ ++i;
+ } else {
+ count += 3;
+ }
+ }
+ return count;
+ }
+
+ private void putRowKeyToHLL(String[] row) {
+
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+ if (colValue != null) {
+ row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ // user the row key column hash to get a consolidated hash for each cuboid
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+ }
+
+ allCuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ if (collectStatistics) {
+ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+ // output each cuboid's hll to reducer, key is 0 - cuboidId
+ HLLCounter hll;
+ for (int i = 0; i < cuboidIds.length; i++) {
+ hll = allCuboidsHLL[i];
+
+ tmpbuf.clear();
+ tmpbuf.put(MARK_FOR_HLL); // one byte
+ tmpbuf.putLong(cuboidIds[i]);
+ outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+ hllBuf.clear();
+ hll.writeRegisters(hllBuf);
+ outputValue.set(hllBuf.array(), 0, hllBuf.position());
+ sortableKey.setText(outputKey);
+ sortableKey.setTypeId((byte) 0);
+ context.write(sortableKey, outputValue);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
deleted file mode 100644
index ed65343..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-/**
- */
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
-
- private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
-
- protected boolean collectStatistics = false;
- protected CuboidScheduler cuboidScheduler = null;
- protected int nRowKey;
- private Integer[][] allCuboidsBitSet = null;
- private HLLCounter[] allCuboidsHLL = null;
- private Long[] cuboidIds;
- private HashFunction hf = null;
- private int rowCount = 0;
- private int samplingPercentage;
- private ByteArray[] row_hashcodes = null;
- private ByteBuffer tmpbuf;
- private static final Text EMPTY_TEXT = new Text();
- public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
- public static final byte MARK_FOR_HLL = (byte) 0xFF;
-
- private int partitionColumnIndex = -1;
- private boolean needFetchPartitionCol = true;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.setup(context);
- tmpbuf = ByteBuffer.allocate(4096);
- collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
- if (collectStatistics) {
- samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
- cuboidScheduler = new CuboidScheduler(cubeDesc);
- nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-
- List<Long> cuboidIdList = Lists.newArrayList();
- List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
- addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
-
- allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
- cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
-
- allCuboidsHLL = new HLLCounter[cuboidIds.length];
- for (int i = 0; i < cuboidIds.length; i++) {
- allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
- }
-
- hf = Hashing.murmur3_32();
- row_hashcodes = new ByteArray[nRowKey];
- for (int i = 0; i < nRowKey; i++) {
- row_hashcodes[i] = new ByteArray();
- }
-
- TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (partitionColRef != null) {
- partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
- }
-
- // check whether need fetch the partition col values
- if (partitionColumnIndex < 0) {
- // if partition col not on cube, no need
- needFetchPartitionCol = false;
- } else {
- needFetchPartitionCol = true;
- }
- }
- }
-
- private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
- allCuboids.add(cuboidId);
- Integer[] indice = new Integer[Long.bitCount(cuboidId)];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < nRowKey; i++) {
- if ((mask & cuboidId) > 0) {
- indice[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
-
- allCuboidsBitSet.add(indice);
- Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
- for (Long childId : children) {
- addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
- }
- }
-
- @Override
- public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- String[] row = flatTableInputFormat.parseMapperInput(record);
-
- for (int i = 0; i < factDictCols.size(); i++) {
- String fieldValue = row[dictionaryColumnIndex[i]];
- if (fieldValue == null)
- continue;
-
- int reducerIndex;
- if (uhcIndex[i] == 0) {
- //for the normal dictionary column
- reducerIndex = columnIndexToReducerBeginId.get(i);
- } else {
- //for the uhc
- reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
- }
-
- tmpbuf.clear();
- tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
- tmpbuf.put(Bytes.toBytes(fieldValue));
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- sortableKey.setText(outputKey);
- //judge type
- sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
- context.write(sortableKey, EMPTY_TEXT);
-
- // log a few rows for troubleshooting
- if (rowCount < 10) {
- logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
- }
- }
-
- if (collectStatistics) {
- if (rowCount % 100 < samplingPercentage) {
- putRowKeyToHLL(row);
- }
-
- if (needFetchPartitionCol == true) {
- String fieldValue = row[partitionColumnIndex];
- if (fieldValue != null) {
- tmpbuf.clear();
- tmpbuf.put(MARK_FOR_PARTITION_COL);
- tmpbuf.put(Bytes.toBytes(fieldValue));
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- sortableKey.setText(outputKey);
- sortableKey.setTypeId((byte) 0);
- context.write(sortableKey, EMPTY_TEXT);
- }
- }
- }
- rowCount++;
- }
-
- private void putRowKeyToHLL(String[] row) {
-
- //generate hash for each row key column
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hf.newHasher();
- String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue != null) {
- row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
- }
-
- // user the row key column hash to get a consolidated hash for each cuboid
- for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
- for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
- }
-
- allCuboidsHLL[i].add(hc.hash().asBytes());
- }
- }
-
- @Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- if (collectStatistics) {
- ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
- // output each cuboid's hll to reducer, key is 0 - cuboidId
- HLLCounter hll;
- for (int i = 0; i < cuboidIds.length; i++) {
- hll = allCuboidsHLL[i];
-
- tmpbuf.clear();
- tmpbuf.put(MARK_FOR_HLL); // one byte
- tmpbuf.putLong(cuboidIds[i]);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- hllBuf.clear();
- hll.writeRegisters(hllBuf);
- outputValue.set(hllBuf.array(), 0, hllBuf.position());
- sortableKey.setText(outputKey);
- sortableKey.setTypeId((byte) 0);
- context.write(sortableKey, outputValue);
- }
- }
- }
-}