You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/12/04 15:25:08 UTC

carbondata git commit: [CARBONDATA-1847] Add inputSize for value read

Repository: carbondata
Updated Branches:
  refs/heads/master 5ae596b76 -> 77217b370


[CARBONDATA-1847] Add inputSize for value read

This closes #1607


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/77217b37
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/77217b37
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/77217b37

Branch: refs/heads/master
Commit: 77217b370b6beba7408039fec465a36e0b824028
Parents: 5ae596b
Author: Jacky Li <ja...@qq.com>
Authored: Mon Dec 4 18:49:03 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Dec 4 23:23:08 2017 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/hadoop/InputMetricsStats.java     | 4 ++++
 .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala     | 1 +
 .../spark/vectorreader/VectorizedCarbonRecordReader.java    | 4 +++-
 .../main/scala/org/apache/spark/CarbonInputMetrics.scala    | 9 +++++++++
 .../apache/spark/sql/CarbonDatasourceHadoopRelation.scala   | 3 ++-
 5 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
index e678100..cc39b34 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
@@ -35,4 +35,8 @@ public interface InputMetricsStats extends Serializable {
    */
   void updateAndClose();
 
+  /**
+   * update metric by `value`, it can be ColumnarBatch or InternalRow
+   */
+  void updateByValue(Object value);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7316574..67d75bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -307,6 +307,7 @@ class CarbonScanRDD(
           }
           havePair = false
           val value = reader.getCurrentValue
+          inputMetricsStats.updateByValue(value)
           value
         }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 3acedab..eba0787 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -165,7 +165,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     if (returnColumnarBatch) {
       int value = columnarBatch.numValidRows();
       rowCount += value;
-      inputMetricsStats.incrementRecordRead((long)value);
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
       return columnarBatch;
     }
     rowCount += 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index b562ebc..deef157 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,6 +20,7 @@ import java.lang.Long
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.TaskMetricsMap
@@ -75,4 +76,12 @@ class CarbonInputMetrics extends InitInputMetrics{
       }
     }
   }
+
+  override def updateByValue(value: Object): Unit = {
+    value match {
+      case batch: ColumnarBatch =>
+        inputMetrics.incRecordsRead(batch.numRows())
+      case _ =>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 57233cf..e5de052 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -79,7 +79,8 @@ case class CarbonDatasourceHadoopRelation(
       filterExpression.orNull,
       identifier,
       carbonTable.getTableInfo.serialize(),
-      carbonTable.getTableInfo, inputMetricsStats)
+      carbonTable.getTableInfo,
+      inputMetricsStats)
   }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)