You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/13 10:45:24 UTC

[1/2] incubator-carbondata git commit: result_size in query statistics is not giving valid row count if vector reader is enabled.

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f4fc65199 -> f7d7e41ef


result_size in query statistics is not giving valid row count if vector reader is enabled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ec2d742f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ec2d742f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ec2d742f

Branch: refs/heads/master
Commit: ec2d742f2e479e40883b92df014a8b260d50e526
Parents: f4fc651
Author: nareshpr <pr...@gmail.com>
Authored: Wed Apr 12 19:53:21 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:14:09 2017 +0530

----------------------------------------------------------------------
 .../carbondata/hadoop/AbstractRecordReader.java | 45 ++++++++++++++++++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  5 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 13 +-----
 .../VectorizedCarbonRecordReader.java           | 11 +++--
 4 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
new file mode 100644
index 0000000..e571ccf
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * This class will have all the common methods for vector and row based reader
+ */
+public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> {
+
+  protected int rowCount = 0;
+
+  /**
+   * This method will log query result count and querytime
+   * @param recordCount
+   * @param recorder
+   */
+  public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
+    // result size
+    QueryStatistic queryStatistic = new QueryStatistic();
+    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount);
+    recorder.recordStatistics(queryStatistic);
+    // print executor query statistics for each task_id
+    recorder.logStatisticsAsTableExecutor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 27c8b2f..26b269a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,13 +33,12 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * Reads the data from Carbon store.
  */
-public class CarbonRecordReader<T> extends RecordReader<Void, T> {
+public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
 
   private QueryModel queryModel;
 
@@ -92,6 +91,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
   }
 
   @Override public T getCurrentValue() throws IOException, InterruptedException {
+    rowCount += 1;
     return readSupport.readRow(carbonIterator.next());
   }
 
@@ -101,6 +101,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
   }
 
   @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
     // clear dictionary cache
     Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
     if (null != columnToDictionaryMapping) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index ab0d603..4807b90 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -207,10 +207,9 @@ class CarbonScanRDD(
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
-        private var count = 0
 
         context.addTaskCompletionListener { context =>
-          logStatistics(queryStartTime, count, model.getStatisticsRecorder)
+          logStatistics(queryStartTime, model.getStatisticsRecorder)
           reader.close()
         }
 
@@ -231,7 +230,6 @@ class CarbonScanRDD(
           }
           havePair = false
           val value = reader.getCurrentValue
-          count += 1
           value
         }
       }
@@ -265,18 +263,11 @@ class CarbonScanRDD(
     format
   }
 
-  def logStatistics(queryStartTime: Long, recordCount: Int,
-      recorder: QueryStatisticsRecorder): Unit = {
+  def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): Unit = {
     var queryStatistic = new QueryStatistic()
     queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
       System.currentTimeMillis - queryStartTime)
     recorder.recordStatistics(queryStatistic)
-    // result size
-    queryStatistic = new QueryStatistic()
-    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-    recorder.recordStatistics(queryStatistic)
-    // print executor query statistics for each task_id
-    recorder.logStatisticsAsTableExecutor()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ffff956..3fdf9af 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -38,12 +38,12 @@ import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResult
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
  * carbondata column APIs and fills the data directly into columns.
  */
-class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private int batchIdx = 0;
 
@@ -116,6 +116,7 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   }
 
   @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
     if (columnarBatch != null) {
       columnarBatch.close();
       columnarBatch = null;
@@ -147,7 +148,11 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   }
 
   @Override public Object getCurrentValue() throws IOException, InterruptedException {
-    if (returnColumnarBatch) return columnarBatch;
+    if (returnColumnarBatch) {
+      rowCount += columnarBatch.numValidRows();
+      return columnarBatch;
+    }
+    rowCount += 1;
     return columnarBatch.getRow(batchIdx - 1);
   }
 


[2/2] incubator-carbondata git commit: [CARBONDATA-919]result_size in query statistics is not giving valid row count if vector reader is enabled. This closes #790

Posted by ra...@apache.org.
[CARBONDATA-919]result_size in query statistics is not giving valid row count if vector reader is enabled. This closes #790


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f7d7e41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f7d7e41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f7d7e41e

Branch: refs/heads/master
Commit: f7d7e41efd987d341b5a40085ec550d3ee265e23
Parents: f4fc651 ec2d742
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 13 16:15:09 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:15:09 2017 +0530

----------------------------------------------------------------------
 .../carbondata/hadoop/AbstractRecordReader.java | 45 ++++++++++++++++++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  5 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 13 +-----
 .../VectorizedCarbonRecordReader.java           | 11 +++--
 4 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------