Posted to user@flink.apache.org by "wyphao.2007" <wy...@163.com> on 2017/07/06 06:10:36 UTC

Registering custom metrics does not work

Hi all,
I want to measure each element's latency before it is written to Elasticsearch, so I registered a custom metric as follows:


class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
  // Set on first use, once a RuntimeContext is available.
  private var metricGroup: Option[MetricGroup] = None
  // Most recent latency in milliseconds, exposed through the "esLatency" gauge.
  private var latency: Long = _

  // Registers the gauge once, the first time an element is processed.
  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
      metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    // Latency is the current wall-clock time minus the element's own timestamp.
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}


However, this does not seem to work. Does anyone know why?


Regards
wyp


Re: Registering custom metrics does not work

Posted by Chesnay Schepler <ch...@apache.org>.
How are you verifying whether the metric is registered?

For the sake of covering all angles: are you certain that
createIndexRequest is called?
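
To illustrate why that second question matters, here is a minimal sketch of the same lazy-registration pattern without any Flink dependency. SimpleGauge, InMemoryMetricGroup and LazyLatencyReporter are hypothetical stand-ins invented for this example (they are not Flink classes), and record() plays the role of the method that builds the index request. Under those assumptions, the gauge only becomes visible after the first element has actually passed through that method:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a gauge interface; not a Flink class.
trait SimpleGauge[T] { def getValue: T }

// Hypothetical in-memory registry standing in for a metric group.
class InMemoryMetricGroup {
  private val gauges = mutable.Map.empty[String, SimpleGauge[_]]

  def gauge[T](name: String, g: SimpleGauge[T]): SimpleGauge[T] = {
    gauges(name) = g
    g
  }

  def isRegistered(name: String): Boolean = gauges.contains(name)

  def valueOf(name: String): Option[Any] = gauges.get(name).map(_.getValue)
}

// Mirrors the pattern from the original post: the gauge is registered
// lazily, inside the per-element code path.
class LazyLatencyReporter {
  private var registered = false
  private var latency: Long = 0L

  private def init(group: InMemoryMetricGroup): Unit = {
    if (!registered) {
      // The gauge reads the current value of `latency` each time it is polled.
      group.gauge("esLatency", new SimpleGauge[Long] { def getValue: Long = latency })
      registered = true
    }
  }

  // Stand-in for the request-building method: registration only happens
  // when this is invoked for the first time.
  def record(group: InMemoryMetricGroup, eventTimeMillis: Long, nowMillis: Long): Unit = {
    init(group)
    latency = nowMillis - eventTimeMillis
  }
}

object LazyRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val group = new InMemoryMetricGroup
    val reporter = new LazyLatencyReporter

    println(group.isRegistered("esLatency")) // false: no element processed yet
    reporter.record(group, eventTimeMillis = 1000L, nowMillis = 1250L)
    println(group.isRegistered("esLatency")) // true
    println(group.valueOf("esLatency"))      // Some(250)
  }
}
```

If the real job behaves the same way, a gauge that never shows up could simply mean that no element has reached the sink yet. A log statement inside init would be one way to check this.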



Re:Re: Registering custom metrics does not work

Posted by "wyphao.2007" <wy...@163.com>.
Hi Chesnay, thank you for your reply.

The metric in the code above does not get registered at all.

Re: Registering custom metrics does not work

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

Please provide more information about how it is not working as expected.

Does it throw an exception or log a warning? Is the metric not
registered at all, or does its value not change?
