You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by JeffEvans <Je...@protonmail.com.INVALID> on 2020/05/07 00:17:51 UTC

Abstract of child object from Parent Object

@Jeff Evans

@Sean Owen

Both of these postings are examples of same object orientated concept.

They examples of extraction of child Object from Parent Object.

The difference is that when a Muslim asked he was told by Jeff Evans

"we are not here handhold  you."

“do a simple Google search”

“They're not being paid to handhold you and quickly answer to your every whim.”

COMPARATIVELY

BUT when the good Dr Mich Talebzadeh asked same. No humiliation or offensive comments.

No comments at all.

Hi,

Thank you all,

I am just thinking of passing that date 06/04/2020 12:03:43 and getting the correct format from the module. In effect

This date format yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ as pattern

in other words rather than new Date() pass "06/04/2020 12:03:43" as string

REgards,

Dr Mich Talebzadeh

val fixedStr = "2020-06-04T12:03:43";

val dt = new DateTime(fixedStr);

val jdkDate = dt.toDate();

val pattern3 = "dd yyyy MM HH:mm:ss.SSSSSSSSSZ";

val simpleDateFormat3 = (new SimpleDateFormat(pattern2, new Locale("en", "UK")));

val date3 = simpleDateFormat3.format(jdkDate);

System.out.println(date3);

On Sat, 28 Mar 2020, 15:50 Jeff Evans, <[hidden email]> wrote:

Dude, you really need to chill. Have you ever worked with a large open source project before? It seems not. Even so, insinuating there are tons of bugs that were left uncovered until you came along (despite the fact that the project is used by millions across many different organizations) is ludicrous. Learn a little bit of humility

If you're new to something, assume you have made a mistake rather than that there is a bug. Lurk a bit more, or even do a simple Google search, and you will realize Sean is a very senior committer (i.e. expert) in Spark, and has been for many years. He, and everyone else participating in these lists, is doing it voluntarily on their own time. They're not being paid to handhold you and quickly answer to your every whim.

As you can see from the code :

STEP 1: I create a object of type static frame which holds all the information to the datasource (csv files).

STEP 2: Then I create a variable called staticSchema assigning the information of the schema from the original static data frame.

STEP 3: then I create another variable called val streamingDataFrame of type spark.readStream.

and Into the .schema function parameters I pass the object staticSchema which is meant to hold the information to the csv files including the .load(path) function etc.

So then when I am creating val StreamingDataFrame and passing it .schema(staticSchema)

the variable StreamingDataFrame should have all the information.

I should only have to call .option("maxFilePerTrigger",1) and not .format ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")

Otherwise what is the point of passing .schema(staticSchema) to StreamingDataFrame.

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

def main(args: Array[String]): Unit = {

// create spark session

val spark = SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail Data").getOrCreate();

// set spark runtime configuration

spark.conf.set("spark.sql.shuffle.partitions","5")

spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")

// create a static frame

val staticDataFrame = spark.read.format("csv")

.option ("header","true")

.option("inferschema","true")

.load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")

val staticSchema = staticDataFrame.schema

staticDataFrame

.selectExpr(

"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")

.groupBy(col("CustomerId"),

window(col("InvoiceDate"),

"1 day"))

.sum("total_cost")

.sort(desc("sum(total_cost)"))

.show(2)

val streamingDataFrame = spark.readStream

.schema(staticSchema)

.format("csv")

.option("maxFilesPerTrigger", 1)

.option("header","true")

.load("/data/retail-data/by-day/*.csv")

println(streamingDataFrame.isStreaming)

// lazy operation so we will need to call a streaming action to start the action

val purchaseByCustomerPerHour = streamingDataFrame

.selectExpr(

"CustomerId",

"(UnitPrice * Quantity) as total_cost",

"InvoiceDate")

.groupBy(

col("CustomerId"), window(col("InvoiceDate"), "1 day"))

.sum("total_cost")

// stream action to write to console

purchaseByCustomerPerHour.writeStream

.format("console")

.queryName("customer_purchases")

.outputMode("complete")

.start()

} // main

} // object