You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/14 14:05:15 UTC
carbondata git commit: [CARBONDATA-2925]Wrong data displayed for
spark file format if carbon has more blocklets
Repository: carbondata
Updated Branches:
refs/heads/master c40d85478 -> 2b7c8b374
[CARBONDATA-2925]Wrong data displayed for spark file format if carbon has more blocklets
Issue: If a Carbon file has multiple blocklets, a select query displays wrong data.
Root Cause: Only the records of the 1st blocklet are shown; the other blocklets in that block are skipped. This is because the default blocklet ID is 0 and CarbonFileFormat creates the BlockletInfo with the default configuration (i.e., an unchanged blockletID).
Solution: Set the default blockletID to -1 so that all blocklets in the block are considered.
refer org.apache.carbondata.core.scan.executor.impl.AbstractQueryExecutor#readAndFillBlockletInfo
This closes #2703
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2b7c8b37
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2b7c8b37
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2b7c8b37
Branch: refs/heads/master
Commit: 2b7c8b3743de1cccf3a2943ceb345684b7fb8d69
Parents: c40d854
Author: BJangir <ba...@gmail.com>
Authored: Mon Sep 10 19:35:33 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Sep 14 19:35:02 2018 +0530
----------------------------------------------------------------------
.../core/indexstore/BlockletDetailInfo.java | 3 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 60 +++++++++++++++++++-
2 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b7c8b37/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 47455c7..973a240 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -51,7 +51,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
private short versionNumber;
- private short blockletId;
+ // default blockletId should be -1,which means consider all the blocklets in block
+ private short blockletId = -1;
private int[] dimLens;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b7c8b37/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 43f04b8..6a803fc 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -18,18 +18,24 @@
package org.apache.spark.sql.carbondata.datasource
import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Random}
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.util.SparkUtil
import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll {
@@ -322,6 +328,54 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
assert(new File(filePath).exists())
cleanTestData()
}
+ test("Read data having multi blocklet ") {
+ buildTestDataMuliBlockLet(700000)
+ assert(new File(writerPath).exists())
+ spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ //data source file format
+ spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
+ } else {
+ //data source file format
+ spark.sql(
+ s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
+ |'$writerPath' """.stripMargin)
+ }
+ spark.sql("select count(*) from sdkOutputTable").show(false)
+ val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(700000)))
+ if(result.isDefined){
+ assert(false,result.get)
+ }
+ spark.sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+ def buildTestDataMuliBlockLet(records :Int): Unit ={
+ FileUtils.deleteDirectory(new File(writerPath))
+ val fields=new Array[Field](8)
+ fields(0)=new Field("myid",DataTypes.INT);
+ fields(1)=new Field("event_id",DataTypes.STRING);
+ fields(2)=new Field("eve_time",DataTypes.DATE);
+ fields(3)=new Field("ingestion_time",DataTypes.TIMESTAMP);
+ fields(4)=new Field("alldate",DataTypes.createArrayType(DataTypes.DATE));
+ fields(5)=new Field("subject",DataTypes.STRING);
+ fields(6)=new Field("from_email",DataTypes.STRING);
+ fields(7)=new Field("sal",DataTypes.DOUBLE);
+ import scala.collection.JavaConverters._
+ try{
+ val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava
+ val writer=CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16).sortBy(Array("myid","ingestion_time","event_id")).withLoadOptions(options).buildWriterForCSVInput(new Schema(fields),spark.sessionState.newHadoopConf())
+ val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ val date_F=new SimpleDateFormat("yyyy-MM-dd")
+ for(i<-1 to records){
+ val time=new Date(System.currentTimeMillis())
+ writer.write(Array(""+i,"event_"+i,""+date_F.format(time),""+timeF.format(time),""+date_F.format(time)+"$"+date_F.format(time),"Subject_0","FromEmail",""+new Random().nextDouble()))
+ }
+ writer.close()
+ }
+ }
test("Test with long string columns") {
FileUtils.deleteDirectory(new File(writerPath))