Posted to user@spark.apache.org by Arun Manivannan <ar...@arunma.com> on 2018/11/05 23:29:14 UTC
Equivalent of emptyDataFrame in StructuredStreaming
Hi,
I would like to create a "zero" value for a Structured Streaming DataFrame
and unfortunately, I couldn't find any leads. With Spark batch, I can use
"emptyDataFrame" or "createDataFrame" with an "emptyRDD", but with
Structured Streaming, I am lost.
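(For reference, the batch version I mean is something like this sketch,
using the same DataErrorSchema as in the snippets below:)

import org.apache.spark.sql.Row

// Batch "zero": an empty DataFrame with the error schema.
val batchZero = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], DataErrorSchema)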
If I use "emptyDataFrame" as the zero value, I wouldn't be able to join or
union it with any other streaming DataFrames in the program, because Spark
doesn't allow you to mix batch and streaming DataFrames (isStreaming is
false for the batch ones).
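To make the mismatch concrete, a minimal sketch (the rate source here is
just a stand-in for any streaming source):

val batchDf = spark.emptyDataFrame
batchDf.isStreaming   // false

val streamDf = spark.readStream.format("rate").load()
streamDf.isStreaming  // true

// Any union/join between the two fails analysis, because Spark rejects
// mixing streaming and batch Datasets in a single streaming query.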
Any clue is greatly appreciated. Here are the alternatives that I have at
the moment.
*1. Reading from an empty file*
*Disadvantages: polling is expensive because it involves IO, and it's
error-prone in the sense that someone might accidentally update the file.*
val emptyErrorStream = (spark: SparkSession) => {
  spark
    .readStream
    .format("csv")
    .schema(DataErrorSchema)
    .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
    .as[DataError]
}
*2. Use MemoryStream*
*Disadvantages: MemoryStream itself is not recommended for production use
because it can be mutated, but I am converting it to a Dataset immediately.
So, I am leaning towards this at the moment.*
val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}
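A quick sanity check (sketch) of why this works as the zero where the batch
version doesn't:

val zero = emptyErrorStream(spark)
zero.isStreaming  // true, so it can be unioned with other streaming Datasets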
Cheers,
Arun
Re: Equivalent of emptyDataFrame in StructuredStreaming
Posted by Arun Manivannan <ar...@arunma.com>.
Hi Jungtaek,
Sorry about the delay in my response and thanks a ton for responding.
I am just trying to build a data pipeline with a bunch of stages. The goal
is to use a Dataset to accumulate the transformation errors that may happen
in the stages of the pipeline. As a benefit, I can pass only the filtered
DataFrame on to the next stage.
The stages look something like this:
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
Each stage's implementation looks something like the following. Some may
return errors and some are just side-effecting. Say, the following stage
(AddRowKeyStage) just adds a UUID column to each row and therefore returns
an empty Dataset[DataError]. A DataTypeValidatorStage, on the other hand,
may return a populated Dataset[DataError] along with the filtered DataFrame
value.
import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
  extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] =
    addRowKeys(dataRecords)

  def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
    val returnDf = withRowKeyDf.select(colOrder.map(col): _*)
    Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
  }
}
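For context, the supporting types referenced above look roughly like this
(a sketch reconstructed from the usage; the real definitions may differ in
detail):

import cats.data.Writer
import org.apache.spark.sql.Dataset
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError

object PipelineTypes {
  // Errors accumulate on the Writer's left; the transformed data rides on the right.
  type DataSetWithErrors[A] = Writer[Dataset[DataError], A]

  // Each stage maps a value to a value of the same type, plus any errors it produced.
  trait DataStage[T] {
    val stage: String
    def apply(data: T): DataSetWithErrors[T]
  }
}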
For accumulating the errors at each stage, I am using a Writer monad from
the Cats library. I have made provisions for the errors to be combined
automatically by implementing a Semigroup for Spark Datasets (a minimal
sketch follows the fold below). This way, I could do the following and have
two Datasets (one for the errors and one for the values) when I start the
stream.
val validRecordsWithErrors = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df      <- dfWithErrors
      applied <- stage(df)
    } yield applied
}
The validRecordsWithErrors value is a combination of both the
transformation errors (the left side of the Writer) and the DataFrame of
records that have successfully passed through the stages (the right side).
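The Semigroup instance I mentioned is, in essence, just a union (a minimal
sketch; the real instance lives alongside my DataFrameOps):

import cats.Semigroup
import org.apache.spark.sql.Dataset

implicit def datasetSemigroup[A]: Semigroup[Dataset[A]] =
  new Semigroup[Dataset[A]] {
    // Union requires both sides to be streaming (or both batch), which is
    // exactly why the fold's zero value below is the tricky part.
    def combine(x: Dataset[A], y: Dataset[A]): Dataset[A] = x.union(y)
  }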
Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
The "zero" value of the fold and the error value for side-effecting stages
must be an empty stream. With Spark batch, I can always use an
"emptyDataFrame" but I have no clue on how to achieve this in Spark
streaming. Unfortunately, "emptyDataFrame" is not "isStreaming" and
therefore I won't be able to union the errors together.
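Concretely, the zero I am leaning towards is the MemoryStream version from
my first mail, wrapped up like this (sketch):

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object DataFrameOps {
  // A streaming "zero": a MemoryStream that we never push any rows into,
  // so it satisfies isStreaming without ever emitting data.
  def emptyErrorStream(spark: SparkSession)
                      (implicit enc: Encoder[DataError]): Dataset[DataError] = {
    implicit val sqlC = spark.sqlContext
    MemoryStream[DataError].toDS()
  }
}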
I am sorry if I haven't explained it well.
Cheers,
Arun
On Tue, Nov 6, 2018 at 7:34 AM Jungtaek Lim <ka...@gmail.com> wrote:
> Could you explain what you're trying to do? Structured Streaming runs no
> batch when there is no data in the stream, so even if this were possible,
> it would end up being a no-op.
>
> - Jungtaek Lim (HeartSaVioR)
>
Re: Equivalent of emptyDataFrame in StructuredStreaming
Posted by Jungtaek Lim <ka...@gmail.com>.
Could you explain what you're trying to do? Structured Streaming runs no
batch when there is no data in the stream, so even if this were possible,
it would end up being a no-op.
- Jungtaek Lim (HeartSaVioR)