Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2022/04/21 20:49:17 UTC

Exception Handling in ElasticsearchSink

Hi all,

I've recently noticed some errors in the logs of my Flink job that writes to
an Elasticsearch index. I was hoping to leverage the metrics that Flink
exposes (or piggyback on them) to update metric counters when I encounter
specific kinds of errors.

val builder = ElasticsearchSink.Builder(...)

builder.setFailureHandler { actionRequest, throwable, _, _ ->
    // Log the error here (and update metrics via metricGroup.counter(...))
}

return builder.build()

Is there currently a way to handle this? My implementation has a process
function that manages multiple sinks (so that I can create them dynamically),
but it doesn't look like I can access the metric group from within the
handler passed to setFailureHandler when these errors occur.
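One wrinkle worth checking before capturing a counter in the failure handler: as far as we know, the constructor of the connector's `ElasticsearchSinkBase` eagerly verifies that the failure handler is serializable, while Flink's `Counter` does not implement `Serializable`. The stdlib-only sketch below illustrates that pitfall and one possible workaround: a serializable handler whose counters live in a `@Transient` field. `CounterStub`, `NaiveHandler`, `CountingFailureHandler`, `isSerializable`, the counter names, and treating HTTP 429 as a bulk rejection are all hypothetical stand-ins, not connector API.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.ObjectOutputStream
import java.io.Serializable
import java.net.SocketTimeoutException

// Hypothetical stand-in for org.apache.flink.metrics.Counter, which (like this
// class) does not implement Serializable.
class CounterStub {
    var count = 0L
        private set

    fun inc() {
        count++
    }
}

// Rough equivalent of a serializability check: try to Java-serialize the
// object and report whether that succeeded.
fun isSerializable(obj: Any): Boolean = try {
    ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(obj) }
    true
} catch (e: IOException) {
    false
}

// Pitfall: a handler that keeps a counter in a regular field is not serializable.
class NaiveHandler(val failures: CounterStub) : Serializable

// Workaround sketch: the handler itself is serializable, and its counters sit in
// a @Transient, nullable field that is filled in on the task side after open().
class CountingFailureHandler : Serializable {
    @Transient
    var counters: Map<String, CounterStub>? = null

    // In the real connector this logic would live in
    // onFailure(actionRequest, failure, restStatusCode, indexer). This sketch only
    // counts; a real handler must still rethrow, re-add via the indexer, or drop.
    fun onFailure(failure: Throwable, restStatusCode: Int) {
        val key = when {
            failure is SocketTimeoutException -> "esTimeouts"
            restStatusCode == 429 -> "esRejected" // assumption: bulk rejections surface as 429
            else -> "esOtherFailures"
        }
        counters?.get(key)?.inc()
    }
}
```

Because transient fields are skipped by serialization, the counters would be `null` on any deserialized copy, which is why the field is nullable. In a setup where the child sinks are built at runtime (after `open()`), the counters can simply be assigned right after the handler is created.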

My initial thought was that my parent process function could pass its
context down to the child sinks, so that they'd have access to it when
handling exceptions and updating metrics:

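import java.util.concurrent.ConcurrentHashMap
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
    /**
     * Defines a router that maps an element to its corresponding ElasticsearchSink instance
     * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
     * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
     */
    private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to all of the current routes
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // First element for this route: here's where the sink (and its
            // failure handler) is constructed
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(value, context)
    }

    // Required by CheckpointedFunction: let each child sink flush its pending requests
    override fun snapshotState(context: FunctionSnapshotContext) {
        sinkRoutes.values.forEach { it.snapshotState(context) }
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // Child sinks are created lazily, so there is nothing to restore here
    }

    // Close every child sink that was opened in invoke()
    override fun close() {
        sinkRoutes.values.forEach { it.close() }
    }
}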

I'd imagine that, within this function's open() call, I could store the
metric group and pass it into my createSink() call so the child sinks would
have a reference to it. Does that seem feasible, or is there another way to
handle this?

Thanks all,

Rion

Re: Exception Handling in ElasticsearchSink

Posted by Alexander Preuß <al...@ververica.com>.
Hi Rion,
Sorry for the late reply. There should be no problems instantiating the
metric in the open() function and passing down its reference through
createSink and buildSinkFromRoute. I'd be happy to help in case you
encounter any issues.
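The approach described here (create the metric once in `open()` and hand its reference down through `createSink`) can be sketched without Flink on the classpath. Creating the counters once in the parent also avoids registering the same metric name again for every child sink, which, as far as we know, Flink's metric groups handle by logging a name-collision warning and not reporting the later metric. `CounterSketch`, `MetricGroupSketch`, `SinkFailureMetrics`, `ChildSinkSketch`, and `DynamicSinkSketch` are hypothetical stand-ins. In a real job, the group would come from `runtimeContext.metricGroup`, and the child would be the sink built by `sinkRouter.createSink`.

```kotlin
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for Flink's Counter.
class CounterSketch {
    var count = 0L
        private set

    fun inc() {
        count++
    }
}

// Hypothetical stand-in for a metric group: only the first counter registered
// under a name is kept in `reported`, mimicking a name collision.
class MetricGroupSketch {
    val reported = mutableMapOf<String, CounterSketch>()

    fun counter(name: String): CounterSketch =
        CounterSketch().also { reported.putIfAbsent(name, it) }
}

// Counters created once (in open()) and shared by every child sink.
class SinkFailureMetrics(group: MetricGroupSketch) {
    private val timeouts = group.counter("esTimeouts")
    private val otherFailures = group.counter("esOtherFailures")

    fun record(failure: Throwable) {
        if (failure is SocketTimeoutException) timeouts.inc() else otherFailures.inc()
    }
}

// Hypothetical child sink: its failure handling only needs the shared reference.
class ChildSinkSketch(val route: String, private val metrics: SinkFailureMetrics) {
    fun onFailure(failure: Throwable) = metrics.record(failure)
}

// Mirrors DynamicElasticsearchSink: metrics are built in open() and passed to createSink().
class DynamicSinkSketch(private val routeOf: (String) -> String) {
    private val sinks = ConcurrentHashMap<String, ChildSinkSketch>()
    private lateinit var metrics: SinkFailureMetrics

    fun open(group: MetricGroupSketch) {
        metrics = SinkFailureMetrics(group)
    }

    // Equivalent of sinkRouter.createSink(route, value), with the metrics passed down.
    private fun createSink(route: String) = ChildSinkSketch(route, metrics)

    fun sinkFor(value: String): ChildSinkSketch =
        sinks.computeIfAbsent(routeOf(value)) { createSink(it) }
}
```

Because every child sink shares the same `SinkFailureMetrics` instance, the counts are aggregated across all routes of one parallel subtask rather than reported per route.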

Best,
Alexander


-- 

Alexander Preuß | Engineer - Data Intensive Systems
