You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Kireet Agrawal <ki...@gantry.io.INVALID> on 2023/04/19 16:44:30 UTC

ValueState losing event randomly onTimer trigger

Hi Flink dev community!

I'm adding an event delay function for certain events using Flink timers,
but occasionally the ValueState seems to lose the event. I see a ValueState
loss of ~20 events out of 1250, and I don't see a pattern to it yet.

The code is below:
public class ProjectionEvictionEventDelayFunction extends
KeyedProcessFunction<String, JsonNode, JsonNode> {
/**
* ProjectionEvictionEventDelayFunction handles delay projection eviction
events by a configured amount
* of time using Flink's onTimer().
**/

private final static Logger logger = LogManager.getLogger(
ProjectionEvictionEventDelayFunction.class.getName());
private ValueState<String> originalEventState;

private final ObjectMapper mapper = new ObjectMapper();

@Override
public void open(Configuration config) {
ValueStateDescriptor<String> originalEventDescriptor = new
ValueStateDescriptor<String>(
"originalEventState",
Types.STRING);
originalEventState = getRuntimeContext().getState(originalEventDescriptor);
}

@Override
public void processElement(JsonNode event, Context context, Collector<
JsonNode> collector) throws Exception {
if (GantryEventUtils.isEvictionEvent(event)) {
// If it is the eviction event, add a timer with a delay.
originalEventState.update(mapper.writeValueAsString(event));
// TODO: Remove comment
logger.info("Storing eviction event" + event.get(GantryEventField.EVENT_ID.
getFieldName()).asText());
context.timerService().registerProcessingTimeTimer(context.timestamp() +
10000); // 10 seconds.
} else {
// If it is not the eviction event, just send it through.
// TODO: Remove comment
logger.info("Emitting non eviction event" + event.get(GantryEventField.
EVENT_ID.getFieldName()).asText());
collector.collect(event);
}
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<
JsonNode> collector) throws Exception {
String event = originalEventState.value();
try {
// Retrieve the original event from the ValueState
JsonNode originalEvent = mapper.readTree(event);

// Clear the original event state for this key
originalEventState.clear();

// Emit the original event with a timestamp
collector.collect(originalEvent);
} catch (Exception e) {
logger.error("Failed to emit eviction event " + event + context.
getCurrentKey() + timestamp);
}
}
}


Thanks,
Kireet