Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2018/09/05 08:11:32 UTC
getting error: value toDF is not a member of Seq[columns]
Hi,
I have a Spark Streaming job that sends data, and I need to put that data
into MongoDB for test purposes. The easiest way is to create a DF from the
individual list of columns, as below.
I loop over individual rows in the RDD and perform the following:
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

for (line <- pricesRDD.collect.toArray)
{
  var key = line._2.split(',').view(0).toString
  var ticker = line._2.split(',').view(1).toString
  var timeissued = line._2.split(',').view(2).toString
  var price = line._2.split(',').view(3).toFloat
  val priceToString = line._2.split(',').view(3)
  if (price > 90.0)
  {
    println("price > 90.0, saving to MongoDB collection!")
    // Save prices to mongoDB collection
    var df = Seq(columns(key, ticker, timeissued, price)).toDF
but it fails with the message
value toDF is not a member of Seq[columns].
What would be the easiest way of resolving this, please?
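For reference, a minimal self-contained sketch where toDF does resolve, assuming Spark 2.x (the object name ToDfSketch is illustrative). The usual requirements are an import of spark.implicits._ from a live SparkSession and a case class declared outside the enclosing method, so that an implicit Encoder can be derived for it:

import org.apache.spark.sql.SparkSession

// Declared at the top level, not inside a method body.
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toDF sketch").getOrCreate()
    import spark.implicits._ // provides toDF on local Seqs and RDDs

    val df = Seq(columns("key", "IBM", "2018-09-05T10:10:15", 99.5f)).toDF
    df.printSchema()
    spark.stop()
  }
}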
thanks
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Ok somehow this worked!
// Save prices to mongoDB collection
val document = sparkContext.parallelize((1 to 1).map(i =>
  Document.parse(s"{key:'$key',ticker:'$ticker',timeissued:'$timeissued',price:$price,CURRENCY:'$CURRENCY',op_type:$op_type,op_time:'$op_time'}")))
//
// Writing document to Mongo collection
//
MongoSpark.save(document, writeConfig)
Note that all non-numeric column values are quoted as '$column'.
I just created a dummy map over a single element (1 to 1).
These are the resulting documents in the MongoDB collection:
{
"_id" : ObjectId("5b915796d3c6071e82fdca2b"),
"key" : "23c39917-08a9-4845-ba74-51997707d374",
"ticker" : "IBM",
"timeissued" : "2018-09-06T17:51:21",
"price" : 207.23,
"CURRENCY" : "GBP",
"op_type" : NumberInt(1),
"op_time" : "1536251798114"
}
{
"_id" : ObjectId("5b915796d3c6071e82fdca2c"),
"key" : "22f353f9-9b28-463c-9f1c-64213ded7cd5",
"ticker" : "TSCO",
"timeissued" : "2018-09-06T17:51:21",
"price" : 179.52,
"CURRENCY" : "GBP",
"op_type" : NumberInt(1),
"op_time" : "1536251798162"
}
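As an aside, once toDF resolves (see the replies quoted below), the same write could arguably go through a DataFrame instead of hand-built JSON. A minimal sketch, assuming a SparkSession named spark, the columns case class, and the same writeConfig as above:

import spark.implicits._

// Build a one-row DataFrame from the parsed fields and save it with
// the same write configuration used for the Document-based path.
val df = Seq(columns(key, ticker, timeissued, price)).toDF
MongoSpark.save(df, writeConfig)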
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Thu, 6 Sep 2018 at 10:24, Mich Talebzadeh <mi...@gmail.com>
wrote:
> thanks if you define columns class as below
>
>
> scala> case class columns(KEY: String, TICKER: String, TIMEISSUED:
> String, PRICE: Double)
> defined class columns
> scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
> df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
> more fields]
> scala> df.printSchema
> root
> |-- KEY: string (nullable = true)
> |-- TICKER: string (nullable = true)
> |-- TIMEISSUED: string (nullable = true)
> |-- PRICE: double (nullable = false)
>
> looks better
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Sep 2018 at 10:10, Jungtaek Lim <ka...@gmail.com> wrote:
>
>> This code works with Spark 2.3.0 via spark-shell.
>>
>> scala> case class columns(KEY: String, TICKER: String, TIMEISSUED:
>> String, PRICE: Float)
>> defined class columns
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
>> 18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp,
>> returning NoSuchObjectException
>> df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
>> more fields]
>>
>> scala> df
>> res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
>> more fields]
>>
>> Maybe we need to know the actual types of key, ticker, timeissued and
>> price from your variables.
>>
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, 6 Sep 2018 at 5:57 PM, Mich Talebzadeh <mi...@gmail.com> wrote:
>>
>>> I am trying to understand why Spark cannot convert simple comma
>>> separated columns to a DF.
>>>
>>> I did a test
>>>
>>> I took one line of the printed output and stored it as a one-line csv file, as below
>>>
>>> var allInOne = key+","+ticker+","+timeissued+","+price
>>> println(allInOne)
>>>
>>> cat crap.csv
>>> 6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45
>>>
>>> Then after storing it in HDFS, I read that file as below
>>>
>>> import org.apache.spark.sql.functions._
>>> val location="hdfs://rhes75:9000/tmp/crap.csv"
>>> val df1 = spark.read.option("header", false).csv(location)
>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>> PRICE: Double)
>>> val df2 = df1.map(p => columns(p(0).toString,p(1).toString,
>>> p(2).toString,p(3).toString.toDouble))
>>> df2.printSchema
>>>
>>> This is the result I get
>>>
>>> df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2
>>> more fields]
>>> defined class columns
>>> df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER:
>>> string ... 2 more fields]
>>> root
>>> |-- KEY: string (nullable = true)
>>> |-- TICKER: string (nullable = true)
>>> |-- TIMEISSUED: string (nullable = true)
>>> |-- PRICE: double (nullable = false)
>>>
>>> So in my case the only difference is that the comma separated line is
>>> stored in a String as opposed to a csv file.
>>>
>>> How can I achieve this simple transformation?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 6 Sep 2018 at 03:38, Manu Zhang <ow...@gmail.com> wrote:
>>>
>>>> Have you tried adding an Encoder for columns, as suggested by Jungtaek
>>>> Lim?
>>>>
>>>> On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> I can rebuild the comma separated list as follows:
>>>>>
>>>>>
>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED:
>>>>> String, PRICE: Float)
>>>>> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>>>> import sqlContext.implicits._
>>>>>
>>>>>
>>>>> for(line <- pricesRDD.collect.toArray)
>>>>> {
>>>>> var key = line._2.split(',').view(0).toString
>>>>> var ticker = line._2.split(',').view(1).toString
>>>>> var timeissued = line._2.split(',').view(2).toString
>>>>> var price = line._2.split(',').view(3).toFloat
>>>>> var allInOne = key+","+ticker+","+timeissued+","+price
>>>>> println(allInOne)
>>>>>
>>>>> and the print shows the columns separated by ","
>>>>>
>>>>>
>>>>> 34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
>>>>>
>>>>> So I just need to convert that row into a DataFrame
>>>>>
>>>>> I try this conversion to a DF to write to a MongoDB document with
>>>>> MongoSpark.save(df, writeConfig)
>>>>>
>>>>> var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued,
>>>>> price))).toDF
>>>>>
>>>>> [error]
>>>>> /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235:
>>>>> value toDF is not a member of org.apache.spark.rdd.RDD[columns]
>>>>> [error] var df = sparkContext.parallelize(Seq(columns(key,
>>>>> ticker, timeissued, price))).toDF
>>>>> [
>>>>>
>>>>>
>>>>> frustrating!
>>>>>
>>>>> has anyone come across this?
>>>>>
>>>>> thanks
>>>>>
>>>>> On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> yep already tried it and it did not work.
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <de...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Try this:
>>>>>>>
>>>>>>> import spark.implicits._
>>>>>>>
>>>>>>> df.toDF()
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> With the following
>>>>>>>>
>>>>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>>>>>> PRICE: Float)
>>>>>>>>
>>>>>>>> var key = line._2.split(',').view(0).toString
>>>>>>>> var ticker = line._2.split(',').view(1).toString
>>>>>>>> var timeissued = line._2.split(',').view(2).toString
>>>>>>>> var price = line._2.split(',').view(3).toFloat
>>>>>>>>
>>>>>>>> var df = Seq(columns(key, ticker, timeissued, price))
>>>>>>>> println(df)
>>>>>>>>
>>>>>>>> I get
>>>>>>>>
>>>>>>>>
>>>>>>>> List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
>>>>>>>>
>>>>>>>> So just need to convert that list to DF
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> The spark is version 2.3.0
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You may also find the link below useful (though it looks quite old);
>>>>>>>>>> since a case class is something an Encoder is available for, there may be
>>>>>>>>>> another reason preventing the implicit conversion.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
>>>>>>>>>>
>>>>>>>>>> And which Spark version do you use?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, 5 Sep 2018 at 5:32 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sorry, I guess I pasted another method. The code is...
>>>>>>>>>>>
>>>>>>>>>>> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
>>>>>>>>>>> DatasetHolder(_sqlContext.createDataset(s))
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Sep 2018 at 5:30 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I guess you need to have an Encoder for the result type of
>>>>>>>>>>>> columns().
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>>>>>>>>>>>
>>>>>>>>>>>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>>>>>>>>>>>> DatasetHolder(_sqlContext.createDataset(rdd))
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> You can see lots of Encoder implementations in the Scala code.
>>>>>>>>>>>> If your type doesn't match any of them it may not work, and you need to
>>>>>>>>>>>> provide a custom Encoder.
>>>>>>>>>>>>
>>>>>>>>>>>> -Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 5 Sep 2018 at 5:24 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> I already do that as below
>>>>>>>>>>>>>
>>>>>>>>>>>>> val sqlContext= new
>>>>>>>>>>>>> org.apache.spark.sql.SQLContext(sparkContext)
>>>>>>>>>>>>> import sqlContext.implicits._
>>>>>>>>>>>>>
>>>>>>>>>>>>> but still getting the error!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any other
>>>>>>>>>>>>> property which may arise from relying on this email's technical content is
>>>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <ka...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You may need to import implicits from your spark session like
>>>>>>>>>>>>>> below:
>>>>>>>>>>>>>> (Below code is borrowed from
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>>>>>>> val spark = SparkSession
>>>>>>>>>>>>>> .builder()
>>>>>>>>>>>>>> .appName("Spark SQL basic example")
>>>>>>>>>>>>>> .config("spark.some.config.option", "some-value")
>>>>>>>>>>>>>> .getOrCreate()
>>>>>>>>>>>>>> // For implicit conversions like converting RDDs to DataFrames
>>>>>>>>>>>>>> import spark.implicits._
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 5 Sep 2018 at 5:11 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have spark streaming that send data and I need to put that
>>>>>>>>>>>>>>> data into MongoDB for test purposes. The easiest way is to create a DF from
>>>>>>>>>>>>>>> the individual list of columns as below
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I loop over individual rows in RDD and perform the following
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> case class columns(KEY: String, TICKER: String,
>>>>>>>>>>>>>>> TIMEISSUED: String, PRICE: Float)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>> var key = line._2.split(',').view(0).toString
>>>>>>>>>>>>>>> var ticker = line._2.split(',').view(1).toString
>>>>>>>>>>>>>>> var timeissued =
>>>>>>>>>>>>>>> line._2.split(',').view(2).toString
>>>>>>>>>>>>>>> var price = line._2.split(',').view(3).toFloat
>>>>>>>>>>>>>>> val priceToString = line._2.split(',').view(3)
>>>>>>>>>>>>>>> if (price > 90.0)
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>> println ("price > 90.0, saving to MongoDB
>>>>>>>>>>>>>>> collection!")
>>>>>>>>>>>>>>> // Save prices to mongoDB collection
>>>>>>>>>>>>>>>         var df = Seq(columns(key, ticker, timeissued,
>>>>>>>>>>>>>>> price)).toDF
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> but it fails with message
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> value toDF is not a member of Seq[columns].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What would be the easiest way of resolving this please?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any other
>>>>>>>>>>>>>>> property which may arise from relying on this email's technical content is
>>>>>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks
>>>>>>> Deepak
>>>>>>> www.bigdatabig.com
>>>>>>> www.keosha.net
>>>>>>>
>>>>>>
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks. If you define the columns class as below:
scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
defined class columns
scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]
scala> df.printSchema
root
|-- KEY: string (nullable = true)
|-- TICKER: string (nullable = true)
|-- TIMEISSUED: string (nullable = true)
|-- PRICE: double (nullable = false)
looks better
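As a side note, the 1.23f literal still compiles against PRICE: Double because Scala widens Float to Double implicitly; a quick shell check, for illustration, also shows the precision artifact that comes with the widening:

scala> val d: Double = 1.23f
d: Double = 1.2300000190734863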
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Jungtaek Lim <ka...@gmail.com>.
This code works with Spark 2.3.0 via spark-shell.
scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
PRICE: Float)
defined class columns
scala> import spark.implicits._
import spark.implicits._
scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp,
returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]
scala> df
res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]
Maybe we need to know the actual types of key, ticker, timeissued and price
from your variables.
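If the implicit conversion still is not picked up in compiled application code, passing an Encoder explicitly sidesteps the implicit search altogether. A sketch, assuming the same top-level columns case class and a SparkSession named spark:

import org.apache.spark.sql.Encoders

// Derive the product Encoder explicitly rather than relying on
// spark.implicits._ to supply it.
val enc = Encoders.product[columns]
val df = spark.createDataset(Seq(columns("key", "ticker", "timeissued", 1.23f)))(enc).toDF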
Jungtaek Lim (HeartSaVioR)
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
I am trying to understand why Spark cannot convert simple comma-separated
columns to a DF.
I did a test.
I took one line of the printed output and stored it as a one-line csv file, as below:
var allInOne = key+","+ticker+","+timeissued+","+price
println(allInOne)
cat crap.csv
6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45
Then after storing it in HDFS, I read that file as below
import org.apache.spark.sql.functions._
val location = "hdfs://rhes75:9000/tmp/crap.csv"
val df1 = spark.read.option("header", false).csv(location)
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
val df2 = df1.map(p => columns(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble))
df2.printSchema
This is the result I get
df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more
fields]
defined class columns
df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string
... 2 more fields]
root
|-- KEY: string (nullable = true)
|-- TICKER: string (nullable = true)
|-- TIMEISSUED: string (nullable = true)
|-- PRICE: double (nullable = false)
So in my case the only difference is that the comma-separated line is
stored in a String as opposed to a csv file.
How can I achieve this simple transformation?
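For comparison, a sketch of the same transformation done directly on the in-memory string, with no HDFS round trip; it assumes a SparkSession named spark and the columns case class above with PRICE: Double:

import spark.implicits._

// Split the comma separated line once and map the fields straight
// into the case class, then lift the single row into a DataFrame.
val p = allInOne.split(',')
val df = Seq(columns(p(0), p(1), p(2), p(3).toDouble)).toDF
df.printSchema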
Thanks
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Thu, 6 Sep 2018 at 03:38, Manu Zhang <ow...@gmail.com> wrote:
> Have you tried adding Encoder for columns as suggested by Jungtaek Lim ?
>
> On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Manu Zhang <ow...@gmail.com>.
Have you tried adding an Encoder for columns, as suggested by Jungtaek Lim?
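For reference, a minimal sketch of supplying the Encoder explicitly, assuming
Spark 2.x and a case class defined at the top level (names and values here are
illustrative):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

val spark = SparkSession.builder().getOrCreate()
// Supply the product Encoder by hand instead of relying on implicit derivation
implicit val columnsEncoder: Encoder[columns] = Encoders.product[columns]
// createDataset picks up the implicit Encoder; toDF on a Dataset needs no implicits
val df = spark.createDataset(Seq(columns("key1", "IBM", "2018-09-05T10:10:15", 95.0f))).toDF()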
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Dr Mich Talebzadeh
LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
I can rebuild the comma-separated list as follows:
case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
PRICE: Float)
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
import sqlContext.implicits._
for(line <- pricesRDD.collect.toArray)
{
var key = line._2.split(',').view(0).toString
var ticker = line._2.split(',').view(1).toString
var timeissued = line._2.split(',').view(2).toString
var price = line._2.split(',').view(3).toFloat
var allInOne = key+","+ticker+","+timeissued+","+price
println(allInOne)
and the print shows the columns separated by ","
34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
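Incidentally, splitting the line once and destructuring keeps the parsing in
one place instead of re-splitting it four times. A sketch over the same loop
variable:

val Array(key, ticker, timeissued, priceStr) = line._2.split(',')
val price = priceStr.toFloat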
So I just need to convert that row into a DataFrame.
I try this conversion to a DF to write to a MongoDB document with
MongoSpark.save(df, writeConfig):
var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued,
price))).toDF
[error]
/data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235:
value toDF is not a member of org.apache.spark.rdd.RDD[columns]
[error] var df = sparkContext.parallelize(Seq(columns(key,
ticker, timeissued, price))).toDF
[
Frustrating!
Has anyone come across this?
thanks
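A frequent cause of this particular error, for what it's worth, is declaring
the case class inside the method that calls toDF: the compiler then cannot
derive the implicit Encoder for it. A minimal sketch of an arrangement that
does compile, assuming Spark 2.x and with illustrative names:

import org.apache.spark.sql.SparkSession

// The case class lives at the top level, outside the method that uses it,
// so that the implicit Encoder (and its TypeTag) can be derived.
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toDF sketch").getOrCreate()
    import spark.implicits._  // imported after the session value exists

    val df = Seq(columns("key1", "IBM", "2018-09-05T10:10:15", 95.0f)).toDF()
    df.printSchema()
  }
}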
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Yep, already tried it and it did not work.
thanks
Dr Mich Talebzadeh
LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Deepak Sharma <de...@gmail.com>.
Try this:
import spark.implicits._
df.toDF()
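To be clear, spark in that import is the SparkSession value, not the
org.apache.spark package. A sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._  // the implicits hang off the session value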
--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
With the following
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE:
Float)
var key = line._2.split(',').view(0).toString
var ticker = line._2.split(',').view(1).toString
var timeissued = line._2.split(',').view(2).toString
var price = line._2.split(',').view(3).toFloat
var df = Seq(columns(key, ticker, timeissued, price))
println(df)
I get
List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
So I just need to convert that list to a DF.
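An alternative that sidesteps the toDF implicit altogether is createDataFrame
on the SQLContext, which accepts a Seq of case class instances directly (still
assuming the case class is visible at the top level):

val df = sqlContext.createDataFrame(Seq(columns(key, ticker, timeissued, price)))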
Dr Mich Talebzadeh
LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks!
The Spark version is 2.3.0
Dr Mich Talebzadeh
LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Jungtaek Lim <ka...@gmail.com>.
You may also find the link below useful (though it looks fairly old). Since a
case class is a type for which an Encoder is available, there may be another
reason that prevents the implicit conversion.
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
And which Spark version do you use?
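When a type genuinely has no built-in Encoder, a Kryo-based one can be
supplied by hand. A sketch with an illustrative class:

import org.apache.spark.sql.{Encoder, Encoders}

// An arbitrary class with no built-in Encoder (illustrative)
class Quote(val ticker: String, val price: Float) extends Serializable

// Kryo serialisation as a catch-all fallback Encoder
implicit val quoteEncoder: Encoder[Quote] = Encoders.kryo[Quote]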
On Wed, 5 Sep 2018 at 17:32, Jungtaek Lim <ka...@gmail.com> wrote:
> Sorry, I guess I pasted another method. The code is...
>
> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
> DatasetHolder(_sqlContext.createDataset(s))
> }
>
>
> 2018년 9월 5일 (수) 오후 5:30, Jungtaek Lim <ka...@gmail.com>님이 작성:
>
>> I guess you need to have encoder for the type of result for columns().
>>
>>
>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>
>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>> DatasetHolder(_sqlContext.createDataset(rdd))
>> }
>>
>> You can see lots of Encoder implementations in the scala code. If your
>> type doesn't match anything it may not work and you need to provide custom
>> Encoder.
>>
>> -Jungtaek Lim (HeartSaVioR)
>>
>> 2018년 9월 5일 (수) 오후 5:24, Mich Talebzadeh <mi...@gmail.com>님이
>> 작성:
>>
>>> Thanks
>>>
>>> I already do that as below
>>>
>>> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>>
>>> but still getting the error!
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <ka...@gmail.com> wrote:
>>>
>>>> You may need to import implicits from your spark session like below:
>>>> (Below code is borrowed from
>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>>>
>>>> import org.apache.spark.sql.SparkSession
>>>> val spark = SparkSession
>>>> .builder()
>>>> .appName("Spark SQL basic example")
>>>> .config("spark.some.config.option", "some-value")
>>>> .getOrCreate()
>>>> // For implicit conversions like converting RDDs to DataFramesimport spark.implicits._
>>>>
>>>>
>>>> 2018년 9월 5일 (수) 오후 5:11, Mich Talebzadeh <mi...@gmail.com>님이
>>>> 작성:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have spark streaming that send data and I need to put that data into
>>>>> MongoDB for test purposes. The easiest way is to create a DF from the
>>>>> individual list of columns as below
>>>>>
>>>>> I loop over individual rows in RDD and perform the following
>>>>>
>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED:
>>>>> String, PRICE: Float)
>>>>>
>>>>> for(line <- pricesRDD.collect.toArray)
>>>>> {
>>>>> var key = line._2.split(',').view(0).toString
>>>>> var ticker = line._2.split(',').view(1).toString
>>>>> var timeissued = line._2.split(',').view(2).toString
>>>>> var price = line._2.split(',').view(3).toFloat
>>>>> val priceToString = line._2.split(',').view(3)
>>>>> if (price > 90.0)
>>>>> {
>>>>> println ("price > 90.0, saving to MongoDB collection!")
>>>>> // Save prices to mongoDB collection
>>>>> * var df = Seq(columns(key, ticker, timeissued,
>>>>> price)).toDF*
>>>>>
>>>>> but it fails with message
>>>>>
>>>>> value toDF is not a member of Seq[columns].
>>>>>
>>>>> What would be the easiest way of resolving this please?
>>>>>
>>>>> thanks
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Jungtaek Lim <ka...@gmail.com>.
Sorry, I pasted the wrong method earlier. The code is:
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}
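For reference, once spark.implicits._ is in scope, Seq(...).toDF is just sugar
for this conversion. A minimal sketch of the desugared call (assuming a
top-level case class columns as in the original post, and placeholder values):

val spark = org.apache.spark.sql.SparkSession.builder().appName("desugar").master("local[*]").getOrCreate()
import spark.implicits._

// The implicit def wraps the Seq in a DatasetHolder; toDF() on the holder
// returns the DataFrame. Both lines below build the same DataFrame.
val sugared   = Seq(columns("k1", "IBM", "2018-09-05T09:10:01", 190.25f)).toDF()
val desugared = localSeqToDatasetHolder(Seq(columns("k1", "IBM", "2018-09-05T09:10:01", 190.25f))).toDF()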
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Jungtaek Lim <ka...@gmail.com>.
I guess you need an Encoder for the result type of columns().
https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}
You can see many Encoder implementations in the Scala code. If your type
doesn't match any of them, the implicit conversion won't apply and you will
need to provide a custom Encoder.
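If the implicit derivation still is not picked up, an Encoder can also be
supplied explicitly; Encoders.product derives one for any case class. A minimal
sketch (placeholder values):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

val spark = SparkSession.builder().appName("explicit-encoder").master("local[*]").getOrCreate()

// Bypass spark.implicits._ entirely by providing the Encoder by hand.
implicit val columnsEncoder: Encoder[columns] = Encoders.product[columns]

val df = spark.createDataset(Seq(columns("k1", "IBM", "2018-09-05T09:10:01", 190.25f))).toDF()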
-Jungtaek Lim (HeartSaVioR)
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks. I already do that, as below:
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
import sqlContext.implicits._
but I am still getting the error!
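Another way to sidestep the implicit conversion altogether is to build the
DataFrame through createDataFrame, which derives the schema from the case class
by reflection and needs no implicits import. A minimal sketch, assuming the case
class is declared at the top level rather than inside the method body (a local
case class is a common cause of this very error):

import org.apache.spark.sql.SparkSession

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

val spark = SparkSession.builder().appName("createDataFrame-direct").master("local[*]").getOrCreate()

// No import of spark.implicits._ required here.
val df = spark.createDataFrame(Seq(columns("k1", "IBM", "2018-09-05T09:10:01", 190.25f)))
df.printSchema()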
Dr Mich Talebzadeh
Re: getting error: value toDF is not a member of Seq[columns]
Posted by Jungtaek Lim <ka...@gmail.com>.
You may need to import the implicits from your Spark session, like below
(the code below is borrowed from
https://spark.apache.org/docs/latest/sql-programming-guide.html):
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
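With that import in scope, the Seq of case-class rows from the original post
converts directly; a minimal sketch (assuming the columns case class is declared
at the top level, with placeholder values):

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

val df = Seq(columns("k1", "IBM", "2018-09-05T09:10:01", 190.25f)).toDF()
// toDF also accepts replacement column names if different casing is wanted:
val renamed = df.toDF("key", "ticker", "timeissued", "price")
renamed.show(false)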