Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2022/04/21 20:49:17 UTC
Exception Handling in ElasticsearchSink
Hi all,
I've recently noticed some errors in the logs of my Flink job that writes
to an Elasticsearch index. I was hoping to leverage some of the metrics that
Flink exposes (or piggyback on them) to update metric counters when I
encounter specific kinds of errors.
val builder = ElasticsearchSink.Builder(...)
builder.setFailureHandler { actionRequest, throwable, _, _ ->
    // Log the error here (and update metrics via metricGroup.counter(...))
}
return builder.build()
Is there a way to handle this currently? My specific implementation has a
process function that manages multiple sinks (so I can create them
dynamically), but it doesn't look like I can currently access the metric
group from within the failure handler passed to setFailureHandler.
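Since the failure handler has no RuntimeContext of its own, one option is to hand it a Counter that was registered somewhere that does have one, such as the parent sink's open(). The sketch below assumes the Flink Elasticsearch connector (ActionRequestFailureHandler, NoOpFailureHandler) and flink-metrics-core (Counter) are on the classpath; the class name CountingFailureHandler is hypothetical. ActionRequestFailureHandler extends java.io.Serializable while metric counters generally are not serializable, so the counter field is marked @Transient. That assumes the handler is built at runtime on the task (as the dynamically created child sinks are) and is never shipped from the client with the counter already set, since it would be null after deserialization.

```kotlin
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler
import org.elasticsearch.action.ActionRequest

/**
 * Hypothetical failure handler: counts every failed request, then passes the
 * failure on to [delegate]. The default delegate, NoOpFailureHandler, rethrows
 * the failure and so fails the sink, which is the connector's default behaviour.
 */
class CountingFailureHandler(
    // Transient because Counter implementations are generally not Serializable.
    // Assumes the handler is created at runtime on the task (e.g. from invoke()),
    // so it is never serialized after the counter has been set.
    @Transient private val failures: Counter,
    private val delegate: ActionRequestFailureHandler = NoOpFailureHandler()
) : ActionRequestFailureHandler {

    override fun onFailure(
        action: ActionRequest,
        failure: Throwable,
        restStatusCode: Int,
        indexer: RequestIndexer
    ) {
        // Record the failure first, so it is counted even if the delegate rethrows.
        failures.inc()
        delegate.onFailure(action, failure, restStatusCode, indexer)
    }
}
```

With a counter registered in the parent's open(), for example `failures = runtimeContext.metricGroup.counter("elasticsearchFailures")` (metric name hypothetical), the builder call from the snippet above would become `builder.setFailureHandler(CountingFailureHandler(failures))`. Keeping NoOpFailureHandler as the default delegate preserves the fail-on-error behaviour; passing a different delegate would let the handler retry or drop instead.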
My initial thought was that my parent process function could pass its
context down to the child sinks, so that they'd have what they need to
handle the exceptions and update the metrics:
class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
    /**
     * Defines a router that maps an element to its corresponding ElasticsearchSink instance
     * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
     * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
     */
    private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to all of the current routes
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // Here's where the child sink (and its failure handler) is constructed
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(value, context)
    }

    // snapshotState()/initializeState() required by CheckpointedFunction are omitted here for brevity
}
I'd imagine that, within this function's open() call, I could store the
metric group and pass it into my createSink() call so that the child sinks
would have a reference to it. Does that seem feasible, or is there another
way to handle this?
Thanks all,
Rion
Re: Exception Handling in ElasticsearchSink
Posted by Alexander Preuß <al...@ververica.com>.
Hi Rion,
Sorry for the late reply. There should be no problem instantiating the
metric in the open() function and passing its reference down through
createSink and buildSinkFromRoute. I'd be happy to help if you run into any
issues.
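A minimal sketch of that approach, extended to the "specific kinds of errors" from the original question: the counters are registered once from runtimeContext.metricGroup in the parent's open(), and the resulting object is passed down through createSink/buildSinkFromRoute so that every child sink's failure handler records into the same counters. EsErrorMetrics, the group and counter names, and the choice to bucket on restStatusCode are all hypothetical; the only Flink calls used are MetricGroup.addGroup, MetricGroup.counter and Counter.inc. The connector passes -1 as the status code when no REST status is available, which this sketch counts as "other".

```kotlin
import org.apache.flink.metrics.Counter
import org.apache.flink.metrics.MetricGroup

/**
 * Hypothetical holder for per-error-kind counters. Create it once per subtask,
 * e.g. in the parent sink's open():
 *     errorMetrics = EsErrorMetrics(runtimeContext.metricGroup)
 * and pass the same reference to every child sink it builds.
 */
class EsErrorMetrics(parent: MetricGroup) {
    private val group: MetricGroup = parent.addGroup("elasticsearch")

    // MetricGroup.counter(name) creates a new Counter on every call,
    // so register each one once and keep the reference.
    val rejected: Counter = group.counter("rejected")       // HTTP 429, e.g. bulk queue rejections
    val badRequest: Counter = group.counter("badRequest")   // HTTP 400, e.g. mapping errors
    val conflict: Counter = group.counter("conflict")       // HTTP 409, version conflicts
    val other: Counter = group.counter("other")             // anything else, including -1

    /** Increments the counter matching the REST status reported to the failure handler. */
    fun record(restStatusCode: Int) {
        when (restStatusCode) {
            429 -> rejected.inc()
            400 -> badRequest.inc()
            409 -> conflict.inc()
            else -> other.inc()
        }
    }
}
```

Inside a child sink's failure handler this reduces to a single call such as `errorMetrics.record(restStatusCode)` before rethrowing or retrying. Like a Counter, EsErrorMetrics is not Serializable, so it should only be captured by handlers created at runtime or held in a @Transient field.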
Best,
Alexander
--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpreuss@ververica.com
<https://www.ververica.com/>