You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Arun Manivannan <ar...@arunma.com> on 2018/11/05 23:29:14 UTC

Equivalent of emptyDataFrame in StructuredStreaming

Hi,

I would like to create a "zero" value for a Structured Streaming Dataframe
and unfortunately, I couldn't find any leads.  With Spark batch, I can do a
"emptyDataFrame" or "createDataFrame" with "emptyRDD" but with
StructuredStreaming, I am lost.

If I use the "emptyDataFrame" as the zero value, I wouldn't be able to join
them with any other DataFrames in the program because Spark doesn't allow
you to mix batch and stream data frames. (isStreaming=false for the Batch
ones).

Any clue is greatly appreciated. Here are the alternatives that I have at
the moment.

*1. Reading from an empty file *
*Disadvantages : poll is expensive because it involves IO and it's error
prone in the sense that someone might accidentally update the file.*

val emptyErrorStream = (spark: SparkSession) => {
  spark
    .readStream
    .format("csv")
    .schema(DataErrorSchema)
    .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
    .as[DataError]
}

*2. Use MemoryStream*

*Disadvantages: MemoryStream itself is not recommended for production
use because of the ability to mutate it but I am converting it to DS
immediately. So, I am leaning towards this at the moment. *


val emptyErrorStream = (spark:SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}

Cheers,
Arun

Re: Equivalent of emptyDataFrame in StructuredStreaming

Posted by Arun Manivannan <ar...@arunma.com>.
Hi Jungtaek,

Sorry about the delay in my response and thanks a ton for responding.

I am just trying to build a data pipeline which has a bunch of stages. The
goal is to use a Dataset to accumulate the transformation errors that may
happen in the stages of the pipeline.  As a benefit, I can pass only the
filtered Dataframe to the next stage.

The stages look something like this:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)

Each of the stage's implementation looks something like the following. Some
may return errors or some are just side-effecting. Say, the following stage
(AddRowKeyStage) just adds an UUID column to each row and therefore returns
an empty DataSet[Error]. A DataTypeValidatorStage on the other hand may
return a filled in DataSet[Errors] along with the filtered Dataframe value.


import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError,
DataSetWithErrors}
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
class AddRowKeyStage(schemaWithRowKey: StructType)
(implicit spark: SparkSession, encoder: Encoder[DataError])
extends DataStage[DataFrame] {
override val stage: String = getClass.getSimpleName
def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] =
addRowKeys(dataRecords)
def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
val colOrder = schemaWithRowKey.fields.map(_.name)
val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
val returnDf = withRowKeyDf.select(colOrder.map(col): _*)
Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
}
}


For accumulating the errors at each stage, I am using a Writer monad from
the Cats library.  I have made provisions that the combination of errors
happen automatically by implementing a Semigroup for Spark Dataset.  This
way, I could do the following and have two Datasets (one for error and one
for value) when I start the stream.

val validRecordsWithErrors = pipelineStages.foldLeft(initDf) { case
(dfWithErrors, stage) =>
  for {
    df <- dfWithErrors
    applied <- stage(df)
  } yield applied
}

The validRecords is a combination of both transformation errors (left side)
and the dataframe of records that has successfully passed through the
stages (right)

Now, the tricky bit is this :

val initDf = Writer(*DataFrameOps.emptyErrorStream(spark)*, sourceRawDf)

The "zero" value of the fold and the error value for side-effecting stages
must be an empty stream. With Spark batch, I can always use an
"emptyDataFrame" but I have no clue on how to achieve this in Spark
streaming.  Unfortunately, "emptyDataFrame"  is not "isStreaming" and
therefore I won't be able to union the errors together.

I am sorry if I haven't done a good job in explaining it well.

Cheers,
Arun



On Tue, Nov 6, 2018 at 7:34 AM Jungtaek Lim <ka...@gmail.com> wrote:

> Could you explain what you're trying to do? It should have no batch for no
> data in stream, so it will end up to no-op even it is possible.
>
> - Jungtaek Lim (HeartSaVioR)
>
> 2018년 11월 6일 (화) 오전 8:29, Arun Manivannan <ar...@arunma.com>님이 작성:
>
>> Hi,
>>
>> I would like to create a "zero" value for a Structured Streaming
>> Dataframe and unfortunately, I couldn't find any leads.  With Spark batch,
>> I can do a "emptyDataFrame" or "createDataFrame" with "emptyRDD" but with
>> StructuredStreaming, I am lost.
>>
>> If I use the "emptyDataFrame" as the zero value, I wouldn't be able to
>> join them with any other DataFrames in the program because Spark doesn't
>> allow you to mix batch and stream data frames. (isStreaming=false for the
>> Batch ones).
>>
>> Any clue is greatly appreciated. Here are the alternatives that I have at
>> the moment.
>>
>> *1. Reading from an empty file *
>> *Disadvantages : poll is expensive because it involves IO and it's error
>> prone in the sense that someone might accidentally update the file.*
>>
>> val emptyErrorStream = (spark: SparkSession) => {
>>   spark
>>     .readStream
>>     .format("csv")
>>     .schema(DataErrorSchema)
>>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>>     .as[DataError]
>> }
>>
>> *2. Use MemoryStream*
>>
>> *Disadvantages: MemoryStream itself is not recommended for production use because of the ability to mutate it but I am converting it to DS immediately. So, I am leaning towards this at the moment. *
>>
>>
>> val emptyErrorStream = (spark:SparkSession) => {
>>   implicit val sqlC = spark.sqlContext
>>   MemoryStream[DataError].toDS()
>> }
>>
>> Cheers,
>> Arun
>>
>

Re: Equivalent of emptyDataFrame in StructuredStreaming

Posted by Jungtaek Lim <ka...@gmail.com>.
Could you explain what you're trying to do? It should have no batch for no
data in stream, so it will end up to no-op even it is possible.

- Jungtaek Lim (HeartSaVioR)

2018년 11월 6일 (화) 오전 8:29, Arun Manivannan <ar...@arunma.com>님이 작성:

> Hi,
>
> I would like to create a "zero" value for a Structured Streaming Dataframe
> and unfortunately, I couldn't find any leads.  With Spark batch, I can do a
> "emptyDataFrame" or "createDataFrame" with "emptyRDD" but with
> StructuredStreaming, I am lost.
>
> If I use the "emptyDataFrame" as the zero value, I wouldn't be able to
> join them with any other DataFrames in the program because Spark doesn't
> allow you to mix batch and stream data frames. (isStreaming=false for the
> Batch ones).
>
> Any clue is greatly appreciated. Here are the alternatives that I have at
> the moment.
>
> *1. Reading from an empty file *
> *Disadvantages : poll is expensive because it involves IO and it's error
> prone in the sense that someone might accidentally update the file.*
>
> val emptyErrorStream = (spark: SparkSession) => {
>   spark
>     .readStream
>     .format("csv")
>     .schema(DataErrorSchema)
>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>     .as[DataError]
> }
>
> *2. Use MemoryStream*
>
> *Disadvantages: MemoryStream itself is not recommended for production use because of the ability to mutate it but I am converting it to DS immediately. So, I am leaning towards this at the moment. *
>
>
> val emptyErrorStream = (spark:SparkSession) => {
>   implicit val sqlC = spark.sqlContext
>   MemoryStream[DataError].toDS()
> }
>
> Cheers,
> Arun
>