Posted to issues@carbondata.apache.org by "Vandana Yadav (JIRA)" <ji...@apache.org> on 2018/04/17 12:48:00 UTC

[jira] [Created] (CARBONDATA-2354) Getting Error while executing Streaming example from the streaming guide documentation

Vandana Yadav created CARBONDATA-2354:
-----------------------------------------

             Summary: Getting Error while executing Streaming example from the streaming guide documentation
                 Key: CARBONDATA-2354
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2354
             Project: CarbonData
          Issue Type: Bug
          Components: examples
    Affects Versions: 1.4.0
         Environment: spark 2.1, spark 2.2
            Reporter: Vandana Yadav


Getting an error while executing the streaming example from the streaming guide documentation.

Steps to reproduce:

1) Run spark-shell with the latest build:

./spark-shell --jars /home/knoldus/Desktop/CARBONDATA/carbondata/assembly/target/scala-2.11-carbondata-1.4.0-SNAPSHOT-bin-spark2.1.0-hadoop2.7.2.jar

2) Paste and execute the example:

:paste
// Entering paste mode (ctrl-D to finish)

import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonStorePath

val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath

val spark = SparkSession
  .builder()
  .master("local")
  .appName("StreamExample")
  .config("spark.sql.warehouse.dir", warehouse)
  .getOrCreateCarbonSession(warehouse, metastore)

spark.sparkContext.setLogLevel("ERROR")

// drop the table if it exists from a previous run
spark.sql(s"DROP TABLE IF EXISTS carbon_table")

// create the target carbon table with streaming enabled
spark.sql(
  s"""
     | CREATE TABLE carbon_table (
     | col1 INT,
     | col2 STRING
     | )
     | STORED BY 'carbondata'
     | TBLPROPERTIES('streaming'='true')""".stripMargin)

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

// read from a socket stream
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9099)
  .load()

// write data from the socket stream to a carbondata file
qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()

// start a new thread to show the data
new Thread() {
  override def run(): Unit = {
    do {
      spark.sql("select * from carbon_table").show(false)
      Thread.sleep(10000)
    } while (true)
  }
}.start()

qry.awaitTermination()

 

Expected Result: the example should execute successfully.

Actual Result:

// Exiting paste mode, now interpreting.

<console>:27: error: object CarbonStorePath is not a member of package org.apache.carbondata.core.util.path
 import org.apache.carbondata.core.util.path.CarbonStorePath
 ^
<console>:54: error: not found: value CarbonStorePath
 val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
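The failure happens at compile time, before the stream ever starts, so the example in the documentation no longer matches the 1.4.0-SNAPSHOT API. A possible workaround, assuming the refactor that removed CarbonStorePath moved the path helpers onto CarbonTablePath as static methods (both the class name and the method signatures below are assumptions, not confirmed against the 1.4.0 source), would be to derive the checkpoint directory from the table's physical path instead:

```scala
// Hypothetical replacement for the two failing lines, assuming
// CarbonTablePath now exposes static path helpers:
import org.apache.carbondata.core.util.path.CarbonTablePath

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)

// Assumed signature: getStreamingCheckpointDir(tablePath: String): String,
// taking the table's physical location rather than an identifier object.
val checkpointDir = CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)
```

If that assumption holds, the writeStream call would then use `.option("checkpointLocation", checkpointDir)`. Either way, the streaming guide should be updated to match whichever API shipped in 1.4.0.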

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)