Posted to dev@bahir.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2023/03/07 14:59:00 UTC

[jira] [Commented] (BAHIR-283) InfluxDBWriter fails to write the final element in each element

    [ https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697472#comment-17697472 ] 

ASF subversion and git services commented on BAHIR-283:
-------------------------------------------------------

Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch refs/heads/master from dave
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ]

[BAHIR-283] Fix dropped elements on InfluxDbSink



> InfluxDBWriter fails to write the final element in each element
> ---------------------------------------------------------------
>
>                 Key: BAHIR-283
>                 URL: https://issues.apache.org/jira/browse/BAHIR-283
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: David Quigley
>            Priority: Major
>             Fix For: Flink-1.2.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> {{    /**
>      * This method calls the InfluxDB write API whenever the element list reaches the {@link
>      * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
>      * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
>      *
>      * @param in incoming data
>      * @param context current Flink context
>      * @see org.apache.flink.api.connector.sink.SinkWriter.Context
>      */
>     @Override
>     public void write(final IN in, final Context context) throws IOException {
>         if (this.elements.size() == this.bufferSize) {
>             LOG.debug("Buffer size reached preparing to write the elements.");
>             this.writeCurrentElements();
>             this.elements.clear();
>         } else {
>             LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
>             this.elements.add(this.schemaSerializer.serialize(in, context));
>             if (context.timestamp() != null) {
>                 this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
>             }
>         }
>     }}}
> The bug is in this write method. When the buffer holds fewer elements than the configured buffer size, the current element is added to the buffer. When the buffer is already full, the buffer is flushed and cleared, but the current element is never added to the next buffer because the add happens only in the else branch. As a result, the element that triggers each flush is dropped.
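
One way to avoid the drop described above is to flush a full buffer first and then add the incoming element unconditionally. The following is a minimal, self-contained sketch of that idea and not necessarily the change made in the referenced commit. The class BufferedWriterSketch, its String element type, and the flushedBatches() and buffered() accessors are hypothetical stand-ins for InfluxDBWriter, its serialized elements, and the InfluxDB write call.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for the buffering logic of the writer in the report.
 * It records flushed batches in memory instead of calling InfluxDB.
 */
class BufferedWriterSketch {
    private final int bufferSize;
    private final List<String> elements = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastTimestamp = Long.MIN_VALUE;

    BufferedWriterSketch(final int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Buffers one element; flushes first if the buffer is already full. */
    void write(final String in, final Long timestamp) {
        if (this.elements.size() == this.bufferSize) {
            // Stand-in for writeCurrentElements(): keep a copy of the batch.
            this.flushed.add(new ArrayList<>(this.elements));
            this.elements.clear();
        }
        // No else branch: the incoming element is always added, so the
        // element that triggers a flush starts the next buffer.
        this.elements.add(in);
        if (timestamp != null) {
            this.lastTimestamp = Math.max(this.lastTimestamp, timestamp);
        }
    }

    /** Batches that have been flushed so far, in order. */
    List<List<String>> flushedBatches() {
        return this.flushed;
    }

    /** Elements still waiting in the buffer. */
    List<String> buffered() {
        return this.elements;
    }

    /** Largest timestamp seen, or Long.MIN_VALUE if none was provided. */
    long lastTimestamp() {
        return this.lastTimestamp;
    }
}
```

With a buffer size of 2 and five writes, this sketch flushes two batches of two elements and leaves the fifth element in the buffer. With the else branch from the quoted method, the third element would be lost at the first flush.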



--
This message was sent by Atlassian Jira
(v8.20.10#820010)