Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/07/29 03:27:00 UTC
[jira] [Resolved] (SPARK-28546) Why does the File Sink operation of
Spark 2.4 Structured Streaming include double-level version validation?
[ https://issues.apache.org/jira/browse/SPARK-28546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-28546.
----------------------------------
Resolution: Invalid
> Why does the File Sink operation of Spark 2.4 Structured Streaming include double-level version validation?
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-28546
> URL: https://issues.apache.org/jira/browse/SPARK-28546
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark 2.4
> Structured Streaming
> Reporter: tommy duan
> Priority: Major
>
> My code is as follows:
> {code:java}
> Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
>     .options(this.getSparkKafkaCommonOptions(sparkSession))
>     .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
>     .option("subscribe", "myTopic1,myTopic2")
>     .option("startingOffsets", "earliest")
>     .load();
> {code}
> {code:java}
> String mdtTempView = "mybasetemp";
> ExpressionEncoder<Row> rowEncoder = this.getSchemaEncoder(new Schema.Parser().parse(baseschema.getValue()));
> // createOrReplaceGlobalTempView returns void, so build the Dataset first and register the view separately.
> Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
>     .map(new MapFunction<byte[], Row>() {
>         ....
>     }, rowEncoder);
> parseVal.createOrReplaceGlobalTempView(mdtTempView);
>
> Dataset<Row> queryResult = this.sparkSession.sql("select ... from global_temp." + mdtTempView + " where start_time<>\"\"");
> String savePath = "/user/dx/streaming/data/testapp";
> String checkpointLocation = "/user/dx/streaming/checkpoint/testapp";
> StreamingQuery query = queryResult.writeStream().format("parquet")
>     .option("path", savePath)
>     .option("checkpointLocation", checkpointLocation)
>     .partitionBy("month", "day", "hour")
>     .outputMode(OutputMode.Append())
>     .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
>     .start();
> try {
>     query.awaitTermination();
> } catch (StreamingQueryException e) {
>     e.printStackTrace();
> }
> {code}
>
> 1) On the first run, the app worked normally.
> 2) Later, for some reason, I deleted the Structured Streaming checkpoint directory, but I did not delete the sink's save path, which holds the output files on HDFS.
> 3) I then restarted the app. After startup, only executors were allocated and no tasks were assigned. In the log I found the message: "INFO streaming.FileStreamSink: Skipping already committed batch 72". Looking at the source code, I found that this message comes from [https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108]
> 4) The situation in 3) lasted for several hours before the DAGScheduler started to divide the DAG into stages, submit stages and tasks, and assign tasks to the executors.
> Later, I read the [https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala] code carefully and realized that FileStreamSink keeps its own log under savepath/_spark_metadata. If the current batchId is less than or equal to the latest batch ID in that log (fileLog.getLatest()), the sink skips writing the batch and only logs: logInfo(s"Skipping already committed batch $batchId").
>
> {code:java}
> class FileStreamSink(
>     sparkSession: SparkSession,
>     path: String,
>     fileFormat: FileFormat,
>     partitionColumnNames: Seq[String],
>     options: Map[String, String]) extends Sink with Logging {
>   private val basePath = new Path(path)
>   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
>   private val fileLog =
>     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
>
>   override def addBatch(batchId: Long, data: DataFrame): Unit = {
>     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
>       logInfo(s"Skipping already committed batch $batchId")
>     } else {
>       // save file to hdfs
>     }
>   }
>   //...
> }
> {code}
>
> I think that, since a checkpoint is used, control over all batch progress information should belong to the checkpoint, and the sink should not keep a separate batchId log of its own.
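
The skip behaviour described in the report can be reproduced without Spark. The following is a minimal sketch, not Spark's implementation: `SinkSkipSketch`, `SinkLog`, `addBatch` and `replay` are hypothetical names made up for illustration, and the sketch assumes that a query started without its old checkpoint numbers its batches from 0 again while the log under savepath/_spark_metadata still records batch 72 as the latest committed batch.

```java
import java.util.OptionalLong;

public class SinkSkipSketch {

    /** Hypothetical stand-in for the sink log kept under savepath/_spark_metadata. */
    static final class SinkLog {
        private long latest = -1L; // -1 means no batch has been committed yet

        OptionalLong getLatest() {
            return latest < 0 ? OptionalLong.empty() : OptionalLong.of(latest);
        }

        void commit(long batchId) {
            latest = Math.max(latest, batchId);
        }
    }

    /** Same guard as the quoted addBatch: returns true only if the batch is written. */
    static boolean addBatch(SinkLog log, long batchId) {
        if (batchId <= log.getLatest().orElse(-1L)) {
            return false; // corresponds to "Skipping already committed batch"
        }
        log.commit(batchId); // a real sink would write the files before committing
        return true;
    }

    /**
     * First run commits batches 0..lastCommitted. The restart (assumed to begin
     * at batch 0 because the checkpoint is gone) replays batchesAfterRestart
     * batches against the same sink log. Returns {skipped, written}.
     */
    static int[] replay(long lastCommitted, int batchesAfterRestart) {
        SinkLog log = new SinkLog();
        for (long b = 0; b <= lastCommitted; b++) {
            addBatch(log, b);
        }
        int skipped = 0;
        int written = 0;
        for (long b = 0; b < batchesAfterRestart; b++) {
            if (addBatch(log, b)) {
                written++;
            } else {
                skipped++;
            }
        }
        return new int[] {skipped, written};
    }

    public static void main(String[] args) {
        int[] result = replay(72, 80);
        System.out.println("skipped=" + result[0] + " written=" + result[1]);
    }
}
```

Under these assumptions, every batch up to and including 72 is skipped after the restart, and output resumes only once the new batch IDs pass the latest ID in the sink log. This would be consistent with the long period in which no tasks wrote data.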