You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/02/20 10:25:00 UTC
[jira] [Assigned] (SPARK-23288) Incorrect number of written records
in structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23288:
------------------------------------
Assignee: Apache Spark
> Incorrect number of written records in structured streaming
> -----------------------------------------------------------
>
> Key: SPARK-23288
> URL: https://issues.apache.org/jira/browse/SPARK-23288
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Yuriy Bondaruk
> Assignee: Apache Spark
> Priority: Major
> Labels: Metrics, metrics
>
> I'm using SparkListener.onTaskEnd() to capture input and output metrics but it seems that number of written records ('taskEnd.taskMetrics().outputMetrics().recordsWritten()') is incorrect. Here is my stream construction:
>
> {code:java}
> StreamingQuery writeStream = session
> .readStream()
> .schema(RecordSchema.fromClass(TestRecord.class))
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv(inputFolder.getRoot().toPath().toString())
> .as(Encoders.bean(TestRecord.class))
> .flatMap(
> ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
> List<TestVendingRecord> resultIterable = new ArrayList<>();
> try {
> TestVendingRecord result = transformer.convert(u);
> resultIterable.add(result);
> } catch (Throwable t) {
> System.err.println("Ooops");
> t.printStackTrace();
> }
> return resultIterable.iterator();
> }),
> Encoders.bean(TestVendingRecord.class))
> .writeStream()
> .outputMode(OutputMode.Append())
> .format("parquet")
> .option("path", outputFolder.getRoot().toPath().toString())
> .option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
> .start();
> writeStream.processAllAvailable();
> writeStream.stop();
> {code}
> Tested it with one good and one bad (throwing exception in transformer.convert(u)) input records and it produces following metrics:
>
> {code:java}
> (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
> (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
> (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
> (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 323
> (TestMain.java:onTaskEnd(84)) - name = number of output rows
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 364
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
> (TestMain.java:onTaskEnd(85)) - value = 157
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
> (TestMain.java:onTaskEnd(85)) - value = 3
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
> (TestMain.java:onTaskEnd(85)) - value = 2396
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 633807000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
> (TestMain.java:onTaskEnd(85)) - value = 683
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 55662000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
> (TestMain.java:onTaskEnd(85)) - value = 58
> (TestMain.java:onTaskEnd(89)) - input records 2
> Streaming query made progress: {
> "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
> "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
> "name" : null,
> "timestamp" : "2018-01-26T14:44:05.362Z",
> "numInputRows" : 2,
> "processedRowsPerSecond" : 0.8163265306122448,
> "durationMs" : {
> "addBatch" : 1994,
> "getBatch" : 126,
> "getOffset" : 52,
> "queryPlanning" : 220,
> "triggerExecution" : 2450,
> "walCommit" : 41
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
> "startOffset" : null,
> "endOffset" : {
> "logOffset" : 0
> },
> "numInputRows" : 2,
> "processedRowsPerSecond" : 0.8163265306122448
> } ],
> "sink" : {
> "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
> }
> }
> {code}
> The number of inputs is correct but the number of output records taken from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. Accumulables (taskEnd.taskInfo().accumulables()) don't have a correct value as well - should be 1 but it shows 2 'number of output rows'.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org