You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Abhijeet Kumar <ab...@sentienz.com> on 2018/11/27 08:15:31 UTC

Spark Streaming join taking long to process

Hi All,

I'm just practicing Spark Streaming with joining two different stream. I noticed that it's taking around 15 seconds for each record. Let me share the details and the code:



If you can see for stage id 2, it's taking 15 s. Isn't this strange.

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType
import org.apache.log4j.{Level, Logger}

object StreamJoin{

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  def main(args: Array[String]): Unit = {
    val checkPointDir = "hdfs://localhost:8020/checkpo"

    val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .master("local")
      .getOrCreate()

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic1)
      .option("failOnDataLoss", "false")
      .load()

    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic2)
      .option("failOnDataLoss", "false")
      .load()

    val order_details = df1
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")

    val invoice_details = df2
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")

      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))


    val join_df = order_details
      .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    join_df.writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }
}

Thanks,
Abhijeet Kumar


Re: Spark Streaming join taking long to process

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
If you are using the same code to run on Yarn, I believe it’s still using
the local mode as it overwrites the master url set by CLI. You can check
the “executors” tab in the Spark UI to set how many executors are running,
and verify if it matches your config.
On Tue, Nov 27, 2018 at 6:17 AM Abhijeet Kumar <ab...@sentienz.com>
wrote:

> Yes, it did
> Thanks for the solution. I solved it locally, but I’m worried how I can do
> this when I’m using yarn because that same 15 Sec is taking on the yarn too
> :)
>
> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <sr...@gmail.com>
> wrote:
>
> Hello Abhijeet,
> I am not sure my answer will solve your problem but just think of the
> master configuration set for spark application.
>
> val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
>
> I see you have set it has "local" not as "local[*]".
>
> From other blog, i got this information, sharing you in full sentence:
> "We are going to work locally, running the application straight from our
> IDE, so we are setting the master to local[*], meaning we are creating as
> many threads as there are cores on the machine."
>
> Just check if this is reducing the time taken for processing, since by
> this local[*] we are going to use all cores available, not just one core?
>
> Regards,
> Sriram Srikanth
>
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <
> abhijeet.kumar@sentienz.com> wrote:
>
>> Hi All,
>>
>> I'm just practicing Spark Streaming with joining two different stream. I
>> noticed that it's taking around 15 seconds for each record. Let me share
>> the details and the code:
>>
>> If you can see for stage id 2, it's taking 15 s. Isn't this strange.
>>
>> Code:
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>> import org.apache.log4j.{Level, Logger}
>>
>> object StreamJoin{
>>
>>   val kafkaTopic1 = "demo2"
>>   val kafkaTopic2 = "demo3"
>>   val bootstrapServer = "localhost:9092"
>>
>>   def main(args: Array[String]): Unit = {
>>     val checkPointDir = "hdfs://localhost:8020/checkpo"
>>
>>     val spark = SparkSession.builder
>>       .appName("Argoid_Realtime_Pipeline")
>>       .master("local")
>>       .getOrCreate()
>>
>>     val rootLogger = Logger.getRootLogger()
>>     rootLogger.setLevel(Level.ERROR)
>>
>>     import spark.implicits._
>>
>>     val df1 = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", bootstrapServer)
>>       .option("subscribe", kafkaTopic1)
>>       .option("failOnDataLoss", "false")
>>       .load()
>>
>>     val df2 = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", bootstrapServer)
>>       .option("subscribe", kafkaTopic2)
>>       .option("failOnDataLoss", "false")
>>       .load()
>>
>>     val order_details = df1
>>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>>         $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
>>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")
>>
>>     val invoice_details = df2
>>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>>       .where($"invoice_status" === "Success")
>>
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>
>>
>>     val join_df = order_details
>>       .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
>>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>>         $"promotion_cost",
>>         $"date_of_order",
>>         $"units_sold" as "units_sold", $"order_id")
>>
>>     join_df.writeStream
>>       .format("console")
>>       .option("truncate", false)
>>       .start()
>>       .awaitTermination()
>>
>>   }
>> }
>>
>>
>> Thanks,
>> Abhijeet Kumar
>>
>>
>
> --
> Regards,
> Srikanth Sriram
>
> <Screenshot 2018-11-27 at 1.24.47 PM.png><Screenshot 2018-11-27 at
> 1.24.39 PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png><Screenshot
> 2018-11-27 at 1.24.39 PM.png>
>
>
>

Re: Spark Streaming join taking long to process

Posted by Abhijeet Kumar <ab...@sentienz.com>.
Yes, it did 
Thanks for the solution. I solved it locally, but I’m worried how I can do this when I’m using yarn because that same 15 Sec is taking on the yarn too :)

> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <sr...@gmail.com> wrote:
> 
> Hello Abhijeet,
> I am not sure my answer will solve your problem but just think of the master configuration set for spark application.
> val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> I see you have set it has "local" not as "local[*]".
> 
> From other blog, i got this information, sharing you in full sentence:
> "We are going to work locally, running the application straight from our IDE, so we are setting the master to local[*], meaning we are creating as many threads as there are cores on the machine."
> 
> Just check if this is reducing the time taken for processing, since by this local[*] we are going to use all cores available, not just one core?
> 
> Regards,
> Sriram Srikanth
> 
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <abhijeet.kumar@sentienz.com <ma...@sentienz.com>> wrote:
> Hi All,
> 
> I'm just practicing Spark Streaming with joining two different stream. I noticed that it's taking around 15 seconds for each record. Let me share the details and the code:
> 
> 
> 
> If you can see for stage id 2, it's taking 15 s. Isn't this strange.
> 
> Code:
> 
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
> import org.apache.log4j.{Level, Logger}
> 
> object StreamJoin{
> 
>   val kafkaTopic1 = "demo2"
>   val kafkaTopic2 = "demo3"
>   val bootstrapServer = "localhost:9092"
> 
>   def main(args: Array[String]): Unit = {
>     val checkPointDir = "hdfs://localhost:8020/checkpo <>"
> 
>     val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> 
>     val rootLogger = Logger.getRootLogger()
>     rootLogger.setLevel(Level.ERROR)
> 
>     import spark.implicits._
> 
>     val df1 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic1)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val df2 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic2)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val order_details = df1
>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>         $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")
> 
>     val invoice_details = df2
>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>       .where($"invoice_status" === "Success")
> 
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
> 
> 
>     val join_df = order_details
>       .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>         $"promotion_cost",
>         $"date_of_order",
>         $"units_sold" as "units_sold", $"order_id")
> 
>     join_df.writeStream
>       .format("console")
>       .option("truncate", false)
>       .start()
>       .awaitTermination()
> 
>   }
> }
> 
> Thanks,
> Abhijeet Kumar
> 
> 
> 
> -- 
> Regards, 
> Srikanth Sriram
> <Screenshot 2018-11-27 at 1.24.47 PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png>


Re: Spark Streaming join taking long to process

Posted by Srikanth Sriram <sr...@gmail.com>.
Hello Abhijeet,
I am not sure my answer will solve your problem but just think of the
master configuration set for spark application.

val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .master("local")
      .getOrCreate()

I see you have set it has "local" not as "local[*]".

From other blog, i got this information, sharing you in full sentence:
"We are going to work locally, running the application straight from our
IDE, so we are setting the master to local[*], meaning we are creating as
many threads as there are cores on the machine."

Just check if this is reducing the time taken for processing, since by this
local[*] we are going to use all cores available, not just one core?

Regards,
Sriram Srikanth

On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <ab...@sentienz.com>
wrote:

> Hi All,
>
> I'm just practicing Spark Streaming with joining two different stream. I
> noticed that it's taking around 15 seconds for each record. Let me share
> the details and the code:
>
> If you can see for stage id 2, it's taking 15 s. Isn't this strange.
>
> Code:
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
> import org.apache.log4j.{Level, Logger}
>
> object StreamJoin{
>
>   val kafkaTopic1 = "demo2"
>   val kafkaTopic2 = "demo3"
>   val bootstrapServer = "localhost:9092"
>
>   def main(args: Array[String]): Unit = {
>     val checkPointDir = "hdfs://localhost:8020/checkpo"
>
>     val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
>
>     val rootLogger = Logger.getRootLogger()
>     rootLogger.setLevel(Level.ERROR)
>
>     import spark.implicits._
>
>     val df1 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic1)
>       .option("failOnDataLoss", "false")
>       .load()
>
>     val df2 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic2)
>       .option("failOnDataLoss", "false")
>       .load()
>
>     val order_details = df1
>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>         $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")
>
>     val invoice_details = df2
>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>       .where($"invoice_status" === "Success")
>
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>
>
>     val join_df = order_details
>       .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>         $"promotion_cost",
>         $"date_of_order",
>         $"units_sold" as "units_sold", $"order_id")
>
>     join_df.writeStream
>       .format("console")
>       .option("truncate", false)
>       .start()
>       .awaitTermination()
>
>   }
> }
>
>
> Thanks,
> Abhijeet Kumar
>
>

-- 
Regards,
Srikanth Sriram