You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Binh Nguyen Van <bi...@gmail.com> on 2023/10/18 12:45:20 UTC

Counter metric doesn't show up

Hi,

I have a pipeline that I run on Dataflow. In my pipeline, I use state
and timers, and in the timer callback I update a counter. I then created a
dashboard in Metrics Explorer to view the metric, but the counter that I
update in the timer callback doesn’t show up there even though the timer is
called. Is this expected behavior?

This is the sample code that can be used to reproduce the behavior.

Thanks
-Binh

/**
 * Minimal pipeline that feeds a rate-limited sequence of numbers into a stateful {@link DoFn}
 * which sets a processing-time timer and increments one counter when the timer is set and another
 * when the timer fires.
 */
class TestBeamTimer {
  /**
   * Builds and runs the pipeline.
   *
   * @param args command-line arguments parsed into {@link PipelineOptions}
   */
  public static void main(String[] args) {
    PipelineOptions pipelineOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(pipelineOptions);

    // Source starting at 0, rate-limited to 10 elements per one-second period.
    GenerateSequence rateLimitedSource =
        GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1));

    p.apply("Generate numbers", rateLimitedSource)
        // Each number is used as its own key.
        .apply("Attach key", WithKeys.of(SerializableFunctions.identity()))
        .setCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
        .apply("Apply timer", ParDo.of(new ApplyTimer()));

    p.run();
  }

  /**
   * Stateful {@link DoFn} that stores incoming values in a {@link ValueState}, sets a
   * processing-time timer when the stored list is empty, and clears the state when the timer
   * fires. Every input element is also emitted unchanged.
   */
  private static class ApplyTimer extends DoFn<KV<Long, Long>, KV<Long, Long>> {
    private static final String STATE_ID_INPUT_DATA = "input_data";
    private static final String TIMER_ID_TIMER = "timer";

    /** Delay, relative to the current processing time, after which the timer fires. */
    private static final Duration TIMER_DELAY = Duration.standardSeconds(5);

    /** Incremented each time {@code processElement} sets the timer. */
    private static final Counter STARTED_TIMER_COUNTER =
        Metrics.counter("timers", "started_timers_count");

    /** Incremented each time the timer callback runs. */
    private static final Counter FIRED_TIMER_COUNTER =
        Metrics.counter("timers", "fired_timers_count");

    @StateId(STATE_ID_INPUT_DATA)
    private final StateSpec<@NonNull ValueState<List<Long>>> inputDataState =
        StateSpecs.value(ListCoder.of(VarLongCoder.of()));

    @TimerId(TIMER_ID_TIMER)
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    /**
     * Appends the element's value to the stored list and forwards the element. If the stored list
     * was empty (or absent) beforehand, a relative timer is set and the "started" counter is
     * incremented.
     *
     * @param element the incoming key/value pair
     * @param inputDataState state holding the values stored so far
     * @param timer the processing-time timer declared on this DoFn
     * @param receiver output sink; receives {@code element} unchanged
     */
    @ProcessElement
    public void processElement(
        @Element KV<Long, Long> element,
        @StateId(STATE_ID_INPUT_DATA) ValueState<List<Long>> inputDataState,
        @TimerId(TIMER_ID_TIMER) Timer timer,
        OutputReceiver<KV<Long, Long>> receiver) {
      // A null read is treated as an empty list.
      List<Long> stored = inputDataState.read();
      List<Long> values = (stored != null) ? stored : new ArrayList<>();

      // Only set the timer when nothing is stored yet.
      if (values.isEmpty()) {
        timer.offset(TIMER_DELAY).withNoOutputTimestamp().setRelative();
        STARTED_TIMER_COUNTER.inc();
      }

      values.add(element.getValue());
      inputDataState.write(values);

      receiver.output(element);
    }

    /**
     * Timer callback: increments the "fired" counter, logs the key, and clears the stored values.
     *
     * @param key the key the timer fired for
     * @param inputDataState state holding the stored values; cleared here
     */
    @OnTimer(TIMER_ID_TIMER)
    public void onTimer(
        @Key Long key,
        @StateId(STATE_ID_INPUT_DATA) ValueState<List<Long>> inputDataState) {
      FIRED_TIMER_COUNTER.inc();
      // NOTE(review): LOG is not declared anywhere in this snippet; presumably an SLF4J-style
      // logger defined elsewhere — confirm before compiling.
      LOG.info("Timer callback for key {} was called", key);
      inputDataState.clear();
    }
  }
}