You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/07/29 03:27:00 UTC

[jira] [Resolved] (SPARK-28546) Why does the File Sink operation of Spark 2.4 Structured Streaming include double-level version validation?

     [ https://issues.apache.org/jira/browse/SPARK-28546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-28546.
----------------------------------
    Resolution: Invalid

> Why does the File Sink operation of Spark 2.4 Structured Streaming include double-level version validation?
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28546
>                 URL: https://issues.apache.org/jira/browse/SPARK-28546
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark 2.4
> Structured Streaming
>            Reporter: tommy duan
>            Priority: Major
>
> My code is as follows:
> {code:java}
> Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
>  .options(this.getSparkKafkaCommonOptions(sparkSession)) 
>  .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
>  .option("subscribe", "myTopic1,myTopic2")
>  .option("startingOffsets", "earliest")
>  .load();{code}
> {code:java}
> String mdtTempView = "mybasetemp";
>  ExpressionEncoder<Row> Rowencoder = this.getSchemaEncoder(new Schema.Parser().parse(baseschema.getValue())); 
>  Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
>  .map(new MapFunction<Row>(){
>  ....
>  }, Rowencoder)
>  .createOrReplaceGlobalTempView(mdtTempView);
>  
>  Dataset<Row> queryResult = this.sparkSession.sql("select 。。。 from global_temp." + mdtTempView + " where start_time<>\"\"");
>  String savePath= "/user/dx/streaming/data/testapp"; 
>  String checkpointLocation= "/user/dx/streaming/checkpoint/testapp";
>  StreamingQuery query = queryResult.writeStream().format("parquet")
>  .option("path", savePath)
>  .option("checkpointLocation", checkpointLocation)
>  .partitionBy("month", "day", "hour")
>  .outputMode(OutputMode.Append())
>  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
>  .start();
> try {
>  query.awaitTermination();
>  } catch (StreamingQueryException e) {
>  e.printStackTrace();
>  }
> {code}
>  
> 1) When I first ran it, I found that app could run normally.
> 2) Then, for some reason, I deleted the checkpoint directory of structured streaming and did not delete the savepath of sink file which saves HDFS files.
> 3) Then restart app, at which time only executor was assigned after app started, and no tasks were assigned. In the log, I found the print message: "INFO streaming. FileStream Sink: Skipping already committed batch 72". Later I looked at the source code and found that the log was from [https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108]
> 4) The 3) situation lasts for several hours before the DAGScheduler is triggered to divide the DAG, submitStages, submitTasks, and tasks are assigned to the executor.
> Later, I read the [https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala] code carefully, and realized that in FileStreamSink, a log would be included under savepath/_spark_metadata, if the current batchId<=log. getLatest () will skip saving and output the log directly: logInfo (s "Skipping already committed batch $batchId").
>  
> {code:java}
> class FileStreamSink(
>  sparkSession: SparkSession,
>  path: String,
>  fileFormat: FileFormat,
>  partitionColumnNames: Seq[String],
>  options: Map[String, String]) extends Sink with Logging {
>  private val basePath = new Path(path)
>  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
>  private val fileLog =
>  new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
>  
>  override def addBatch(batchId: Long, data: DataFrame): Unit = {
>    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
>      logInfo(s"Skipping already committed batch $batchId")
>    } else {
>      // save file to hdfs
>    }
>  }
>  //...
> }
> {code}
>  
> I think that since checkpoint is used, all information control rights should be given to checkpoint, and there should not be a batchId log information record.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org