You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "wyphao.2007" <wy...@163.com> on 2017/07/06 06:10:36 UTC
Registering custom metrics does not work
Hi, all
I want to know element's latency before write to Elasticsearch, so I registering a custom metrics as follow:
class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
private var metricGroup: Option[MetricGroup] = None
private var latency: Long = _
private def init(runtimeContext: RuntimeContext): Unit = {
if (metricGroup.isEmpty) {
metricGroup = Some(runtimeContext.getMetricGroup)
metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
}
}
def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
init(runtimeContext)
latency = System.currentTimeMillis() - element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
}
override def process(element: EventEntry,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit =
requestIndexer.add(createIndexRequest(element, runtimeContext))
}
but that does not seem to work, Does anyone know why?
Regards
wyp
Re: Registering custom metrics does not work
Posted by Chesnay Schepler <ch...@apache.org>.
How are you verifying whether it is registered?
For the sake of covering all angles: Are you certain that
createPartitionIndex is called?
On 06.07.2017 08:51, wyphao.2007 wrote:
> Hi Chesnay, thank you for your reply
>
> The code above does not get registered at all.
>
>
>
> 在2017年07月06 14时45分, "Chesnay Schepler"<ch...@apache.org>写道:
>
>
> Hello,
>
> Plase provide more information as to how it is not working as
> expected.
>
> Does it throw an exception, log a warning, is the metric
> not get registered at all or does the value not changing?
>
> On 06.07.2017 08:10, wyphao.2007 wrote:
>> Hi, all
>> I want to know element's latency before write to
>> Elasticsearch, so I registering a custom metrics as follow:
>>
>> class CustomElasticsearchSinkFunction extends
>> ElasticsearchSinkFunction[EventEntry] {
>> private var metricGroup: Option[MetricGroup] = None
>> private var latency: Long = _
>>
>> private def init(runtimeContext: RuntimeContext): Unit = {
>> if (metricGroup.isEmpty) {
>> metricGroup = Some(runtimeContext.getMetricGroup)
>> metricGroup.get.gauge[Long, Gauge[Long]]("esLatency",
>> ScalaGauge[Long](() => latency))
>> }
>> }
>>
>> def createIndexRequest(element: EventEntry, runtimeContext:
>> RuntimeContext): IndexRequest = {
>> init(runtimeContext)
>> latency = System.currentTimeMillis() -
>> element.executeTime.getMillis
>> Requests.indexRequest.index("test").`type`("event").source(element.json)
>> }
>>
>> override def process(element: EventEntry,
>> runtimeContext: RuntimeContext,
>> requestIndexer: RequestIndexer):
>> Unit =
>> requestIndexer.add(createIndexRequest(element,
>> runtimeContext))
>> }
>>
>> but that does not seem to work, Does anyone know why?
>>
>> Regards
>> wyp
>>
>
Re:Re: Registering custom metrics does not work
Posted by "wyphao.2007" <wy...@163.com>.
Hi Chesnay, thank you for your reply
The code above does not get registered at all.
在2017年07月06 14时45分, "Chesnay Schepler"<ch...@apache.org>写道:
Hello,
Plase provide more information as to how it is not working as expected.
Does it throw an exception, log a warning, is the metric
not get registered at all or does the value not changing?
On 06.07.2017 08:10, wyphao.2007 wrote:
Hi, all
I want to know element's latency before write to Elasticsearch, so I registering a custom metrics as follow:
class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
private var metricGroup: Option[MetricGroup] = None
private var latency: Long = _
private def init(runtimeContext: RuntimeContext): Unit = {
if (metricGroup.isEmpty) {
metricGroup = Some(runtimeContext.getMetricGroup)
metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
}
}
def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
init(runtimeContext)
latency = System.currentTimeMillis() - element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
}
override def process(element: EventEntry,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit =
requestIndexer.add(createIndexRequest(element, runtimeContext))
}
but that does not seem to work, Does anyone know why?
Regards
wyp
Re: Registering custom metrics does not work
Posted by Chesnay Schepler <ch...@apache.org>.
Hello,
Plase provide more information as to how it is not working as expected.
Does it throw an exception, log a warning, is the metric
not get registered at all or does the value not changing?
On 06.07.2017 08:10, wyphao.2007 wrote:
> Hi, all
> I want to know element's latency before write to Elasticsearch, so I
> registering a custom metrics as follow:
>
> class CustomElasticsearchSinkFunction extends
> ElasticsearchSinkFunction[EventEntry] {
> private var metricGroup: Option[MetricGroup] = None
> private var latency: Long = _
>
> private def init(runtimeContext: RuntimeContext): Unit = {
> if (metricGroup.isEmpty) {
> metricGroup = Some(runtimeContext.getMetricGroup)
> metricGroup.get.gauge[Long, Gauge[Long]]("esLatency",
> ScalaGauge[Long](() => latency))
> }
> }
>
> def createIndexRequest(element: EventEntry, runtimeContext:
> RuntimeContext): IndexRequest = {
> init(runtimeContext)
> latency = System.currentTimeMillis() - element.executeTime.getMillis
> Requests.indexRequest.index("test").`type`("event").source(element.json)
> }
>
> override def process(element: EventEntry,
> runtimeContext: RuntimeContext,
> requestIndexer: RequestIndexer): Unit =
> requestIndexer.add(createIndexRequest(element, runtimeContext))
> }
>
> but that does not seem to work, Does anyone know why?
>
> Regards
> wyp
>