Posted to issues@carbondata.apache.org by "Vandana Yadav (JIRA)" <ji...@apache.org> on 2018/04/17 12:48:00 UTC
[jira] [Created] (CARBONDATA-2354) Getting Error while executing Streaming example from the streaming guide documentation
Vandana Yadav created CARBONDATA-2354:
-----------------------------------------
Summary: Getting Error while executing Streaming example from the streaming guide documentation
Key: CARBONDATA-2354
URL: https://issues.apache.org/jira/browse/CARBONDATA-2354
Project: CarbonData
Issue Type: Bug
Components: examples
Affects Versions: 1.4.0
Environment: spark 2.1, spark 2.2
Reporter: Vandana Yadav
Getting an error while executing the Streaming example from the Streaming Guide documentation.
Steps to reproduce:
1) Run the Spark shell with the latest build:
./spark-shell --jars /home/knoldus/Desktop/CARBONDATA/carbondata/assembly/target/scala-2.11-carbondata-1.4.0-SNAPSHOT-bin-spark2.1.0-hadoop2.7.2.jar
2) Execute the example:
:paste
// Entering paste mode (ctrl-D to finish)
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonStorePath

val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath

val spark = SparkSession
  .builder()
  .master("local")
  .appName("StreamExample")
  .config("spark.sql.warehouse.dir", warehouse)
  .getOrCreateCarbonSession(warehouse, metastore)

spark.sparkContext.setLogLevel("ERROR")

// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS carbon_table")

// Create target carbon table and populate with initial data
spark.sql(
  s"""
     | CREATE TABLE carbon_table (
     | col1 INT,
     | col2 STRING
     | )
     | STORED BY 'carbondata'
     | TBLPROPERTIES('streaming'='true')""".stripMargin)

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

// batch load
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9099)
  .load()

// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()

// start new thread to show data
new Thread() {
  override def run(): Unit = {
    do {
      spark.sql("select * from carbon_table").show(false)
      Thread.sleep(10000)
    } while (true)
  }
}.start()

qry.awaitTermination()
Expected Result: the example should execute successfully.
Actual Result:
// Exiting paste mode, now interpreting.
<console>:27: error: object CarbonStorePath is not a member of package org.apache.carbondata.core.util.path
import org.apache.carbondata.core.util.path.CarbonStorePath
^
<console>:54: error: not found: value CarbonStorePath
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
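The errors suggest that CarbonStorePath has been removed from the 1.4.0-SNAPSHOT build while the Streaming Guide still references it. One possible workaround is to derive the checkpoint location from the table path instead; the sketch below is an assumption, not a verified fix: the CarbonTablePath class, its getStreamingCheckpointDir helper, and the getTablePath accessor are all guesses at the replacement API and need to be checked against the current CarbonData source.

```scala
// HYPOTHETICAL workaround sketch, not verified against 1.4.0-SNAPSHOT:
// assumes CarbonStorePath was replaced by CarbonTablePath in the same
// package, and that it exposes a streaming-checkpoint-dir helper.
import org.apache.carbondata.core.util.path.CarbonTablePath

// carbonTable is the CarbonTable obtained via CarbonEnv.getCarbonTable
// in the example above; getTablePath is an assumed accessor.
val tablePath = carbonTable.getTablePath

qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  // assumed static helper replacing tablePath.getStreamingCheckpointDir
  .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()
```

If the replacement API differs, the underlying point stands: the Streaming Guide's example should be updated to match whatever path utility shipped in 1.4.0.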
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)