Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2021/03/05 16:47:42 UTC
How to check checkpointing mode
Hello,
My job sets checkpointing mode to at-least-once:
StreamExecutionEnvironment
.getExecutionEnvironment()
.enableCheckpointing(checkpointInterval.toMillis(),
CheckpointingMode.AT_LEAST_ONCE)
but the Flink UI shows "Checkpointing Mode: Exactly Once".
Why is that? Does Flink for some reason decide to ignore my setting (btw, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?
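[Editor's note: one way to check, independent of the UI, is to read the mode back from the environment's CheckpointConfig before calling execute(). A minimal sketch, assuming a Flink 1.12-era DataStream API on the classpath; the 10-second interval and class name are illustrative:]

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingModeCheck {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing every 10 s with at-least-once semantics.
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    // The UI's "Checkpointing Mode" field reflects this value:
    CheckpointingMode mode = env.getCheckpointConfig().getCheckpointingMode();
    System.out.println("Effective checkpointing mode: " + mode);
  }
}
```

Logging this value at job startup, rather than relying on the UI alone, also makes it obvious when a running job predates a configuration change.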
Thanks,
Alexey
Re: Re: How to check checkpointing mode
Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yun,
It is confusing, but the UI now shows the expected value "At Least Once" (and checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I either looked in the wrong place or the job was not upgraded when I changed the checkpointing mode ...
Sorry for the noise, and thank you for your help.
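[Editor's note: for a job that is already running, the deployed mode can also be confirmed without the UI via Flink's REST API: GET /jobs/<jobid>/checkpoints/config returns JSON whose "mode" field is "at_least_once" or "exactly_once". A sketch using only the JDK's HttpClient; the host, port, and job id are placeholders:]

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CheckpointConfigViaRest {

  // Builds the REST endpoint for a job's checkpoint configuration.
  static URI checkpointConfigUri(String host, int port, String jobId) {
    return URI.create("http://" + host + ":" + port + "/jobs/" + jobId + "/checkpoints/config");
  }

  public static void main(String[] args) throws Exception {
    // Placeholders: adjust host/port for your cluster; pass the job id as the first argument.
    URI uri = checkpointConfigUri("localhost", 8081, args[0]);
    HttpResponse<String> resp = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());
    // The body is JSON along the lines of {"mode":"at_least_once","interval":10000,...}
    System.out.println(resp.body());
  }
}
```

Because this queries the job manager, it reports what was actually deployed, so it also catches the "job was not upgraded" situation described above.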
Alexey
________________________________
From: Yun Gao <yu...@aliyun.com>
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Re: How to check checkpointing mode
Hi Alexey,
Sorry, I also do not see any problem in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?
Best,
Yun
------------------Original Mail ------------------
Sender:Alexey Trenikhun <ye...@msn.com>
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List <us...@flink.apache.org>, Yun Gao <yu...@aliyun.com>
Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking. The job creation code is quite big; I've truncated the helper methods dealing with command-line parameters etc. Below are the two major methods:
@Override
public Void call() throws Exception {
LOGGER.info("{}", new Info().toLog());
if (!allParameters.isEmpty()) {
// We don't expect any parameters, but Flink 1.12 adds JVM options to the job args; since we add
// -- after the job arguments, these arguments (unnecessary for us) are treated as positional
// parameters, which we ignore but log a warning about.
LOGGER.warn("Unexpected parameters: {}", allParameters);
}
try {
final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
see.execute(name);
return null;
} catch (InterruptedException e) {
LOGGER.error("Stream Processor was interrupted", e);
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
LOGGER.error("Stream Processor is terminated due to exception", e);
throw e;
}
}
private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
initDefaultKafkaSource();
final long deviationMillis = deviation.toMillis();
final GlobalAppConfig globalAppConfig = config();
final StreamExecutionEnvironment see = StreamExecutionEnvironment
.getExecutionEnvironment()
.enableCheckpointing(checkpointInterval.toMillis(),
CheckpointingMode.AT_LEAST_ONCE)
.setMaxParallelism(1024)
.setParallelism(parallelism);
if (externalizedCheckpoints) {
see.getCheckpointConfig()
.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
see.getConfig().disableGenericTypes();
see.getConfig().disableAutoGeneratedUIDs();
configureStateBackend(see);
final Properties producerProperties = new PropertiesBuilder()
.putAll(kafkaCommonOptions)
.putAll(kafkaProducerOptions)
.varFiles(valueFiles)
.build();
final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
.semantic(Semantic.AT_LEAST_ONCE)
.config(producerProperties)
.build();
final AutoTopic autoTopic = AutoTopic.builder()
.config(producerProperties)
.partitions(autoCreateTopicsPartitions)
.replicationFactor(autoCreateTopicsReplicationFactor)
.doNotCreateTopics(ImmutableSet.of(
gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
))
.build();
see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
// Since Flink 1.12 the default stream time characteristic is event time, so we don't need to
// set streamTimeCharacteristic; furthermore, the whole TimeCharacteristic enum is deprecated.
// If needed, explicitly using processing-time windows and timers still works in event-time mode.
addHeartbeats(see);
final TStateCleanupOnTimeout.Factory cleanupFactory =
new TStateCleanupOnTimeout.Factory(
maxCallDuration,
postmortemCallDuration,
globalAppConfig.timerGranularity()
);
@Nullable final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
@Nullable final DataStream<PbCfgDatum> cfgSource = addSources(see,
SourceTopic.GCA_CFG,
new CfgJsonDeserializationSchema(),
(event, timestamp) -> event.getBatchId(),
it -> !it.getHeartbeat());
if (cfgSource != null) {
cfgXform = cfgSource
.keyBy(PbCfgDatum::getCcId)
.process(new CfgTransform())
.uid("xform-cfg")
.name("XForm Config");
if (!isNullOrEmpty(gspCfg)) {
cfgXform.addSink(producerFactory.create(gspCfg,
autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
.uid("uid-" + gspCfg)
.name(gspCfg);
} else {
cfgXform.addSink(new DiscardingSink<>())
.uid("uid-gsp-cfg-null")
.name("gsp-cfg-null");
}
} else {
cfgXform = null;
}
final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
SourceTopic.VOICE_CALL_THREAD,
callThreadFormat == KafkaTopicFormat.JSON
? new TJsonDeserializationSchema()
: new CallEventDeserializationSchema(),
(event, timestamp) ->
Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
? timestamp - deviationMillis
: Instants.toMillis(event.getTimestamp()),
event -> event.getType() != EventType.EVENT_UNKNOWN);
final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
.keyBy(CallEventKey::new)
.process(new TIntakeProcessFunction(cleanupFactory))
.returns(PbTypes.TCM_DATUM)
.uid("intake-voice-calls")
.name("Intake voice calls");
DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);
final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;
if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
SourceTopic.DIGITAL_ITX,
new InteractionEventDeserializationSchema(),
(event, timestamp) -> Instants
.toMillis(event.getOrder().getTimestamp()),
event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());
sortedIxnEventStream = ixnIntake
.keyBy(DixnEventKey::new)
.process(new UnwrapEnvelopeAndSort())
.uid("unwrap-sort")
.name("Sort Digital Interactions");
digitalPreTransform = sortedIxnEventStream
.keyBy(DixnEventKey::new)
.process(new DIxnPreTransformSplit())
.uid("split-digital-busy")
.name("Split Digital busy states");
allAgentStateEventsDataStream = union(
allAgentStateEventsDataStream,
digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
} else {
sortedIxnEventStream = null;
digitalPreTransform = null;
}
// --------
allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
addSources(see,
SourceTopic.VOICE_AGENT_STATE,
callThreadFormat == KafkaTopicFormat.JSON
? new AsdJsonDeserializationSchema() : new AsdAvroDeserializationSchema(),
this::timestamp,
event -> event.getId() != Id.UNKNOWN));
allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
addSources(see,
SourceTopic.DIGITAL_AGENT_STATE,
new AsdAvroDeserializationSchema(),
this::timestamp,
event -> event.getId() != Id.UNKNOWN));
@NotNull final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
.requireNonNullElseGet(
addSources(see,
SourceTopic.VOICE_OUTBOUND,
new OcsJsonDeserializationSchema(),
this::timestamp,
event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
() -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
.keyBy(OcsEventKey::new)
.process(new OcsIntakeProcessFunction())
.returns(PbTypes.OCS_DATUM)
.uid("intake-outbound")
.name("Intake Outbound");
final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
if (allAgentStateEventsDataStream != null) {
agentStatesIntake = allAgentStateEventsDataStream
.keyBy(PersonKey::new)
.process(new AgentStateIntakeProcessFunction())
.uid("intake-agent-states")
.name("Intake Agent States");
} else {
agentStatesIntake = null;
}
@Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
if (cfgXform != null && agentStatesIntake != null) {
DataStream<PbPersonQuery> personQueryStream = union(
agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
);
agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
.connect(personQueryStream)
.keyBy(PersonKey::new, PersonKey::new)
.process(new CfgAgentInfoRegistry(globalAppConfig))
.uid("cfg-agent-info-provider")
.name("Agent Info");
} else {
agentInfo = null;
}
final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
if (agentStatesIntake != null) {
allAgentLoginSession = agentStatesIntake.connect(agentInfo)
.keyBy(PersonKey::new, PersonKey::new)
.process(new AgentStateTransformation(
globalAppConfig,
glsAcwMode,
agentStateIdleTimeout))
.uid("xform-agent-states")
.name("Xform Agent States");
} else {
allAgentLoginSession = see.addSource(new EmptyGenerator<>());
}
if (!gspSm.isEmpty()) {
allAgentLoginSession
.addSink(producerFactory.create(
gspSm,
autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
.uid("uid-" + gspSm)
.name(gspSm);
} else {
allAgentLoginSession
.addSink(new DiscardingSink<>())
.uid("uid-gsp-sm-null")
.name("gsp-sm-null");
}
DataStream<Interaction> interactionDataStream = tcmDataStream1
.connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
.keyBy(CallEventKey::new, CallEventKey::new)
.process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
.uid("xform-voice-interactions")
.name("Xform Voice interactions");
if (digitalPreTransform != null) {
final DataStream<Interaction> ixnOutputStream = digitalPreTransform
.connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
.keyBy(DixnEventKey::new, DixnEventKey::new)
.process(new DixnTransform(globalAppConfig))
.uid("xform-digital-interactions")
.name("Xform Digital Interactions");
interactionDataStream = interactionDataStream.union(ixnOutputStream);
}
interactionDataStream = AsyncDataStream.orderedWait(
interactionDataStream,
new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
cfgCacheTimeout.toMillis(),
TimeUnit.MILLISECONDS,
(ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
.getParallelism())
.uid("uid-config-async-io")
.name("Enrich interactions with config");
if (!gspIxn.isEmpty()) {
interactionDataStream.addSink(
producerFactory.create(
gspIxn,
autoTopic.decorate(new InteractionSerializationSchema(
outputTopicFormat, gspIxn, producerFactory.config()))))
.uid("uid-" + gspIxn)
.name(gspIxn);
} else {
interactionDataStream
.addSink(new DiscardingSink<>())
.uid("uid-gsp-ixn-null")
.name("gsp-ixn-null");
}
if (!isNullOrEmpty(gspCustom)) {
final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
.getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
final SingleOutputStreamOperator<CustomFact> customFactsVoice =
uxEventsVoice
.keyBy(UxKey::new)
.process(new UxToCustomFact(globalAppConfig))
.uid("xform-voice-ux")
.name("Xform Voice Ux");
SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
if (sortedIxnEventStream != null) {
DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
.getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
customFactsDigital =
uxEventsDigital
.keyBy(UxKey::new)
.process(new UxToCustomFact(globalAppConfig))
.uid("xform-digital-ux")
.name("Xform Digital Ux");
}
final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
: customFactsVoice.union(customFactsDigital);
customFacts.addSink(
producerFactory.create(
globalAppConfig.gspCustom(),
autoTopic.decorate(
new CustomFactSerializationSchema(
outputTopicFormat, globalAppConfig.gspCustom()))))
.uid("uid-" + globalAppConfig.gspCustom())
.name(globalAppConfig.gspCustom());
}
if (!isNullOrEmpty(voiceOutbound)) {
final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
new OcsStateCleanupOnTimeout.Factory(
maxCampaignGroupSessionDuration,
globalAppConfig.timerGranularity()
);
@Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;
final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
.connect(ocsAgentDetails)
.keyBy(OcsEventKey::new, OcsEventKey::new)
.process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
.uid("xform-outbound")
.name("Xform Outbound");
if (!gspOutbound.isEmpty()) {
outboundDataStream.addSink(
producerFactory.create(
gspOutbound,
autoTopic.decorate(
new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
.uid("uid-" + globalAppConfig.gspOutbound())
.name(globalAppConfig.gspOutbound());
} else {
outboundDataStream
.addSink(new DiscardingSink<>())
.uid("uid-gsp-outbound-null")
.name("gsp-outbound-null");
}
}
TApp.main(see);
return see;
}
Thanks,
Alexey
________________________________
From: Yun Gao <yu...@aliyun.com>
Sent: Monday, March 8, 2021 7:57 AM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: How to check checkpointing mode
Hi Alexey,
Logically, the setting in the code has the highest priority.
Could you show the complete code for the job creation?
It seems unusual to me to enable checkpointing
on an anonymous StreamExecutionEnvironment.
Best,
Yun
------------------------------------------------------------------
From:Alexey Trenikhun <ye...@msn.com>
Send Time:2021 Mar. 6 (Sat.) 01:02
To:Flink User Mail List <us...@flink.apache.org>
Subject:How to check checkpointing mode
Hello,
My job sets checkpointing mode to at-least-once:
StreamExecutionEnvironment
.getExecutionEnvironment()
.enableCheckpointing(checkpointInterval.toMillis(),
CheckpointingMode.AT_LEAST_ONCE)
but the Flink UI shows "Checkpointing Mode: Exactly Once".
Why is that? Does Flink for some reason decide to ignore my setting (btw, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?
Thanks,
Alexey
Re: Re: How to check checkpointing mode
Posted by Yun Gao <yu...@aliyun.com>.
Hi Alexey,
Sorry, I also do not see any problem in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?
Best,
Yun
() -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
.keyBy(OcsEventKey::new)
.process(new OcsIntakeProcessFunction())
.returns(PbTypes.OCS_DATUM)
.uid("intake-outbound")
.name("Intake Outbound");
final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
if (allAgentStateEventsDataStream != null) {
agentStatesIntake = allAgentStateEventsDataStream
.keyBy(PersonKey::new)
.process(new AgentStateIntakeProcessFunction())
.uid("intake-agent-states")
.name("Intake Agent States");
} else {
agentStatesIntake = null;
}
@Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
if (cfgXform != null && agentStatesIntake != null) {
DataStream<PbPersonQuery> personQueryStream = union(
agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
);
agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
.connect(personQueryStream)
.keyBy(PersonKey::new, PersonKey::new)
.process(new CfgAgentInfoRegistry(globalAppConfig))
.uid("cfg-agent-info-provider")
.name("Agent Info");
} else {
agentInfo = null;
}
final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
if (agentStatesIntake != null) {
allAgentLoginSession = agentStatesIntake.connect(agentInfo)
.keyBy(PersonKey::new, PersonKey::new)
.process(new AgentStateTransformation(
globalAppConfig,
glsAcwMode,
agentStateIdleTimeout))
.uid("xform-agent-states")
.name("Xform Agent States");
} else {
allAgentLoginSession = see.addSource(new EmptyGenerator<>());
}
if (!gspSm.isEmpty()) {
allAgentLoginSession
.addSink(producerFactory.create(
gspSm,
autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
.uid("uid-" + gspSm)
.name(gspSm);
} else {
allAgentLoginSession
.addSink(new DiscardingSink<>())
.uid("uid-gsp-sm-null")
.name("gsp-sm-null");
}
DataStream<Interaction> interactionDataStream = tcmDataStream1
.connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
.keyBy(CallEventKey::new, CallEventKey::new)
.process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
.uid("xform-voice-interactions")
.name("Xform Voice interactions");
if (digitalPreTransform != null) {
final DataStream<Interaction> ixnOutputStream = digitalPreTransform
.connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
.keyBy(DixnEventKey::new, DixnEventKey::new)
.process(new DixnTransform(globalAppConfig))
.uid("xform-digital-interactions")
.name("Xform Digital Interactions");
interactionDataStream = interactionDataStream.union(ixnOutputStream);
}
interactionDataStream = AsyncDataStream.orderedWait(
interactionDataStream,
new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
cfgCacheTimeout.toMillis(),
TimeUnit.MILLISECONDS,
(ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
.getParallelism())
.uid("uid-config-async-io")
.name("Enrich interactions with config");
if (!gspIxn.isEmpty()) {
interactionDataStream.addSink(
producerFactory.create(
gspIxn,
autoTopic.decorate(new InteractionSerializationSchema(
outputTopicFormat, gspIxn, producerFactory.config()))))
.uid("uid-" + gspIxn)
.name(gspIxn);
} else {
interactionDataStream
.addSink(new DiscardingSink<>())
.uid("uid-gsp-ixn-null")
.name("gsp-ixn-null");
}
if (!isNullOrEmpty(gspCustom)) {
final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
.getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
final SingleOutputStreamOperator<CustomFact> customFactsVoice =
uxEventsVoice
.keyBy(UxKey::new)
.process(new UxToCustomFact(globalAppConfig))
.uid("xform-voice-ux")
.name("Xform Voice Ux");
SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
if (sortedIxnEventStream != null) {
DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
.getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
customFactsDigital =
uxEventsDigital
.keyBy(UxKey::new)
.process(new UxToCustomFact(globalAppConfig))
.uid("xform-digital-ux")
.name("Xform Digital Ux");
}
final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
: customFactsVoice.union(customFactsDigital);
customFacts.addSink(
producerFactory.create(
globalAppConfig.gspCustom(),
autoTopic.decorate(
new CustomFactSerializationSchema(
outputTopicFormat, globalAppConfig.gspCustom()))))
.uid("uid-" + globalAppConfig.gspCustom())
.name(globalAppConfig.gspCustom());
}
if (!isNullOrEmpty(voiceOutbound)) {
final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
new OcsStateCleanupOnTimeout.Factory(
maxCampaignGroupSessionDuration,
globalAppConfig.timerGranularity()
);
@Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;
final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
.connect(ocsAgentDetails)
.keyBy(OcsEventKey::new, OcsEventKey::new)
.process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
.uid("xform-outbound")
.name("Xform Outbound");
if (!gspOutbound.isEmpty()) {
outboundDataStream.addSink(
producerFactory.create(
gspOutbound,
autoTopic.decorate(
new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
.uid("uid-" + globalAppConfig.gspOutbound())
.name(globalAppConfig.gspOutbound());
} else {
outboundDataStream
.addSink(new DiscardingSink<>())
.uid("uid-gsp-outbound-null")
.name("gsp-outbound-null");
}
}
TApp.main(see);
return see;
}
Thanks,
Alexey
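[Editor's note: the debugger-free version of the check Yun suggests below (inspecting see#checkpointCfg#checkpointingMode before execute) can be sketched roughly as follows. This is a minimal illustration, not Alexey's actual job: the class name, the 10-second interval, and the fail-fast exception are all assumptions; only `enableCheckpointing` and `getCheckpointConfig().getCheckpointingMode()` are Flink DataStream API calls.]

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointModeCheck {

  // Returns the mode actually recorded in the environment's CheckpointConfig;
  // this is the value the UI should ultimately reflect.
  static CheckpointingMode effectiveMode(StreamExecutionEnvironment env) {
    return env.getCheckpointConfig().getCheckpointingMode();
  }

  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The interval is illustrative; the mode is what we want to verify.
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);

    CheckpointingMode mode = effectiveMode(env);
    System.out.println("Effective checkpointing mode: " + mode);
    if (mode != CheckpointingMode.AT_LEAST_ONCE) {
      // Fail fast before submitting the job if the setting was overridden.
      throw new IllegalStateException("Unexpected checkpointing mode: " + mode);
    }
    // env.execute("job-name") would follow here.
  }
}
```

Logging or asserting the mode just before `execute` removes any doubt about which setting won between code and flink-conf.yaml.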
________________________________
From: Yun Gao <yu...@aliyun.com>
Sent: Monday, March 8, 2021 7:57 AM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: How to check checkpointing mode
Hi Alexey,
The setting made in code takes the highest priority.
Could you share the complete job-creation code?
It also seems unusual to enable checkpointing on an
anonymous StreamExecutionEnvironment.
Best,
Yun
------------------------------------------------------------------
From:Alexey Trenikhun <ye...@msn.com>
Send Time:2021 Mar. 6 (Sat.) 01:02
To:Flink User Mail List <us...@flink.apache.org>
Subject:How to check checkpointing mode
Hello,
My job sets checkpointing mode to at-least-once:
StreamExecutionEnvironment
    .getExecutionEnvironment()
    .enableCheckpointing(checkpointInterval.toMillis(),
        CheckpointingMode.AT_LEAST_ONCE)
but the Flink UI shows Checkpointing Mode: Exactly Once.
Why is that? Does Flink for some reason decide to ignore my setting (btw flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?
Thanks,
Alexey
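[Editor's note: besides the UI, the effective checkpoint configuration can be read from the JobManager's REST API (`/jobs/<job-id>/checkpoints/config`, whose JSON response includes a "mode" field such as "exactly_once" or "at_least_once"). A rough sketch follows; the REST address `localhost:8081` is an assumption, and the job id must be passed in (it can be listed via `GET /jobs`).]

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CheckpointConfigQuery {

  // Builds the REST path for a job's checkpoint configuration.
  static URI checkpointConfigUri(String restBase, String jobId) {
    return URI.create(restBase + "/jobs/" + jobId + "/checkpoints/config");
  }

  public static void main(String[] args) throws Exception {
    // Assumed address; adjust to your cluster. The job id (args[0])
    // comes from GET /jobs on the same endpoint.
    String restBase = "http://localhost:8081";
    URI uri = checkpointConfigUri(restBase, args[0]);
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(uri).GET().build(),
            HttpResponse.BodyHandlers.ofString());
    // The JSON body contains the effective "mode" field.
    System.out.println(response.body());
  }
}
```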
Re: How to check checkpointing mode
Posted by Yun Gao <yu...@aliyun.com>.
Hi Alexey,
The setting made in code takes the highest priority.
Could you share the complete job-creation code?
It also seems unusual to enable checkpointing on an
anonymous StreamExecutionEnvironment.
Best,
Yun