Posted to issues@carbondata.apache.org by "Geetika Gupta (JIRA)" <ji...@apache.org> on 2018/04/17 13:15:00 UTC
[jira] [Assigned] (CARBONDATA-2354) Getting Error while executing Streaming example from the streaming guide documentation
[ https://issues.apache.org/jira/browse/CARBONDATA-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Geetika Gupta reassigned CARBONDATA-2354:
-----------------------------------------
Assignee: Geetika Gupta
> Getting Error while executing Streaming example from the streaming guide documentation
> --------------------------------------------------------------------------------------
>
> Key: CARBONDATA-2354
> URL: https://issues.apache.org/jira/browse/CARBONDATA-2354
> Project: CarbonData
> Issue Type: Bug
> Components: examples
> Affects Versions: 1.4.0
> Environment: spark 2.1, spark 2.2
> Reporter: Vandana Yadav
> Assignee: Geetika Gupta
> Priority: Major
>
> An error occurs while executing the streaming example from the streaming guide documentation.
> Steps to reproduce:
> 1) Run spark-shell with the latest build using:
> ./spark-shell --jars /home/knoldus/Desktop/CARBONDATA/carbondata/assembly/target/scala-2.11-carbondata-1.4.0-SNAPSHOT-bin-spark2.1.0-hadoop2.7.2.jar
> Execute the example:
> :paste
> // Entering paste mode (ctrl-D to finish)
> import java.io.File
> import org.apache.spark.sql.{CarbonEnv, SparkSession}
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
> import org.apache.carbondata.core.util.path.CarbonStorePath
>
> val warehouse = new File("./warehouse").getCanonicalPath
> val metastore = new File("./metastore").getCanonicalPath
>
> val spark = SparkSession
>   .builder()
>   .master("local")
>   .appName("StreamExample")
>   .config("spark.sql.warehouse.dir", warehouse)
>   .getOrCreateCarbonSession(warehouse, metastore)
> spark.sparkContext.setLogLevel("ERROR")
> // drop table if exists previously
> spark.sql(s"DROP TABLE IF EXISTS carbon_table")
> // Create target carbon table and populate with initial data
> spark.sql(
>   s"""
>      | CREATE TABLE carbon_table (
>      | col1 INT,
>      | col2 STRING
>      | )
>      | STORED BY 'carbondata'
>      | TBLPROPERTIES('streaming'='true')""".stripMargin)
> val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
> val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
>
> // batch load
> var qry: StreamingQuery = null
> val readSocketDF = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9099)
>   .load()
> // Write data from socket stream to carbondata file
> qry = readSocketDF.writeStream
>   .format("carbondata")
>   .trigger(ProcessingTime("5 seconds"))
>   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
>   .option("dbName", "default")
>   .option("tableName", "carbon_table")
>   .start()
> // start new thread to show data
> new Thread() {
>   override def run(): Unit = {
>     do {
>       spark.sql("select * from carbon_table").show(false)
>       Thread.sleep(10000)
>     } while (true)
>   }
> }.start()
> qry.awaitTermination()
>
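One detail the reproduction steps leave implicit: the socket source reads from localhost:9099, so a listener must be running on that port before the streaming query starts, or the query fails to connect. A minimal way to feed test rows, assuming netcat is available (this step is not part of the original report):

```shell
# Hypothetical data feed for the socket source above (not in the original steps).
# Start a listener on port 9099, then type rows matching the table schema, e.g.:
#   1,aaa
#   2,bbb
nc -lk 9099
```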
> Expected Result: the example should execute successfully.
> Actual Result:
> // Exiting paste mode, now interpreting.
> <console>:27: error: object CarbonStorePath is not a member of package org.apache.carbondata.core.util.path
> import org.apache.carbondata.core.util.path.CarbonStorePath
> ^
> <console>:54: error: not found: value CarbonStorePath
> val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
>
>
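The compile errors above indicate that `CarbonStorePath` was removed from `org.apache.carbondata.core.util.path` during the 1.4.0 development cycle, while the streaming guide still references it. A possible rewrite of the failing lines is sketched below, assuming the static `CarbonTablePath` utility provides the checkpoint-directory helper in this branch (an assumption to be verified against the jar actually on the classpath):

```scala
// Assumption: in the 1.4.0-SNAPSHOT API, CarbonTablePath exposes a static
// getStreamingCheckpointDir(tablePath: String) helper; verify against the
// carbondata assembly jar before relying on this.
import org.apache.carbondata.core.util.path.CarbonTablePath

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)

// Replaces the removed CarbonStorePath.getCarbonTablePath(...) call:
val checkpointDir = CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)

qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", checkpointDir)
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()
```

If this is the intended replacement, the streaming guide's import line and the two lines that use `tablePath` are the only parts of the example that need updating.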
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)