You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Zahid Rahman <za...@gmail.com> on 2020/03/28 01:32:17 UTC

spark.readStream.schema(??)

version: spark-3.0.0-preview2-bin-hadoop2.7

The syntax checker objects to the following argument which is what I am
supposed to enter.

.schema(staticSchema)

However when I  provide the  following argument it works but I don't think
that is correct.
What is the correct argument for this case ?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {


  def main(args: Array[String]): Unit = {

    // crete spark session
    val spark =
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail
Data").getOrCreate();
    // set spark runtime  configuration
    spark.conf.set("spark,sql.shuffle.partitions","5")

    // create a static frame
  val staticDataFrame = spark.read.format("csv")
    .option ("header","true")
    .option("inferschema","true")
    .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticFrame = staticDataFrame.schema

    staticDataFrame
      .selectExpr(
        "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    val streamingDataFrame = spark.readStream
      .schema(staticDataFrame.schema)
      .option("maxFilesPerTrigger", 1)
      .load("/data/retail-data/by-day/*.csv")

      println(streamingDataFrame.isStreaming)

  } // main

} // object







Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>

Re: spark.readStream.schema(??)

Posted by Zahid Rahman <za...@gmail.com>.
I found another bug.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Sat, 28 Mar 2020 at 02:12, Zahid Rahman <za...@gmail.com> wrote:

>
> I have sorted the error anyway because I am the best there is.
> It is downhill for me from here.
>
> There is no nobody using this email list anyway for anything. The email is
> a dead a dodo.
> probably because of people like you.
>
> *That is exactly what this email is for.*
> *It is not just for me to test your buggy software and report the bugs
> free of cost.*
>
> *and not get anything in return.*
>
> *Another words free consultancy for you because you now the software *
> *after spending years of your life while I am going to mastering in weeks.*
>
> Have we eaten something that disagrees with us today.
> Do you have a sore throat ?
> May be a little temperature ?
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Sat, 28 Mar 2020 at 02:03, Sean Owen <sr...@gmail.com> wrote:
>
>> (this isn't an email list for user support)
>>
>> On Fri, Mar 27, 2020 at 8:32 PM Zahid Rahman <za...@gmail.com>
>> wrote:
>> >
>> > version: spark-3.0.0-preview2-bin-hadoop2.7
>> >
>> > The syntax checker objects to the following argument which is what I am
>> supposed to enter.
>> >
>> > .schema(staticSchema)
>> >
>> > However when I  provide the  following argument it works but I don't
>> think that is correct.
>> > What is the correct argument for this case ?
>> >
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.functions.{window,column,desc,col}
>> >
>> > object RetailData {
>> >
>> >
>> >   def main(args: Array[String]): Unit = {
>> >
>> >     // crete spark session
>> >     val spark = SparkSession.builder().master("spark://
>> 192.168.0.38:7077").appName("Retail Data").getOrCreate();
>> >     // set spark runtime  configuration
>> >     spark.conf.set("spark,sql.shuffle.partitions","5")
>> >
>> >     // create a static frame
>> >   val staticDataFrame = spark.read.format("csv")
>> >     .option ("header","true")
>> >     .option("inferschema","true")
>> >     .load("/data/retail-data/by-day/*.csv")
>> >
>> >     staticDataFrame.createOrReplaceTempView("retail_data")
>> >     val staticFrame = staticDataFrame.schema
>> >
>> >     staticDataFrame
>> >       .selectExpr(
>> >         "CustomerId","UnitPrice * Quantity as total_cost",
>> "InvoiceDate")
>> >       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> >       .sum("total_cost")
>> >       .sort(desc("sum(total_cost)"))
>> >       .show(2)
>> >
>> >     val streamingDataFrame = spark.readStream
>> >       .schema(staticDataFrame.schema)
>> >       .option("maxFilesPerTrigger", 1)
>> >       .load("/data/retail-data/by-day/*.csv")
>> >
>> >       println(streamingDataFrame.isStreaming)
>> >
>> >   } // main
>> >
>> > } // object
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>