Posted to issues@carbondata.apache.org by "Geetika Gupta (JIRA)" <ji...@apache.org> on 2018/02/26 05:10:00 UTC
[jira] [Updated] (CARBONDATA-2198) Streaming data to a table with bad_records_action as IGNORE throws ClassCastException
[ https://issues.apache.org/jira/browse/CARBONDATA-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Geetika Gupta updated CARBONDATA-2198:
--------------------------------------
Description:
Steps to reproduce:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.examples

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
// scalastyle:off println
object CarbonStructuredStreamingExample {
  def main(args: Array[String]) {

    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = s"stream_table"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonStructuredStreamingExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    val requireCreateTable = true
    val useComplexDataType = false

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
      // Create target carbon table and populate with initial data
      if (useComplexDataType) {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT,
             | file struct<school:array<string>, age:int>
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
             | """.stripMargin)
      } else {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name')
             | """.stripMargin)
      }

      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

      // streaming ingest
      val serverSocket = new ServerSocket(7071)
      val thread1 = startStreaming(spark, tablePath)
      val thread2 = writeSocket(serverSocket)
      System.out.println("type enter to interrupt streaming")
      System.in.read()
      thread1.interrupt()
      thread2.interrupt()
      serverSocket.close()
    }

    spark.sql(s"select * from $streamTableName").show
    spark.stop()
    System.out.println("streaming finished")
  }
  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 1000) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }
  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        var qry: StreamingQuery = null
        try {
          val readSocketDF = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 7071)
            .load()

          // Write data from socket stream to carbondata file
          qry = readSocketDF.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("5 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("bad_records_action", "ignore")
            .option("dbName", "default")
            .option("tableName", "stream_table")
            .start()

          qry.awaitTermination()
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          // guard against an NPE when the query never started
          if (qry != null) {
            qry.stop()
          }
        }
      }
    }
    thread.start()
    thread
  }
  def writeSocket(serverSocket: ServerSocket): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for a client connection request and accept it
        val clientSocket = serverSocket.accept()
        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
        var index = 0
        for (_ <- 1 to 1000) {
          // write 1001 records per iteration; note the id field is the
          // literal string "null" and a fifth (complex) field is appended,
          // so every row is malformed relative to the 4-column schema
          for (_ <- 0 to 1000) {
            index = index + 1
            socketWriter.println("null" + ",name_" + index
              + ",city_" + index + "," + (index * 10000.00).toString +
              ",school_" + index + ":school_" + index + index + "$" + index)
          }
          socketWriter.flush()
          Thread.sleep(1000)
        }
        socketWriter.close()
        System.out.println("Socket closed")
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println
In the above example, we stream data into a table with bad_records_action set to IGNORE; instead of silently dropping the malformed rows, the write throws a ClassCastException.
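For contrast, here is a minimal, hypothetical generator (not part of the original report) that emits rows matching the simple four-column schema. Assuming only malformed fields trigger the bad-record path, rows shaped like this should stream in cleanly, which isolates the literal "null" id and the extra fifth field above as the trigger:

    // Hypothetical well-formed row for the simple schema
    // (id INT, name STRING, city STRING, salary FLOAT): the id is a real
    // integer and there is no fifth field, unlike the repro rows above.
    def wellFormedRecord(index: Int): String =
      s"$index,name_$index,city_$index,${index * 10000.00}"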
Here are the logs:
18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
    at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR Utils: Aborting task
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
    at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
    at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
    ... 8 more
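Both traces bottom out in CarbonStreamRecordWriter.write (CarbonStreamRecordWriter.java:241): the String-to-Double cast fails inside the stream writer itself rather than being intercepted by the bad-records handling, which suggests the IGNORE action passed as a writeStream option is not applied on the streaming write path. As a hedged workaround sketch (not verified to avoid this exception), the action can also be set globally before the query starts:

    // Workaround sketch, assuming CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION
    // ("carbon.bad.records.action") is available in this release line: set the
    // bad-records action globally, in addition to the writeStream option.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "IGNORE")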
> Streaming data to a table with bad_records_action as IGNORE throws ClassCastException
> -------------------------------------------------------------------------------------
>
> Key: CARBONDATA-2198
> URL: https://issues.apache.org/jira/browse/CARBONDATA-2198
> Project: CarbonData
> Issue Type: Bug
> Components: data-load
> Affects Versions: 1.4.0
> Reporter: Geetika Gupta
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)