You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/13 10:45:24 UTC
[1/2] incubator-carbondata git commit: result_size in query
statistics is not giving valid row count if vector reader is enabled.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f4fc65199 -> f7d7e41ef
result_size in query statistics is not giving valid row count if vector reader is enabled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ec2d742f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ec2d742f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ec2d742f
Branch: refs/heads/master
Commit: ec2d742f2e479e40883b92df014a8b260d50e526
Parents: f4fc651
Author: nareshpr <pr...@gmail.com>
Authored: Wed Apr 12 19:53:21 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:14:09 2017 +0530
----------------------------------------------------------------------
.../carbondata/hadoop/AbstractRecordReader.java | 45 ++++++++++++++++++++
.../carbondata/hadoop/CarbonRecordReader.java | 5 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 13 +-----
.../VectorizedCarbonRecordReader.java | 11 +++--
4 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
new file mode 100644
index 0000000..e571ccf
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * This class will have all the common methods for vector and row based reader
+ */
+public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> {
+
+ protected int rowCount = 0;
+
+ /**
+ * This method will log query result count and querytime
+ * @param recordCount
+ * @param recorder
+ */
+ public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
+ // result size
+ QueryStatistic queryStatistic = new QueryStatistic();
+ queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount);
+ recorder.recordStatistics(queryStatistic);
+ // print executor query statistics for each task_id
+ recorder.logStatisticsAsTableExecutor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 27c8b2f..26b269a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,13 +33,12 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Reads the data from Carbon store.
*/
-public class CarbonRecordReader<T> extends RecordReader<Void, T> {
+public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
private QueryModel queryModel;
@@ -92,6 +91,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
}
@Override public T getCurrentValue() throws IOException, InterruptedException {
+ rowCount += 1;
return readSupport.readRow(carbonIterator.next());
}
@@ -101,6 +101,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
}
@Override public void close() throws IOException {
+ logStatistics(rowCount, queryModel.getStatisticsRecorder());
// clear dictionary cache
Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
if (null != columnToDictionaryMapping) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index ab0d603..4807b90 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -207,10 +207,9 @@ class CarbonScanRDD(
new Iterator[Any] {
private var havePair = false
private var finished = false
- private var count = 0
context.addTaskCompletionListener { context =>
- logStatistics(queryStartTime, count, model.getStatisticsRecorder)
+ logStatistics(queryStartTime, model.getStatisticsRecorder)
reader.close()
}
@@ -231,7 +230,6 @@ class CarbonScanRDD(
}
havePair = false
val value = reader.getCurrentValue
- count += 1
value
}
}
@@ -265,18 +263,11 @@ class CarbonScanRDD(
format
}
- def logStatistics(queryStartTime: Long, recordCount: Int,
- recorder: QueryStatisticsRecorder): Unit = {
+ def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): Unit = {
var queryStatistic = new QueryStatistic()
queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
System.currentTimeMillis - queryStartTime)
recorder.recordStatistics(queryStatistic)
- // result size
- queryStatistic = new QueryStatistic()
- queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
- recorder.recordStatistics(queryStatistic)
- // print executor query statistics for each task_id
- recorder.logStatisticsAsTableExecutor()
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ffff956..3fdf9af 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -38,12 +38,12 @@ import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResult
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.spark.util.CarbonScalaUtil;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
* carbondata column APIs and fills the data directly into columns.
*/
-class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
private int batchIdx = 0;
@@ -116,6 +116,7 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
}
@Override public void close() throws IOException {
+ logStatistics(rowCount, queryModel.getStatisticsRecorder());
if (columnarBatch != null) {
columnarBatch.close();
columnarBatch = null;
@@ -147,7 +148,11 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
}
@Override public Object getCurrentValue() throws IOException, InterruptedException {
- if (returnColumnarBatch) return columnarBatch;
+ if (returnColumnarBatch) {
+ rowCount += columnarBatch.numValidRows();
+ return columnarBatch;
+ }
+ rowCount += 1;
return columnarBatch.getRow(batchIdx - 1);
}
[2/2] incubator-carbondata git commit: [CARBONDATA-919] result_size in
query statistics is not giving valid row count if vector reader is enabled.
This closes #790
Posted by ra...@apache.org.
[CARBONDATA-919] result_size in query statistics is not giving valid row count if vector reader is enabled. This closes #790
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f7d7e41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f7d7e41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f7d7e41e
Branch: refs/heads/master
Commit: f7d7e41efd987d341b5a40085ec550d3ee265e23
Parents: f4fc651 ec2d742
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 13 16:15:09 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:15:09 2017 +0530
----------------------------------------------------------------------
.../carbondata/hadoop/AbstractRecordReader.java | 45 ++++++++++++++++++++
.../carbondata/hadoop/CarbonRecordReader.java | 5 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 13 +-----
.../VectorizedCarbonRecordReader.java | 11 +++--
4 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------