Posted to user@spark.apache.org by anandnilkal <an...@gmail.com> on 2016/05/11 06:28:57 UTC

not able to write to cassandra table from spark

I am trying to write incoming stream data to a database. Below is the
example program: it creates a thread that listens to an incoming stream of
CSV data. Each line needs to be split on the delimiter, and the resulting
array of fields needs to be written to the database as separate columns in
the table.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector._

object dbwrite {
  case class records(id: Long, time: java.sql.Timestamp, rx: Int, tx: Int, total: Int, multi: Double)
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
                        .set("spark.cassandra.connection.host", "localhost")
                        .setAppName("dbwrite")
                        .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val splitRdd = lines.map(line => line.split(",") )
    //val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)
    // DStream[Array[String]]

    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toDouble
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime());
      val reco = records(id, timestamp, rx, tx, total, mul);
      Seq(reco)
    })

    yourRdd.foreachRDD { rdd =>
        for(item <- rdd.collect().toArray)
          print(item)
    }
    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record", SomeColumns("id", "time", "rx", "tx", "total", "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
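
As far as I understand, the spark-cassandra-connector also exposes saveToCassandra directly on DStreams through its streaming package; the sketch below (written against the yourRdd stream from the program above, not something I have verified) is roughly what I mean:

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._   // adds saveToCassandra to DStreams

// Sketch: write every micro-batch of the parsed DStream straight to the table,
// instead of wrapping the DStream itself in sc.parallelize.
yourRdd.saveToCassandra("records", "record",
  SomeColumns("id", "time", "rx", "tx", "total", "multi"))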
When I run the dbwrite program above, Spark gives the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx, tx, total, multi]
        at scala.Predef$.require(Predef.scala:233)
        at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
        at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:29)
        at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
        at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
        at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
        at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
        at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
        at dbwrite$.main(dbwrite.scala:63)
        at dbwrite.main(dbwrite.scala)
I am using Spark 1.6.1 and Cassandra 3.5.
The table already created in Cassandra has the same column names; the columns
are displayed in alphabetical order, but all of them are available.
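
My understanding of the connector is that DefaultColumnMapper matches the case class field names against the table column names. As a sanity check, the mapping could be exercised outside the streaming job with a single hand-built row; this is only a sketch reusing sc and the records case class from the program above, with made-up sample values:

import com.datastax.spark.connector._

// Sketch: write one hard-coded records row to the same keyspace/table,
// bypassing the streaming part entirely.
val row = records(1L, new java.sql.Timestamp(System.currentTimeMillis()), 10, 20, 30, 1.5)
sc.parallelize(Seq(row))
  .saveToCassandra("records", "record",
    SomeColumns("id", "time", "rx", "tx", "total", "multi"))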
Please help me with this error.

Thanks.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/not-able-to-write-to-cassandra-table-from-spark-tp26923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
