You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/10 08:56:34 UTC
kylin git commit: KYLIN-2442 calculate raw data size using custom
counter
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2442 [created] 77be3eb04
KYLIN-2442 calculate raw data size using custom counter
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/77be3eb0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/77be3eb0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/77be3eb0
Branch: refs/heads/KYLIN-2442
Commit: 77be3eb0437daac90a65391570a1ca9ed2e7dd72
Parents: ecf6a69
Author: Li Yang <li...@apache.org>
Authored: Fri Feb 10 16:56:21 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Feb 10 16:56:21 2017 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/BatchCubingJobBuilder.java | 1 -
.../kylin/engine/mr/common/HadoopCmdOutput.java | 9 +++---
.../engine/mr/common/MapReduceExecutable.java | 8 ++---
.../mr/steps/FactDistinctHiveColumnsMapper.java | 32 ++++++++++++++++++++
4 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 36c12a1..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 9d016cc..73ee7ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.engine.mr.steps.FactDistinctHiveColumnsMapper.RawDataCounter;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HadoopCmdOutput {
private String mapInputRecords;
private String hdfsBytesWritten;
- private String hdfsBytesRead;
+ private String rawInputBytesRead;
public String getMapInputRecords() {
return mapInputRecords;
@@ -77,8 +78,8 @@ public class HadoopCmdOutput {
return hdfsBytesWritten;
}
- public String getHdfsBytesRead() {
- return hdfsBytesRead;
+ public String getRawInputBytesRead() {
+ return rawInputBytesRead;
}
public void updateJobCounter() {
@@ -95,7 +96,7 @@ public class HadoopCmdOutput {
mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
- hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+ rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
output.append(e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 6de07ca..2e7a289 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable {
private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
hadoopCmdOutput.updateJobCounter();
info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
String saveAs = getParam(KEY_COUNTER_SAVEAS);
if (saveAs != null) {
String[] saveAsNames = saveAs.split(",");
saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+ saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info);
saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
}
}
@@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable {
setParam(KEY_PARAMS, param);
}
- public String getCounterSaveAs() {
- return getParam(KEY_COUNTER_SAVEAS);
- }
-
public void setCounterSaveAs(String value) {
setParam(KEY_COUNTER_SAVEAS, value);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index ed65343..b6efd79 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -44,6 +44,8 @@ import com.google.common.hash.Hashing;
public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
+
+ public static enum RawDataCounter { BYTES };
protected boolean collectStatistics = false;
protected CuboidScheduler cuboidScheduler = null;
@@ -130,6 +132,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
@Override
public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
String[] row = flatTableInputFormat.parseMapperInput(record);
+
+ context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
for (int i = 0; i < factDictCols.size(); i++) {
String fieldValue = row[dictionaryColumnIndex[i]];
@@ -181,6 +185,34 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
rowCount++;
}
+ private long countSizeInBytes(String[] row) { // sums the UTF-8 length of every field in the row plus one byte per field for a delimiter
+ int size = 0; // NOTE(review): accumulated as int; only widened to long at the return
+ for (String s : row) {
+ size += s == null ? 1 : utf8Length(s); // a null field is counted as 1 byte
+ size++; // delimiter
+ }
+ return size;
+ }
+
+ // Computes the length in UTF-8 bytes of a Java char sequence without actually encoding it.
+ public static int utf8Length(CharSequence sequence) {
+ int count = 0;
+ for (int i = 0, len = sequence.length(); i < len; i++) {
+ char ch = sequence.charAt(i);
+ if (ch <= 0x7F) { // U+0000..U+007F: 1 byte
+ count++;
+ } else if (ch <= 0x7FF) { // U+0080..U+07FF: 2 bytes
+ count += 2;
+ } else if (Character.isHighSurrogate(ch)) { // first half of a surrogate pair: 4 bytes for the whole code point
+ count += 4;
+ ++i; // skip the following char, taken to be the paired low surrogate (not checked here)
+ } else { // every other char value: 3 bytes
+ count += 3;
+ }
+ }
+ return count;
+ }
+
private void putRowKeyToHLL(String[] row) {
//generate hash for each row key column