You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuriy Bondaruk (JIRA)" <ji...@apache.org> on 2018/01/31 17:47:02 UTC

[jira] [Created] (SPARK-23288) Incorrect number of written records in structured streaming

Yuriy Bondaruk created SPARK-23288:
--------------------------------------

             Summary: Incorrect number of written records in structured streaming
                 Key: SPARK-23288
                 URL: https://issues.apache.org/jira/browse/SPARK-23288
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Yuriy Bondaruk


I'm using SparkListener.onTaskEnd() to capture input and output metrics but it seems that number of written records ('taskEnd.taskMetrics().outputMetrics().recordsWritten()') is incorrect. Here is my stream construction:

 
{code:java}
StreamingQuery writeStream = session
        .readStream()
        .schema(RecordSchema.fromClass(TestRecord.class))
        .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
        .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
        .csv(inputFolder.getRoot().toPath().toString())
        .as(Encoders.bean(TestRecord.class))
        .flatMap(
            ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
                List<TestVendingRecord> resultIterable = new ArrayList<>();
                try {
                    TestVendingRecord result = transformer.convert(u);
                    resultIterable.add(result);
                } catch (Throwable t) {
                    System.err.println("Ooops");
                    t.printStackTrace();
                }

                return resultIterable.iterator();
            }),
            Encoders.bean(TestVendingRecord.class))
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("path", outputFolder.getRoot().toPath().toString())
        .option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
        .start();

    writeStream.processAllAvailable();
    writeStream.stop();
{code}
Tested it with one good and one bad (throwing exception in transformer.convert(u)) input records and it produces following metrics:

 
{code:java}
(TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
(TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
(TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
(TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
(TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
(TestMain.java:onTaskEnd(85)) - value =  323
(TestMain.java:onTaskEnd(84)) - name = number of output rows
(TestMain.java:onTaskEnd(85)) - value =  2
(TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
(TestMain.java:onTaskEnd(85)) - value =  364
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
(TestMain.java:onTaskEnd(85)) - value =  2
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
(TestMain.java:onTaskEnd(85)) - value =  157
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
(TestMain.java:onTaskEnd(85)) - value =  3
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
(TestMain.java:onTaskEnd(85)) - value =  2396
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
(TestMain.java:onTaskEnd(85)) - value =  633807000
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
(TestMain.java:onTaskEnd(85)) - value =  683
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
(TestMain.java:onTaskEnd(85)) - value =  55662000
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
(TestMain.java:onTaskEnd(85)) - value =  58
(TestMain.java:onTaskEnd(89)) - input records 2

Streaming query made progress: {
  "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
  "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
  "name" : null,
  "timestamp" : "2018-01-26T14:44:05.362Z",
  "numInputRows" : 2,
  "processedRowsPerSecond" : 0.8163265306122448,
  "durationMs" : {
    "addBatch" : 1994,
    "getBatch" : 126,
    "getOffset" : 52,
    "queryPlanning" : 220,
    "triggerExecution" : 2450,
    "walCommit" : 41
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 2,
    "processedRowsPerSecond" : 0.8163265306122448
  } ],
  "sink" : {
    "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
  }
}
{code}
The number of inputs is correct but the number of output records taken from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. Accumulables (taskEnd.taskInfo().accumulables()) don't have a correct value as well - should be 1 but it shows 2 'number of output rows'.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org