You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Zahid Rahman <za...@gmail.com> on 2020/03/28 01:32:17 UTC
spark.readStream.schema(??)
version: spark-3.0.0-preview2-bin-hadoop2.7
The syntax checker objects to the following argument, which is what I am
supposed to enter.
.schema(staticSchema)
However, when I provide the following argument it works, but I don't think
that is correct.
What is the correct argument for this case ?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}
/**
 * Reads the daily retail CSV exports once as a static DataFrame (to infer
 * a schema and show per-customer daily spend), then opens the same files
 * as a streaming source using that inferred schema.
 */
object RetailData {

  def main(args: Array[String]): Unit = {

    // Create the Spark session against the standalone master.
    // (The appName string was split across two lines by email wrapping;
    // reconstructed here as "Retail Data".)
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // Set Spark runtime configuration.
    // Fix: the key was "spark,sql.shuffle.partitions" (comma instead of
    // dot), so conf.set silently stored a nonsense key and the shuffle
    // partition count was never changed.
    spark.conf.set("spark.sql.shuffle.partitions", "5")

    // Create a static DataFrame from all daily CSV files.
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true") // documented camelCase option name
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")

    // Capture the inferred schema for the streaming reader below —
    // file-based streaming sources cannot infer a schema by default.
    // (Renamed from `staticFrame`: it holds a StructType, not a frame,
    // and is the `staticSchema` the streaming read is meant to use.)
    val staticSchema = staticDataFrame.schema

    // Total spend per customer per one-day window, largest totals first.
    staticDataFrame
      .selectExpr(
        "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    // Streaming version of the same read.
    // Fix: the original omitted .format("csv"), so load() would use the
    // default source (parquet, per spark.sql.sources.default) and fail
    // on these CSV files. The header option must also match the static read.
    val streamingDataFrame = spark.readStream
      .format("csv")
      .option("header", "true")
      .schema(staticSchema)
      .option("maxFilesPerTrigger", 1)
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

  } // main

} // object
Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>
Re: spark.readStream.schema(??)
Posted by Zahid Rahman <za...@gmail.com>.
I found another bug.
Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>
On Sat, 28 Mar 2020 at 02:12, Zahid Rahman <za...@gmail.com> wrote:
>
> I have sorted the error anyway because I am the best there is.
> It is downhill for me from here.
>
> There is no nobody using this email list anyway for anything. The email is
> a dead a dodo.
> probably because of people like you.
>
> *That is exactly what this email is for.*
> *It is not just for me to test your buggy software and report the bugs
> free of cost.*
>
> *and not get anything in return.*
>
> *In other words, free consultancy for you because you know the software *
> *after spending years of your life, while I am going to master it in weeks.*
>
> Have we eaten something that disagrees with us today.
> Do you have a sore throat ?
> May be a little temperature ?
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Sat, 28 Mar 2020 at 02:03, Sean Owen <sr...@gmail.com> wrote:
>
>> (this isn't an email list for user support)
>>
>> On Fri, Mar 27, 2020 at 8:32 PM Zahid Rahman <za...@gmail.com>
>> wrote:
>> >
>> > version: spark-3.0.0-preview2-bin-hadoop2.7
>> >
>> > The syntax checker objects to the following argument which is what I am
>> supposed to enter.
>> >
>> > .schema(staticSchema)
>> >
>> > However when I provide the following argument it works but I don't
>> think that is correct.
>> > What is the correct argument for this case ?
>> >
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.functions.{window,column,desc,col}
>> >
>> > object RetailData {
>> >
>> >
>> > def main(args: Array[String]): Unit = {
>> >
>> > // crete spark session
>> > val spark = SparkSession.builder().master("spark://
>> 192.168.0.38:7077").appName("Retail Data").getOrCreate();
>> > // set spark runtime configuration
>> > spark.conf.set("spark,sql.shuffle.partitions","5")
>> >
>> > // create a static frame
>> > val staticDataFrame = spark.read.format("csv")
>> > .option ("header","true")
>> > .option("inferschema","true")
>> > .load("/data/retail-data/by-day/*.csv")
>> >
>> > staticDataFrame.createOrReplaceTempView("retail_data")
>> > val staticFrame = staticDataFrame.schema
>> >
>> > staticDataFrame
>> > .selectExpr(
>> > "CustomerId","UnitPrice * Quantity as total_cost",
>> "InvoiceDate")
>> > .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> > .sum("total_cost")
>> > .sort(desc("sum(total_cost)"))
>> > .show(2)
>> >
>> > val streamingDataFrame = spark.readStream
>> > .schema(staticDataFrame.schema)
>> > .option("maxFilesPerTrigger", 1)
>> > .load("/data/retail-data/by-day/*.csv")
>> >
>> > println(streamingDataFrame.isStreaming)
>> >
>> > } // main
>> >
>> > } // object
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>