You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by "Geetika Gupta (JIRA)" <ji...@apache.org> on 2018/02/26 05:10:00 UTC

[jira] [Updated] (CARBONDATA-2198) Streaming data to a table with bad_records_action as IGNORE throws ClassCastException

     [ https://issues.apache.org/jira/browse/CARBONDATA-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Geetika Gupta updated CARBONDATA-2198:
--------------------------------------
    Description: 
Steps to reproduce:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.examples

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

// scalastyle:off println
/**
 * Streams rows from a local socket into a CarbonData streaming table using Spark
 * Structured Streaming, with the sink option `bad_records_action` set to `ignore`.
 *
 * `main` creates the table, starts a socket producer thread and a streaming-ingest
 * thread, waits for input on stdin, then stops both threads and prints the table.
 */
object CarbonStructuredStreamingExample {

  /** Port used by both the local socket server and the Spark socket source. */
  private val StreamPort: Int = 7071

  /**
   * Entry point: sets up the Carbon session, (re)creates the streaming table, runs
   * the ingest until a byte is read from stdin, and finally shows the table content.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
                            + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = "stream_table"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonStructuredStreamingExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    val requireCreateTable = true
    val useComplexDataType = false

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
      // Create target carbon table and populate with initial data
      if (useComplexDataType) {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT,
             | file struct<school:array<string>, age:int>
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
             | """.stripMargin)
      } else {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name')
             | """.stripMargin)
      }

      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

      // streaming ingest
      val serverSocket = new ServerSocket(StreamPort)
      try {
        val thread1 = startStreaming(spark, tablePath, streamTableName)
        val thread2 = writeSocket(serverSocket)

        System.out.println("type enter to interrupt streaming")
        System.in.read()
        thread1.interrupt()
        thread2.interrupt()
      } finally {
        // release the listening port even if waiting on stdin or starting a thread fails
        serverSocket.close()
      }
    }

    spark.sql(s"select * from $streamTableName").show()
    spark.stop()
    System.out.println("streaming finished")
  }

  /**
   * Starts a thread that prints `select count(*)` for the table every 3 seconds,
   * 1001 times (the range `0 to 1000` is inclusive). Not called by `main`.
   *
   * @param spark     session used to run the count query
   * @param tableName table whose row count is printed
   * @return the already-started thread
   */
  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 1000) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }

  /**
   * Starts a thread that reads lines from the local socket source and writes them to
   * the carbondata streaming sink with `bad_records_action` = `ignore`, then blocks
   * on the query until it terminates or the thread is interrupted.
   *
   * @param spark     session used to build the streaming query
   * @param tablePath path helper that supplies the streaming checkpoint directory
   * @param tableName target table in the `default` database; defaults to the name
   *                  previously hard-coded here, so existing two-argument calls still work
   * @return the already-started thread
   */
  def startStreaming(
      spark: SparkSession,
      tablePath: CarbonTablePath,
      tableName: String = "stream_table"): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // Stays None until start() succeeds, so cleanup never dereferences a null query.
        var qry: Option[StreamingQuery] = None
        try {
          val readSocketDF = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", StreamPort)
            .load()

          // Write data from socket stream to carbondata file
          val started = readSocketDF.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("5 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("bad_records_action", "ignore")
            .option("dbName", "default")
            .option("tableName", tableName)
            .start()
          qry = Some(started)

          started.awaitTermination()
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          qry.foreach(_.stop())
        }
      }
    }
    thread.start()
    thread
  }

  /**
   * Starts a thread that accepts one client connection on `serverSocket` and sends
   * 1000 batches of comma-separated records, flushing and sleeping one second after
   * each batch. Every record starts with the literal text `null` in the first field.
   *
   * @param serverSocket listening socket the streaming source connects to
   * @return the already-started thread
   */
  def writeSocket(serverSocket: ServerSocket): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for the client's connection request and accept it
        val clientSocket = serverSocket.accept()
        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
        try {
          var index = 0
          for (_ <- 1 to 1000) {
            // write 1001 records per iteration (the range 0 to 1000 is inclusive)
            for (_ <- 0 to 1000) {
              index = index + 1
              socketWriter.println("null" + ",name_" + index
                                   + ",city_" + index + "," + (index * 10000.00).toString +
                                   ",school_" + index + ":school_" + index + index + "$" + index)
            }
            socketWriter.flush()
            Thread.sleep(1000)
          }
        } catch {
          case _: InterruptedException =>
            // main() interrupts this thread to stop producing; fall through to cleanup
            ()
        } finally {
          // close on every exit path so the writer and the accepted socket are not leaked
          socketWriter.close()
          clientSocket.close()
          System.out.println("Socket closed")
        }
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println

In the above example, we stream data to a table with bad_records_action set to IGNORE, and the write fails with a ClassCastException.

 

Here are the logs:

18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR Utils: Aborting task
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 ... 8 more

 *strong text*

  was:
Steps to reproduce:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.examples

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

// scalastyle:off println
/**
 * Streams rows from a local socket into a CarbonData streaming table using Spark
 * Structured Streaming, with the sink option `bad_records_action` set to `ignore`.
 *
 * `main` creates the table, starts a socket producer thread and a streaming-ingest
 * thread, waits for input on stdin, then stops both threads and prints the table.
 */
object CarbonStructuredStreamingExample {

  /** Port used by both the local socket server and the Spark socket source. */
  private val StreamPort: Int = 7071

  /**
   * Entry point: sets up the Carbon session, (re)creates the streaming table, runs
   * the ingest until a byte is read from stdin, and finally shows the table content.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
                            + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = "stream_table"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonStructuredStreamingExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    val requireCreateTable = true
    val useComplexDataType = false

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
      // Create target carbon table and populate with initial data
      if (useComplexDataType) {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT,
             | file struct<school:array<string>, age:int>
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
             | """.stripMargin)
      } else {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name')
             | """.stripMargin)
      }

      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

      // streaming ingest
      val serverSocket = new ServerSocket(StreamPort)
      try {
        val thread1 = startStreaming(spark, tablePath, streamTableName)
        val thread2 = writeSocket(serverSocket)

        System.out.println("type enter to interrupt streaming")
        System.in.read()
        thread1.interrupt()
        thread2.interrupt()
      } finally {
        // release the listening port even if waiting on stdin or starting a thread fails
        serverSocket.close()
      }
    }

    spark.sql(s"select * from $streamTableName").show()
    spark.stop()
    System.out.println("streaming finished")
  }

  /**
   * Starts a thread that prints `select count(*)` for the table every 3 seconds,
   * 1001 times (the range `0 to 1000` is inclusive). Not called by `main`.
   *
   * @param spark     session used to run the count query
   * @param tableName table whose row count is printed
   * @return the already-started thread
   */
  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 1000) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }

  /**
   * Starts a thread that reads lines from the local socket source and writes them to
   * the carbondata streaming sink with `bad_records_action` = `ignore`, then blocks
   * on the query until it terminates or the thread is interrupted.
   *
   * @param spark     session used to build the streaming query
   * @param tablePath path helper that supplies the streaming checkpoint directory
   * @param tableName target table in the `default` database; defaults to the name
   *                  previously hard-coded here, so existing two-argument calls still work
   * @return the already-started thread
   */
  def startStreaming(
      spark: SparkSession,
      tablePath: CarbonTablePath,
      tableName: String = "stream_table"): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // Stays None until start() succeeds, so cleanup never dereferences a null query.
        var qry: Option[StreamingQuery] = None
        try {
          val readSocketDF = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", StreamPort)
            .load()

          // Write data from socket stream to carbondata file
          val started = readSocketDF.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("5 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("bad_records_action", "ignore")
            .option("dbName", "default")
            .option("tableName", tableName)
            .start()
          qry = Some(started)

          started.awaitTermination()
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          qry.foreach(_.stop())
        }
      }
    }
    thread.start()
    thread
  }

  /**
   * Starts a thread that accepts one client connection on `serverSocket` and sends
   * 1000 batches of comma-separated records, flushing and sleeping one second after
   * each batch. Every record starts with the literal text `null` in the first field.
   *
   * @param serverSocket listening socket the streaming source connects to
   * @return the already-started thread
   */
  def writeSocket(serverSocket: ServerSocket): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for the client's connection request and accept it
        val clientSocket = serverSocket.accept()
        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
        try {
          var index = 0
          for (_ <- 1 to 1000) {
            // write 1001 records per iteration (the range 0 to 1000 is inclusive)
            for (_ <- 0 to 1000) {
              index = index + 1
              socketWriter.println("null" + ",name_" + index
                                   + ",city_" + index + "," + (index * 10000.00).toString +
                                   ",school_" + index + ":school_" + index + index + "$" + index)
            }
            socketWriter.flush()
            Thread.sleep(1000)
          }
        } catch {
          case _: InterruptedException =>
            // main() interrupts this thread to stop producing; fall through to cleanup
            ()
        } finally {
          // close on every exit path so the writer and the accepted socket are not leaked
          socketWriter.close()
          clientSocket.close()
          System.out.println("Socket closed")
        }
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println

In the above example, we stream data to a table with bad_records_action set to IGNORE, and the write fails with a ClassCastException.

 

Here are the logs:

18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR Utils: Aborting task
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
 at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
 at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
 at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 ... 8 more

 


> Streaming data to a table with bad_records_action as IGNORE throws ClassCastException
> -------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-2198
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2198
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.4.0
>            Reporter: Geetika Gupta
>            Priority: Minor
>
> Steps to reproduce:
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.carbondata.examples
> import java.io.{File, PrintWriter}
> import java.net.ServerSocket
> import org.apache.spark.sql.{CarbonEnv, SparkSession}
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
> import org.apache.carbondata.core.constants.CarbonCommonConstants
> import org.apache.carbondata.core.util.CarbonProperties
> import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
> // scalastyle:off println
> object CarbonStructuredStreamingExample {
>  def main(args: Array[String]) {
>  // setup paths
>  val rootPath = new File(this.getClass.getResource("/").getPath
>  + "../../../..").getCanonicalPath
>  val storeLocation = s"$rootPath/examples/spark2/target/store"
>  val warehouse = s"$rootPath/examples/spark2/target/warehouse"
>  val metastoredb = s"$rootPath/examples/spark2/target"
>  val streamTableName = s"stream_table"
>  CarbonProperties.getInstance()
>  .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
>  import org.apache.spark.sql.CarbonSession._
>  val spark = SparkSession
>  .builder()
>  .master("local")
>  .appName("CarbonStructuredStreamingExample")
>  .config("spark.sql.warehouse.dir", warehouse)
>  .getOrCreateCarbonSession(storeLocation, metastoredb)
>  spark.sparkContext.setLogLevel("ERROR")
>  val requireCreateTable = true
>  val useComplexDataType = false
>  if (requireCreateTable) {
>  // drop table if exists previously
>  spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
>  // Create target carbon table and populate with initial data
>  if (useComplexDataType) {
>  spark.sql(
>  s"""
>  | CREATE TABLE ${ streamTableName }(
>  | id INT,
>  | name STRING,
>  | city STRING,
>  | salary FLOAT,
>  | file struct<school:array<string>, age:int>
>  | )
>  | STORED BY 'carbondata'
>  | TBLPROPERTIES(
>  | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
>  | """.stripMargin)
>  } else {
>  spark.sql(
>  s"""
>  | CREATE TABLE ${ streamTableName }(
>  | id INT,
>  | name STRING,
>  | city STRING,
>  | salary FLOAT
>  | )
>  | STORED BY 'carbondata'
>  | TBLPROPERTIES(
>  | 'streaming'='true', 'sort_columns'='name')
>  | """.stripMargin)
>  }
>  val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
>  val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
>  
>  // streaming ingest
>  val serverSocket = new ServerSocket(7071)
>  val thread1 = startStreaming(spark, tablePath)
>  val thread2 = writeSocket(serverSocket)
>  System.out.println("type enter to interrupt streaming")
>  System.in.read()
>  thread1.interrupt()
>  thread2.interrupt()
>  serverSocket.close()
>  }
>  
>  spark.sql(s"select * from $streamTableName").show
>  spark.stop()
>  System.out.println("streaming finished")
>  }
>  def showTableCount(spark: SparkSession, tableName: String): Thread = {
>  val thread = new Thread() {
>  override def run(): Unit = {
>  for (_ <- 0 to 1000) {
>  spark.sql(s"select count(*) from $tableName").show(truncate = false)
>  Thread.sleep(1000 * 3)
>  }
>  }
>  }
>  thread.start()
>  thread
>  }
>  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
>  val thread = new Thread() {
>  override def run(): Unit = {
>  var qry: StreamingQuery = null
>  try {
>  val readSocketDF = spark.readStream
>  .format("socket")
>  .option("host", "localhost")
>  .option("port", 7071)
>  .load()
>  // Write data from socket stream to carbondata file
>  qry = readSocketDF.writeStream
>  .format("carbondata")
>  .trigger(ProcessingTime("5 seconds"))
>  .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
>  .option("bad_records_action", "ignore")
>  .option("dbName", "default")
>  .option("tableName", "stream_table")
>  .start()
>  qry.awaitTermination()
>  } catch {
>  case ex: Exception =>
>  ex.printStackTrace()
>  println("Done reading and writing streaming data")
>  } finally {
>  qry.stop()
>  }
>  }
>  }
>  thread.start()
>  thread
>  }
>  def writeSocket(serverSocket: ServerSocket): Thread = {
>  val thread = new Thread() {
>  override def run(): Unit = {
>  // wait for client to connection request and accept
>  val clientSocket = serverSocket.accept()
>  val socketWriter = new PrintWriter(clientSocket.getOutputStream())
>  var index = 0
>  for (_ <- 1 to 1000) {
>  // write 5 records per iteration
>  for (_ <- 0 to 1000) {
>  index = index + 1
>  socketWriter.println("null" + ",name_" + index
>  + ",city_" + index + "," + (index * 10000.00).toString +
>  ",school_" + index + ":school_" + index + index + "$" + index)
>  }
>  socketWriter.flush()
>  Thread.sleep(1000)
>  }
>  socketWriter.close()
>  System.out.println("Socket closed")
>  }
>  }
>  thread.start()
>  thread
>  }
> }
> // scalastyle:on println
> In the above example, we stream data to a table with bad_records_action set to IGNORE, and the write fails with a ClassCastException.
>  
> Here are the logs:
> 18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
> java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
>  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
>  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:99)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> 18/02/23 16:09:50 ERROR Utils: Aborting task
> java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
>  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
>  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:99)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> 18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
> 18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:99)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
>  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
>  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
>  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
>  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
>  ... 8 more
>  *strong text*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)