You are viewing a plain text version of this content; the canonical (hyperlinked) version is not available in this plain-text view.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/12/04 15:25:08 UTC
carbondata git commit: [CARBONDATA-1847] Add inputSize for value read
Repository: carbondata
Updated Branches:
refs/heads/master 5ae596b76 -> 77217b370
[CARBONDATA-1847] Add inputSize for value read
This closes #1607
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/77217b37
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/77217b37
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/77217b37
Branch: refs/heads/master
Commit: 77217b370b6beba7408039fec465a36e0b824028
Parents: 5ae596b
Author: Jacky Li <ja...@qq.com>
Authored: Mon Dec 4 18:49:03 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Dec 4 23:23:08 2017 +0800
----------------------------------------------------------------------
.../org/apache/carbondata/hadoop/InputMetricsStats.java | 4 ++++
.../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 1 +
.../spark/vectorreader/VectorizedCarbonRecordReader.java | 4 +++-
.../main/scala/org/apache/spark/CarbonInputMetrics.scala | 9 +++++++++
.../apache/spark/sql/CarbonDatasourceHadoopRelation.scala | 3 ++-
5 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
index e678100..cc39b34 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
@@ -35,4 +35,8 @@ public interface InputMetricsStats extends Serializable {
*/
void updateAndClose();
+ /**
+ * update metric by `value`, it can be ColumnarBatch or InternalRow
+ */
+ void updateByValue(Object value);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7316574..67d75bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -307,6 +307,7 @@ class CarbonScanRDD(
}
havePair = false
val value = reader.getCurrentValue
+ inputMetricsStats.updateByValue(value)
value
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 3acedab..eba0787 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -165,7 +165,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
if (returnColumnarBatch) {
int value = columnarBatch.numValidRows();
rowCount += value;
- inputMetricsStats.incrementRecordRead((long)value);
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead((long) value);
+ }
return columnarBatch;
}
rowCount += 1;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index b562ebc..deef157 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -20,6 +20,7 @@ import java.lang.Long
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.TaskMetricsMap
@@ -75,4 +76,12 @@ class CarbonInputMetrics extends InitInputMetrics{
}
}
}
+
+ override def updateByValue(value: Object): Unit = {
+ value match {
+ case batch: ColumnarBatch =>
+ inputMetrics.incRecordsRead(batch.numRows())
+ case _ =>
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 57233cf..e5de052 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -79,7 +79,8 @@ case class CarbonDatasourceHadoopRelation(
filterExpression.orNull,
identifier,
carbonTable.getTableInfo.serialize(),
- carbonTable.getTableInfo, inputMetricsStats)
+ carbonTable.getTableInfo,
+ inputMetricsStats)
}
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)