You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:12 UTC

[14/47] incubator-carbondata git commit: [Bug] reset statistics information after dataloading #842

[Bug] reset statistics information after dataloading #842

[Bug] reset statistics information after dataloading #842

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1b16f765
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1b16f765
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1b16f765

Branch: refs/heads/master
Commit: 1b16f765c37202284e2b404c5bc45a06c15c23a7
Parents: 7159dce
Author: Zhangshunyu <zh...@huawei.com>
Authored: Mon Jul 25 11:21:15 2016 +0800
Committer: david <qi...@qq.com>
Committed: Mon Jul 25 11:21:15 2016 +0800

----------------------------------------------------------------------
 .../core/util/CarbonLoadStatisticsDummy.java    | 12 +---
 .../core/util/CarbonLoadStatisticsImpl.java     | 65 ++++++++------------
 .../carbondata/core/util/LoadStatistics.java    |  6 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  5 +-
 4 files changed, 30 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b16f765/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
index 1561efa..bb82fcd 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
@@ -35,7 +35,7 @@ public class CarbonLoadStatisticsDummy implements LoadStatistics {
   }
 
   @Override
-  public void recordGlobalDicGenTotalTime(Long glblDicTimePoint) {
+  public void recordDicShuffleAndWriteTime() {
 
   }
 
@@ -45,16 +45,6 @@ public class CarbonLoadStatisticsDummy implements LoadStatistics {
   }
 
   @Override
-  public void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart) {
-
-  }
-
-  @Override
-  public void recordDicWriteFileMaxTime(Long dicWriteFileTimePart) {
-
-  }
-
-  @Override
   public void recordDictionaryValuesTotalTime(String partitionID,
       Long dictionaryValuesTotalTimeTimePoint) {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b16f765/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
index e55179e..3a56db2 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
@@ -50,13 +50,6 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
   private long dicShuffleAndWriteFileTotalStartTime = 0;
   private long dicShuffleAndWriteFileTotalCostTime = 0;
 
-  //Due to thread thread blocking in each task, we only record the max
-  //csvlDicShuffle Time of each single thread
-  private long csvlDicShuffleCostTime = 0;
-  //Due to thread thread blocking in each task, we only record the max
-  //dicWriteFile Time of each single thread
-  private long dicWriteFileCostTime = 0;
-
   //LRU cache load one time
   private double lruCacheLoadTime = 0;
 
@@ -90,7 +83,7 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
   private double totalTime = 0;
 
   @Override
-  public void  initPartitonInfo(String PartitionId) {
+  public void initPartitonInfo(String PartitionId) {
     parDictionaryValuesTotalTimeMap.put(PartitionId, new Long[2]);
     parCsvInputStepTimeMap.put(PartitionId, new Long[2]);
     parSortRowsStepTotalTimeMap.put(PartitionId, new Long[2]);
@@ -100,13 +93,15 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
   }
 
   //Record the time
-  public void recordGlobalDicGenTotalTime(Long glblDicTimePoint) {
+  public void recordDicShuffleAndWriteTime() {
+    Long dicShuffleAndWriteTimePoint = System.currentTimeMillis();
     if (0 == dicShuffleAndWriteFileTotalStartTime) {
-      dicShuffleAndWriteFileTotalStartTime = glblDicTimePoint;
+      dicShuffleAndWriteFileTotalStartTime = dicShuffleAndWriteTimePoint;
     }
-    if (glblDicTimePoint - dicShuffleAndWriteFileTotalStartTime >
+    if (dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime >
             dicShuffleAndWriteFileTotalCostTime) {
-      dicShuffleAndWriteFileTotalCostTime = glblDicTimePoint - dicShuffleAndWriteFileTotalStartTime;
+      dicShuffleAndWriteFileTotalCostTime =
+          dicShuffleAndWriteTimePoint - dicShuffleAndWriteFileTotalStartTime;
     }
   }
 
@@ -120,19 +115,6 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
     }
   }
 
-  public void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart) {
-    if (csvlDicShuffleTimePart > csvlDicShuffleCostTime) {
-      csvlDicShuffleCostTime = csvlDicShuffleTimePart;
-    }
-  }
-
-  public void recordDicWriteFileMaxTime(Long dicWriteFileTimePart) {
-    if (dicWriteFileTimePart > dicWriteFileCostTime) {
-      dicWriteFileCostTime = dicWriteFileTimePart;
-    }
-  }
-
-
   public double getLruCacheLoadTime() {
     return lruCacheLoadTime;
   }
@@ -260,14 +242,6 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
     return loadCsvfilesToDfCostTime / 1000.0;
   }
 
-  private double getCsvlDicShuffleMaxTime() {
-    return csvlDicShuffleCostTime / 1000.0;
-  }
-
-  private double getDicWriteFileMaxTime() {
-    return dicWriteFileCostTime / 1000.0;
-  }
-
   private double getDictionaryValuesTotalTime(String partitionID) {
     return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
   }
@@ -342,12 +316,6 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
     double dicShuffleAndWriteFileTotalTime = getDicShuffleAndWriteFileTotalTime();
     LOGGER.audit("STAGE 2 ->Global dict shuffle and write dict file: " +
             + dicShuffleAndWriteFileTotalTime + "(s)");
-    double csvShuffleMaxTime = getCsvlDicShuffleMaxTime();
-    LOGGER.audit("STAGE 2.1 ->  |_maximum distinct column shuffle time: "
-            + csvShuffleMaxTime + "(s)");
-    double dicWriteFileMaxTime = getDicWriteFileMaxTime();
-    LOGGER.audit("STAGE 2.2 ->  |_maximum distinct column write dict file time: "
-            + dicWriteFileMaxTime + "(s)");
   }
 
   private void printLruCacheLoadTimeInfo() {
@@ -420,7 +388,26 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
       printLoadSpeedInfo(partitionID);
     } catch (Exception e) {
       LOGGER.audit("Can't print Statistics Information");
+    } finally {
+      resetLoadStatistics();
     }
   }
 
+  //Reset the load statistics values
+  private void resetLoadStatistics() {
+    loadCsvfilesToDfStartTime = 0;
+    loadCsvfilesToDfCostTime = 0;
+    dicShuffleAndWriteFileTotalStartTime = 0;
+    dicShuffleAndWriteFileTotalCostTime = 0;
+    lruCacheLoadTime = 0;
+    totalRecords = 0;
+    totalTime = 0;
+    parDictionaryValuesTotalTimeMap.clear();
+    parCsvInputStepTimeMap.clear();
+    parSortRowsStepTotalTimeMap.clear();
+    parGeneratingDictionaryValuesTimeMap.clear();
+    parMdkGenerateTotalTimeMap.clear();
+    parDictionaryValue2MdkAdd2FileTime.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b16f765/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/LoadStatistics.java b/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
index c353f0c..e5f24e6 100644
--- a/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
+++ b/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
@@ -24,14 +24,10 @@ public interface LoadStatistics {
   void  initPartitonInfo(String PartitionId);
 
   //Record the time
-  void recordGlobalDicGenTotalTime(Long glblDicTimePoint);
+  void recordDicShuffleAndWriteTime();
 
   void recordLoadCsvfilesToDfTime();
 
-  void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart);
-
-  void recordDicWriteFileMaxTime(Long dicWriteFileTimePart);
-
   void recordDictionaryValuesTotalTime(String partitionID,
       Long dictionaryValuesTotalTimeTimePoint);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1b16f765/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index e519030..7067fd3 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -308,9 +308,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         val valuesBuffer = new mutable.HashSet[String]
         val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
         var rowCount = 0L
-        val dicShuffleStartTime = System.currentTimeMillis()
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGlobalDicGenTotalTime(
-          dicShuffleStartTime)
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
         breakable {
           while (rddIter.hasNext) {
             val distinctValueList = rddIter.next()._2
@@ -372,6 +370,7 @@ class CarbonGlobalDictionaryGenerateRDD(
             sortIndexWriteTask.execute()
           }
           val sortIndexWriteTime = (System.currentTimeMillis() - t4)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
           // After sortIndex writing, update dictionaryMeta
           dictWriteTask.updateMetaData()
           // clear the value buffer after writing dictionary data