Posted to issues@carbondata.apache.org by "anubhav tarar (JIRA)" <ji...@apache.org> on 2018/01/08 12:32:00 UTC

[jira] [Updated] (CARBONDATA-1991) Select query from a streaming table throws ClassCastException

     [ https://issues.apache.org/jira/browse/CARBONDATA-1991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

anubhav tarar updated CARBONDATA-1991:
--------------------------------------
    Description: 
Start the spark-shell using:
./bin/spark-shell --jars carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

Execute the following code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

 carbon.sql("drop table if exists uniqdata_stream")

carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
import carbon.sqlContext.implicits._

val lines = carbon.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

In a separate terminal, start a server socket using:
nc -lk 9999
and paste the contents of 2000_UniqData.csv into that terminal (the socket source reads one row per line).

Now execute the following in the spark-shell:
val qry = lines.writeStream
  .format("carbondata")
  .option("checkpointLocation", "/newtmp1")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream")
  .start()

qry.awaitTermination()
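
Note: qry.awaitTermination() blocks the shell until the query terminates. If you only need the pasted rows to be processed for this repro, the timeout overload of the standard StreamingQuery API may be more convenient (the 60-second value is an arbitrary choice):

// Wait up to 60s for the stream to consume the socket input, then stop it.
qry.awaitTermination(60000)
qry.stop()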

Once the data has been written, stop the socket and the spark-shell.

Restart the spark-shell (recreating the CarbonSession as above) and run the following query:
carbon.sql("select * from uniqdata_stream").show

The query fails with the following exception:
18/01/05 18:25:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal
	at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.putRowToColumnBatch(CarbonStreamRecordReader.java:702)
	at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:424)
	at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:324)
	at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:305)
	at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:370)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
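
From the trace, putRowToColumnBatch appears to receive the decimal columns (DECIMAL_COLUMN1/DECIMAL_COLUMN2) as java.math.BigDecimal and to cast them directly to Spark's internal org.apache.spark.sql.types.Decimal, which is an unrelated type. A minimal sketch of the kind of conversion that would avoid the cast (a hypothetical helper, not the actual CarbonData code):

import org.apache.spark.sql.types.Decimal

// Wrap the Java BigDecimal in Spark's Decimal instead of casting;
// Decimal.apply(java.math.BigDecimal) performs the conversion.
def toSparkDecimal(value: Any): Decimal = value match {
  case d: java.math.BigDecimal => Decimal(d)
  case d: Decimal              => d
  case other                   => throw new IllegalArgumentException(s"not a decimal value: $other")
}

If the failure really is limited to the decimal columns, a projection that prunes them, e.g. carbon.sql("select CUST_ID, CUST_NAME from uniqdata_stream").show, may succeed and would help confirm the diagnosis.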



  was: The previous description was identical, except that the drop statement read "drop table if exists uniqdata_part".

> Select query from a streaming table throws ClassCastException
> -------------------------------------------------------------
>
>                 Key: CARBONDATA-1991
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1991
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-query
>    Affects Versions: 1.3.0
>         Environment: spark2.1
>            Reporter: Geetika Gupta
>            Assignee: anubhav tarar
>             Fix For: 1.3.0
>
>         Attachments: 2000_UniqData.csv
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)