You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/10 08:56:34 UTC

kylin git commit: KYLIN-2442 calculate raw data size using custom counter

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2442 [created] 77be3eb04


KYLIN-2442 calculate raw data size using custom counter


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/77be3eb0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/77be3eb0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/77be3eb0

Branch: refs/heads/KYLIN-2442
Commit: 77be3eb0437daac90a65391570a1ca9ed2e7dd72
Parents: ecf6a69
Author: Li Yang <li...@apache.org>
Authored: Fri Feb 10 16:56:21 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Feb 10 16:56:21 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  1 -
 .../kylin/engine/mr/common/HadoopCmdOutput.java |  9 +++---
 .../engine/mr/common/MapReduceExecutable.java   |  8 ++---
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 32 ++++++++++++++++++++
 4 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 36c12a1..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 9d016cc..73ee7ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.engine.mr.steps.FactDistinctHiveColumnsMapper.RawDataCounter;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HadoopCmdOutput {
 
     private String mapInputRecords;
     private String hdfsBytesWritten;
-    private String hdfsBytesRead;
+    private String rawInputBytesRead;
 
     public String getMapInputRecords() {
         return mapInputRecords;
@@ -77,8 +78,8 @@ public class HadoopCmdOutput {
         return hdfsBytesWritten;
     }
 
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
+    public String getRawInputBytesRead() {
+        return rawInputBytesRead;
     }
 
     public void updateJobCounter() {
@@ -95,7 +96,7 @@ public class HadoopCmdOutput {
 
             mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
             hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+            rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             output.append(e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 6de07ca..2e7a289 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable {
     private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
         hadoopCmdOutput.updateJobCounter();
         info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
         info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
 
         String saveAs = getParam(KEY_COUNTER_SAVEAS);
         if (saveAs != null) {
             String[] saveAsNames = saveAs.split(",");
             saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+            saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info);
             saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
         }
     }
@@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable {
         setParam(KEY_PARAMS, param);
     }
 
-    public String getCounterSaveAs() {
-        return getParam(KEY_COUNTER_SAVEAS);
-    }
-
     public void setCounterSaveAs(String value) {
         setParam(KEY_COUNTER_SAVEAS, value);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index ed65343..b6efd79 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -44,6 +44,8 @@ import com.google.common.hash.Hashing;
 public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
 
     private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
+    
+    public static enum RawDataCounter { BYTES };
 
     protected boolean collectStatistics = false;
     protected CuboidScheduler cuboidScheduler = null;
@@ -130,6 +132,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         String[] row = flatTableInputFormat.parseMapperInput(record);
+        
+        context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
 
         for (int i = 0; i < factDictCols.size(); i++) {
             String fieldValue = row[dictionaryColumnIndex[i]];
@@ -181,6 +185,34 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
         rowCount++;
     }
 
+    private long countSizeInBytes(String[] row) {
+        int size = 0;
+        for (String s : row) {
+            size += s == null ? 1 : utf8Length(s);
+            size++; // delimiter
+        }
+        return size;
+    }
+    
+    // calculating length in UTF-8 of Java String without actually encoding it
+    public static int utf8Length(CharSequence sequence) {
+        int count = 0;
+        for (int i = 0, len = sequence.length(); i < len; i++) {
+            char ch = sequence.charAt(i);
+            if (ch <= 0x7F) {
+                count++;
+            } else if (ch <= 0x7FF) {
+                count += 2;
+            } else if (Character.isHighSurrogate(ch)) {
+                count += 4;
+                ++i;
+            } else {
+                count += 3;
+            }
+        }
+        return count;
+    }
+
     private void putRowKeyToHLL(String[] row) {
 
         //generate hash for each row key column