Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2021/03/05 16:47:42 UTC

How to check checkpointing mode

Hello,

My job sets checkpointing mode to at-least-once:


StreamExecutionEnvironment
    .getExecutionEnvironment()
    .enableCheckpointing(checkpointInterval.toMillis(),
        CheckpointingMode.AT_LEAST_ONCE)

but the Flink UI shows "Checkpointing Mode: Exactly Once".

Why is that? Does Flink for some reason ignore my setting (btw, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?

Thanks,
Alexey
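Besides the web UI, the JobManager's REST API reports the checkpoint configuration of a running job at `GET /jobs/<jobid>/checkpoints/config` (the web UI is itself a client of this REST API). Below is a minimal JDK-only sketch (Java 11+ for `java.net.http`), assuming the REST endpoint listens on the default `http://localhost:8081` and that the response's `"mode"` field holds the checkpointing mode. The class name `CheckpointModeProbe` is hypothetical, and the regex is a shortcut where a real client would use a JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointModeProbe {

  // Matches "mode":"<value>" in the JSON body; a simplification, not a full JSON parser.
  private static final Pattern MODE = Pattern.compile("\"mode\"\\s*:\\s*\"([^\"]+)\"");

  /** Extracts the value of the "mode" field from a checkpoint-config JSON body, if present. */
  static Optional<String> extractMode(String json) {
    Matcher m = MODE.matcher(json);
    return m.find() ? Optional.of(m.group(1)) : Optional.empty();
  }

  /** Fetches the checkpoint configuration of a running job from the REST API. */
  static String fetchCheckpointConfig(String restBase, String jobId) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(
        URI.create(restBase + "/jobs/" + jobId + "/checkpoints/config")).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IllegalStateException("HTTP " + response.statusCode() + ": " + response.body());
    }
    return response.body();
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("usage: CheckpointModeProbe <jobId> [restBase]");
      return;
    }
    String jobId = args[0];
    // Assumed default REST address; pass another base URL as the second argument if needed.
    String restBase = args.length > 1 ? args[1] : "http://localhost:8081";
    String body = fetchCheckpointConfig(restBase, jobId);
    System.out.println("checkpointing mode: " + extractMode(body).orElse("<not found>"));
  }
}
```

The job ID can be copied from the web UI or taken from the output of `flink list`. Because the REST API describes the job that is actually running, this also reveals a job that was never resubmitted after a code change.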

Re: Re: How to check checkpointing mode

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yun,
It is confusing, but the UI now shows the expected value "At Least Once" (and checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I either looked in the wrong place or the job was not upgraded when I changed the checkpointing mode...

Sorry for the noise, and thank you for your help.

Alexey

________________________________
From: Yun Gao <yu...@aliyun.com>
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Re: How to check checkpointing mode

Hi Alexey,

Sorry, I don't see any problems in the attached code either. Could you set
a breakpoint at `see.execute(name)` and check the value of
see#checkpointCfg#checkpointingMode?

Best,
Yun

------------------Original Mail ------------------
Sender:Alexey Trenikhun <ye...@msn.com>
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List <us...@flink.apache.org>, Yun Gao <yu...@aliyun.com>
Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking. The job creation code is quite big, so I've left out the helper methods that deal with command-line parameters etc. Below are the two main methods:


@Override
public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());

  if (!allParameters.isEmpty()) {
    // We don't expect any parameters, but Flink 1.12 appends JVM options to the job arguments.
    // Since we add "--" after the job's own arguments, these unwanted arguments are treated as
    // positional parameters, which we ignore apart from logging a warning.
    LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
    final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
    see.execute(name);
    return null;
  } catch (InterruptedException e) {
    LOGGER.error("Stream Processor was interrupted", e);
    Thread.currentThread().interrupt();
    throw e;
  } catch (Exception e) {
    LOGGER.error("Stream Processor is terminated due to exception", e);
    throw e;
  }
}


private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
      .getExecutionEnvironment()
      .enableCheckpointing(checkpointInterval.toMillis(),
          CheckpointingMode.AT_LEAST_ONCE)
      .setMaxParallelism(1024)
      .setParallelism(parallelism);
  if (externalizedCheckpoints) {
    see.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
      .putAll(kafkaCommonOptions)
      .putAll(kafkaProducerOptions)
      .varFiles(valueFiles)
      .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
      .semantic(Semantic.AT_LEAST_ONCE)
      .config(producerProperties)
      .build();

  final AutoTopic autoTopic = AutoTopic.builder()
      .config(producerProperties)
      .partitions(autoCreateTopicsPartitions)
      .replicationFactor(autoCreateTopicsReplicationFactor)
      .doNotCreateTopics(ImmutableSet.of(
          gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
      ))
      .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
  // Since Flink 1.12 the default stream time characteristic is event time, so we don't need to
  // set it; moreover, the whole TimeCharacteristic enum is deprecated.
  // If needed, processing-time windows and timers can still be used explicitly in event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
      new TStateCleanupOnTimeout.Factory(
          maxCallDuration,
          postmortemCallDuration,
          globalAppConfig.timerGranularity()
      );

  @Nullable final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
  @Nullable final DataStream<PbCfgDatum> cfgSource = addSources(see,
      SourceTopic.GCA_CFG,
      new CfgJsonDeserializationSchema(),
      (event, timestamp) -> event.getBatchId(),
      it -> !it.getHeartbeat());

  if (cfgSource != null) {
    cfgXform = cfgSource
        .keyBy(PbCfgDatum::getCcId)
        .process(new CfgTransform())
        .uid("xform-cfg")
        .name("XForm Config");

    if (!isNullOrEmpty(gspCfg)) {
      cfgXform.addSink(producerFactory.create(gspCfg,
          autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
          .uid("uid-" + gspCfg)
          .name(gspCfg);
    } else {
      cfgXform.addSink(new DiscardingSink<>())
          .uid("uid-gsp-cfg-null")
          .name("gsp-cfg-null");
    }
  } else {
    cfgXform = null;
  }

  final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
      SourceTopic.VOICE_CALL_THREAD,
      callThreadFormat == KafkaTopicFormat.JSON
          ? new TJsonDeserializationSchema()
          : new CallEventDeserializationSchema(),
      (event, timestamp) ->
          Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
              ? timestamp - deviationMillis
              : Instants.toMillis(event.getTimestamp()),
      event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
      .keyBy(CallEventKey::new)
      .process(new TIntakeProcessFunction(cleanupFactory))
      .returns(PbTypes.TCM_DATUM)
      .uid("intake-voice-calls")
      .name("Intake voice calls");

  DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
      tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);

  final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
  final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;

  if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
    final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
        SourceTopic.DIGITAL_ITX,
        new InteractionEventDeserializationSchema(),
        (event, timestamp) -> Instants
            .toMillis(event.getOrder().getTimestamp()),
        event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());

    sortedIxnEventStream = ixnIntake
        .keyBy(DixnEventKey::new)
        .process(new UnwrapEnvelopeAndSort())
        .uid("unwrap-sort")
        .name("Sort Digital Interactions");

    digitalPreTransform = sortedIxnEventStream
        .keyBy(DixnEventKey::new)
        .process(new DIxnPreTransformSplit())
        .uid("split-digital-busy")
        .name("Split Digital busy states");
    allAgentStateEventsDataStream = union(
        allAgentStateEventsDataStream,
        digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
  } else {
    sortedIxnEventStream = null;
    digitalPreTransform = null;
  }

  // --------
  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.VOICE_AGENT_STATE,
          callThreadFormat == KafkaTopicFormat.JSON
              ? new AsdJsonDeserializationSchema() : new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.DIGITAL_AGENT_STATE,
          new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  @NotNull final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
      .requireNonNullElseGet(
          addSources(see,
              SourceTopic.VOICE_OUTBOUND,
              new OcsJsonDeserializationSchema(),
              this::timestamp,
              event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
          () -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
      .keyBy(OcsEventKey::new)
      .process(new OcsIntakeProcessFunction())
      .returns(PbTypes.OCS_DATUM)
      .uid("intake-outbound")
      .name("Intake Outbound");

  final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
  if (allAgentStateEventsDataStream != null) {
    agentStatesIntake = allAgentStateEventsDataStream
        .keyBy(PersonKey::new)
        .process(new AgentStateIntakeProcessFunction())
        .uid("intake-agent-states")
        .name("Intake Agent States");
  } else {
    agentStatesIntake = null;
  }

  @Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
  if (cfgXform != null && agentStatesIntake != null) {
    DataStream<PbPersonQuery> personQueryStream = union(
        agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
        ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
    );

    agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
        .connect(personQueryStream)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new CfgAgentInfoRegistry(globalAppConfig))
        .uid("cfg-agent-info-provider")
        .name("Agent Info");
  } else {
    agentInfo = null;
  }

  final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
  if (agentStatesIntake != null) {
    allAgentLoginSession = agentStatesIntake.connect(agentInfo)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new AgentStateTransformation(
            globalAppConfig,
            glsAcwMode,
            agentStateIdleTimeout))
        .uid("xform-agent-states")
        .name("Xform Agent States");
  } else {
    allAgentLoginSession = see.addSource(new EmptyGenerator<>());
  }

  if (!gspSm.isEmpty()) {
    allAgentLoginSession
        .addSink(producerFactory.create(
            gspSm,
            autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
        .uid("uid-" + gspSm)
        .name(gspSm);
  } else {
    allAgentLoginSession
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-sm-null")
        .name("gsp-sm-null");
  }

  DataStream<Interaction> interactionDataStream = tcmDataStream1
      .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
      .keyBy(CallEventKey::new, CallEventKey::new)
      .process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
      .uid("xform-voice-interactions")
      .name("Xform Voice interactions");

  if (digitalPreTransform != null) {
    final DataStream<Interaction> ixnOutputStream = digitalPreTransform
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
        .keyBy(DixnEventKey::new, DixnEventKey::new)
        .process(new DixnTransform(globalAppConfig))
        .uid("xform-digital-interactions")
        .name("Xform Digital Interactions");
    interactionDataStream = interactionDataStream.union(ixnOutputStream);
  }

  interactionDataStream = AsyncDataStream.orderedWait(
      interactionDataStream,
      new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
      cfgCacheTimeout.toMillis(),
      TimeUnit.MILLISECONDS,
      (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
          .getParallelism())
      .uid("uid-config-async-io")
      .name("Enrich interactions with config");

  if (!gspIxn.isEmpty()) {
    interactionDataStream.addSink(
        producerFactory.create(
            gspIxn,
            autoTopic.decorate(new InteractionSerializationSchema(
                outputTopicFormat, gspIxn, producerFactory.config()))))
        .uid("uid-" + gspIxn)
        .name(gspIxn);
  } else {
    interactionDataStream
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-ixn-null")
        .name("gsp-ixn-null");
  }

  if (!isNullOrEmpty(gspCustom)) {
    final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
        .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
    final SingleOutputStreamOperator<CustomFact> customFactsVoice =
        uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

    SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
    if (sortedIxnEventStream != null) {
      DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
          .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
      customFactsDigital =
          uxEventsDigital
              .keyBy(UxKey::new)
              .process(new UxToCustomFact(globalAppConfig))
              .uid("xform-digital-ux")
              .name("Xform Digital Ux");
    }

    final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
        : customFactsVoice.union(customFactsDigital);
    customFacts.addSink(
        producerFactory.create(
            globalAppConfig.gspCustom(),
            autoTopic.decorate(
                new CustomFactSerializationSchema(
                    outputTopicFormat, globalAppConfig.gspCustom()))))
        .uid("uid-" + globalAppConfig.gspCustom())
        .name(globalAppConfig.gspCustom());
  }

  if (!isNullOrEmpty(voiceOutbound)) {
    final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
        new OcsStateCleanupOnTimeout.Factory(
            maxCampaignGroupSessionDuration,
            globalAppConfig.timerGranularity()
        );

    @Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
        ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;

    final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
        .connect(ocsAgentDetails)
        .keyBy(OcsEventKey::new, OcsEventKey::new)
        .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
        .uid("xform-outbound")
        .name("Xform Outbound");

    if (!gspOutbound.isEmpty()) {
      outboundDataStream.addSink(
          producerFactory.create(
              gspOutbound,
              autoTopic.decorate(
                  new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
          .uid("uid-" + globalAppConfig.gspOutbound())
          .name(globalAppConfig.gspOutbound());
    } else {
      outboundDataStream
          .addSink(new DiscardingSink<>())
          .uid("uid-gsp-outbound-null")
          .name("gsp-outbound-null");
    }
  }

  TApp.main(see);
  return see;
}
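The last argument passed to `AsyncDataStream.orderedWait` above is the capacity, i.e. the maximum number of in-flight async requests for each parallel instance of the operator. The job spreads a total budget, `ResourceInfo.IO_CAPACITY`, across subtasks with integer ceiling division, `(total + p - 1) / p`. A small JDK-only illustration of that arithmetic follows; the budget of 100 is a made-up stand-in, since the real value of `IO_CAPACITY` is not shown in the thread, and `AsyncCapacity` is a hypothetical name.

```java
public final class AsyncCapacity {

  private AsyncCapacity() {}

  /**
   * Per-subtask share of a total async I/O budget, rounded up: ceil(total / parallelism).
   * Same formula as in the job; long arithmetic avoids int overflow for large inputs.
   */
  static int perSubtaskCapacity(int totalCapacity, int parallelism) {
    if (totalCapacity <= 0 || parallelism <= 0) {
      throw new IllegalArgumentException("capacity and parallelism must be positive");
    }
    return (int) ((totalCapacity + (long) parallelism - 1) / parallelism);
  }

  public static void main(String[] args) {
    int total = 100; // hypothetical stand-in for ResourceInfo.IO_CAPACITY
    for (int p : new int[] {1, 3, 8, 128}) {
      int perSubtask = perSubtaskCapacity(total, p);
      System.out.printf("parallelism=%d -> per-subtask capacity=%d, max in flight=%d%n",
          p, perSubtask, perSubtask * p);
    }
  }
}
```

With a budget of 100, parallelism 3 gives 34 per subtask (at most 102 in flight), parallelism 8 gives 13 (at most 104). Rounding up guarantees every subtask at least one slot, at the cost of slightly overshooting the budget; once parallelism exceeds the budget (e.g. 128), each subtask still gets 1 and the total in flight becomes 128.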

Thanks,
Alexey
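The comment at the top of `call()` relies on `--` acting as an end-of-options marker: everything after it is collected as positional parameters (here, per the author's comment, the JVM options that Flink 1.12 appends to the job arguments) instead of being rejected as unknown options. The command-line parsing library is not shown in the thread, so the following is a hypothetical JDK-only sketch of that split, with made-up argument values; `ArgSplit` is an illustrative name, not that library's implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class ArgSplit {

  private static final String END_OF_OPTIONS = "--";

  private ArgSplit() {}

  /** Arguments before the first "--": the job's own options. */
  static List<String> options(String[] args) {
    List<String> all = Arrays.asList(args);
    int idx = all.indexOf(END_OF_OPTIONS);
    return idx < 0 ? all : all.subList(0, idx);
  }

  /** Arguments after the first "--": treated as positional, e.g. appended JVM flags. */
  static List<String> positional(String[] args) {
    List<String> all = Arrays.asList(args);
    int idx = all.indexOf(END_OF_OPTIONS);
    return idx < 0 ? Collections.emptyList() : all.subList(idx + 1, all.size());
  }

  public static void main(String[] args) {
    // Hypothetical argument list: job options, then "--", then appended JVM flags.
    String[] jobArgs = {"--parallelism", "4", "--", "-Xmx1g", "-Dsome.flag=true"};
    List<String> unexpected = positional(jobArgs);
    if (!unexpected.isEmpty()) {
      // Same policy as the job: do not fail, just warn about the leftovers.
      System.err.println("Unexpected parameters: " + unexpected);
    }
    System.out.println("Options to parse: " + options(jobArgs));
  }
}
```

Warning instead of failing keeps the job startable even if the launcher changes what it appends.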
________________________________
From: Yun Gao <yu...@aliyun.com>
Sent: Monday, March 8, 2021 7:57 AM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: How to check checkpointing mode

Hi Alexey,

In principle, the setting in the code has the highest
priority.

Could you show the complete job creation code?
It seems unusual to enable checkpointing
on an anonymous StreamExecutionEnvironment.


Best,
Yun
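Yun's point about priority can be pictured as a lookup chain: a mode passed to `enableCheckpointing(interval, mode)` in code overrides `execution.checkpointing.mode` from `flink-conf.yaml`, which in turn overrides the default, `EXACTLY_ONCE`. The sketch below is a deliberately simplified, hypothetical model of that resolution (the class `ModeResolution` and its redeclared `Mode` enum are illustrative, not Flink's actual code path). It shows why the UI reading was surprising: with both sources set to `AT_LEAST_ONCE`, the model only yields `EXACTLY_ONCE` for a job submitted before either setting was in place, which fits Alexey's later suspicion that the running job had not been upgraded.

```java
import java.util.Optional;

public final class ModeResolution {

  /** Mirrors the two values of Flink's CheckpointingMode (redeclared to stay self-contained). */
  enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

  private ModeResolution() {}

  /**
   * Simplified model: the value set in code wins, then the value from flink-conf.yaml,
   * then the default EXACTLY_ONCE.
   */
  static Mode resolve(Optional<Mode> fromCode, Optional<Mode> fromConfFile) {
    return fromCode.orElse(fromConfFile.orElse(Mode.EXACTLY_ONCE));
  }

  public static void main(String[] args) {
    // The job in this thread: both sources say AT_LEAST_ONCE.
    System.out.println(resolve(Optional.of(Mode.AT_LEAST_ONCE), Optional.of(Mode.AT_LEAST_ONCE)));
    // A job built without either setting falls back to the default.
    System.out.println(resolve(Optional.empty(), Optional.empty()));
  }
}
```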


Re: Re: How to check checkpointing mode

Posted by Yun Gao <yu...@aliyun.com>.
Hi Alexey,

Sorry, I don't see any problems in the attached code either. Could you set
a breakpoint at `see.execute(name)` and check the value of
see#checkpointCfg#checkpointingMode?

Best,
Yun

Re: How to check checkpointing mode

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yun,
Thank you for looking. The job creation code is quite big, so I've left out the helper methods that deal with command-line parameters etc. Below are the two major methods:


@Override
public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());

  if (!allParameters.isEmpty()) {
    // We don't expect any parameters, but Flink 1.12 adds JVM options to the job arguments.
    // Since we add "--" after the job's own arguments, these extra arguments are treated as
    // positional parameters, which we ignore apart from logging a warning.
    LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
    final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
    see.execute(name);
    return null;
  } catch (InterruptedException e) {
    LOGGER.error("Stream Processor was interrupted", e);
    Thread.currentThread().interrupt();
    throw e;
  } catch (Exception e) {
    LOGGER.error("Stream Processor is terminated due to exception", e);
    throw e;
  }
}


private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
      .getExecutionEnvironment()
      .enableCheckpointing(checkpointInterval.toMillis(),
          CheckpointingMode.AT_LEAST_ONCE)
      .setMaxParallelism(1024)
      .setParallelism(parallelism);
  if (externalizedCheckpoints) {
    see.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
      .putAll(kafkaCommonOptions)
      .putAll(kafkaProducerOptions)
      .varFiles(valueFiles)
      .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
      .semantic(Semantic.AT_LEAST_ONCE)
      .config(producerProperties)
      .build();

  final AutoTopic autoTopic = AutoTopic.builder()
      .config(producerProperties)
      .partitions(autoCreateTopicsPartitions)
      .replicationFactor(autoCreateTopicsReplicationFactor)
      .doNotCreateTopics(ImmutableSet.of(
          gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
      ))
      .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
  // Since Flink 1.12 the default stream time characteristic is event time, so we don't need
  // to call setStreamTimeCharacteristic; moreover, the whole TimeCharacteristic enum is
  // deprecated. If needed, processing-time windows and timers can be used explicitly in
  // event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
      new TStateCleanupOnTimeout.Factory(
          maxCallDuration,
          postmortemCallDuration,
          globalAppConfig.timerGranularity()
      );

  @Nullable final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
  @Nullable final DataStream<PbCfgDatum> cfgSource = addSources(see,
      SourceTopic.GCA_CFG,
      new CfgJsonDeserializationSchema(),
      (event, timestamp) -> event.getBatchId(),
      it -> !it.getHeartbeat());

  if (cfgSource != null) {
    cfgXform = cfgSource
        .keyBy(PbCfgDatum::getCcId)
        .process(new CfgTransform())
        .uid("xform-cfg")
        .name("XForm Config");

    if (!isNullOrEmpty(gspCfg)) {
      cfgXform.addSink(producerFactory.create(gspCfg,
          autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
          .uid("uid-" + gspCfg)
          .name(gspCfg);
    } else {
      cfgXform.addSink(new DiscardingSink<>())
          .uid("uid-gsp-cfg-null")
          .name("gsp-cfg-null");
    }
  } else {
    cfgXform = null;
  }

  final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
      SourceTopic.VOICE_CALL_THREAD,
      callThreadFormat == KafkaTopicFormat.JSON
          ? new TJsonDeserializationSchema()
          : new CallEventDeserializationSchema(),
      (event, timestamp) ->
          Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
              ? timestamp - deviationMillis
              : Instants.toMillis(event.getTimestamp()),
      event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
      .keyBy(CallEventKey::new)
      .process(new TIntakeProcessFunction(cleanupFactory))
      .returns(PbTypes.TCM_DATUM)
      .uid("intake-voice-calls")
      .name("Intake voice calls");

  DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
      tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);

  final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
  final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;

  if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
    final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
        SourceTopic.DIGITAL_ITX,
        new InteractionEventDeserializationSchema(),
        (event, timestamp) -> Instants
            .toMillis(event.getOrder().getTimestamp()),
        event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());

    sortedIxnEventStream = ixnIntake
        .keyBy(DixnEventKey::new)
        .process(new UnwrapEnvelopeAndSort())
        .uid("unwrap-sort")
        .name("Sort Digital Interactions");

    digitalPreTransform = sortedIxnEventStream
        .keyBy(DixnEventKey::new)
        .process(new DIxnPreTransformSplit())
        .uid("split-digital-busy")
        .name("Split Digital busy states");
    allAgentStateEventsDataStream = union(
        allAgentStateEventsDataStream,
        digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
  } else {
    sortedIxnEventStream = null;
    digitalPreTransform = null;
  }

  // --------
  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.VOICE_AGENT_STATE,
          callThreadFormat == KafkaTopicFormat.JSON
              ? new AsdJsonDeserializationSchema() : new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.DIGITAL_AGENT_STATE,
          new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  @NotNull final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
      .requireNonNullElseGet(
          addSources(see,
              SourceTopic.VOICE_OUTBOUND,
              new OcsJsonDeserializationSchema(),
              this::timestamp,
              event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
          () -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
      .keyBy(OcsEventKey::new)
      .process(new OcsIntakeProcessFunction())
      .returns(PbTypes.OCS_DATUM)
      .uid("intake-outbound")
      .name("Intake Outbound");

  final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
  if (allAgentStateEventsDataStream != null) {
    agentStatesIntake = allAgentStateEventsDataStream
        .keyBy(PersonKey::new)
        .process(new AgentStateIntakeProcessFunction())
        .uid("intake-agent-states")
        .name("Intake Agent States");
  } else {
    agentStatesIntake = null;
  }

  @Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
  if (cfgXform != null && agentStatesIntake != null) {
    DataStream<PbPersonQuery> personQueryStream = union(
        agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
        ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
    );

    agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
        .connect(personQueryStream)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new CfgAgentInfoRegistry(globalAppConfig))
        .uid("cfg-agent-info-provider")
        .name("Agent Info");
  } else {
    agentInfo = null;
  }

  final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
  if (agentStatesIntake != null) {
    allAgentLoginSession = agentStatesIntake.connect(agentInfo)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new AgentStateTransformation(
            globalAppConfig,
            glsAcwMode,
            agentStateIdleTimeout))
        .uid("xform-agent-states")
        .name("Xform Agent States");
  } else {
    allAgentLoginSession = see.addSource(new EmptyGenerator<>());
  }

  if (!gspSm.isEmpty()) {
    allAgentLoginSession
        .addSink(producerFactory.create(
            gspSm,
            autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
        .uid("uid-" + gspSm)
        .name(gspSm);
  } else {
    allAgentLoginSession
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-sm-null")
        .name("gsp-sm-null");
  }

  DataStream<Interaction> interactionDataStream = tcmDataStream1
      .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
      .keyBy(CallEventKey::new, CallEventKey::new)
      .process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
      .uid("xform-voice-interactions")
      .name("Xform Voice interactions");

  if (digitalPreTransform != null) {
    final DataStream<Interaction> ixnOutputStream = digitalPreTransform
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
        .keyBy(DixnEventKey::new, DixnEventKey::new)
        .process(new DixnTransform(globalAppConfig))
        .uid("xform-digital-interactions")
        .name("Xform Digital Interactions");
    interactionDataStream = interactionDataStream.union(ixnOutputStream);
  }

  interactionDataStream = AsyncDataStream.orderedWait(
      interactionDataStream,
      new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
      cfgCacheTimeout.toMillis(),
      TimeUnit.MILLISECONDS,
      (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
          .getParallelism())
      .uid("uid-config-async-io")
      .name("Enrich interactions with config");

  if (!gspIxn.isEmpty()) {
    interactionDataStream.addSink(
        producerFactory.create(
            gspIxn,
            autoTopic.decorate(new InteractionSerializationSchema(
                outputTopicFormat, gspIxn, producerFactory.config()))))
        .uid("uid-" + gspIxn)
        .name(gspIxn);
  } else {
    interactionDataStream
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-ixn-null")
        .name("gsp-ixn-null");
  }

  if (!isNullOrEmpty(gspCustom)) {
    final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
        .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
    final SingleOutputStreamOperator<CustomFact> customFactsVoice =
        uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

    SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
    if (sortedIxnEventStream != null) {
      DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
          .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
      customFactsDigital =
          uxEventsDigital
              .keyBy(UxKey::new)
              .process(new UxToCustomFact(globalAppConfig))
              .uid("xform-digital-ux")
              .name("Xform Digital Ux");
    }

    final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
        : customFactsVoice.union(customFactsDigital);
    customFacts.addSink(
        producerFactory.create(
            globalAppConfig.gspCustom(),
            autoTopic.decorate(
                new CustomFactSerializationSchema(
                    outputTopicFormat, globalAppConfig.gspCustom()))))
        .uid("uid-" + globalAppConfig.gspCustom())
        .name(globalAppConfig.gspCustom());
  }

  if (!isNullOrEmpty(voiceOutbound)) {
    final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
        new OcsStateCleanupOnTimeout.Factory(
            maxCampaignGroupSessionDuration,
            globalAppConfig.timerGranularity()
        );

    @Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
        ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;

    final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
        .connect(ocsAgentDetails)
        .keyBy(OcsEventKey::new, OcsEventKey::new)
        .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
        .uid("xform-outbound")
        .name("Xform Outbound");

    if (!gspOutbound.isEmpty()) {
      outboundDataStream.addSink(
          producerFactory.create(
              gspOutbound,
              autoTopic.decorate(
                  new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
          .uid("uid-" + globalAppConfig.gspOutbound())
          .name(globalAppConfig.gspOutbound());
    } else {
      outboundDataStream
          .addSink(new DiscardingSink<>())
          .uid("uid-gsp-outbound-null")
          .name("gsp-outbound-null");
    }
  }

  TApp.main(see);
  return see;
}
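The comment at the top of `call()` relies on the job appending `--` after its own arguments, so that anything added later (the JVM options the comment attributes to Flink 1.12) arrives as positional parameters. Below is a minimal plain-Java sketch of that split, assuming a simple "first `--` wins" rule. `JobArgs.split` is a hypothetical helper, since the job's real argument parser is not shown in the thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: the job's real command-line parser is not shown in the thread.
final class JobArgs {
    final List<String> options;    // the job's own arguments, before the first "--"
    final List<String> positional; // everything after "--", e.g. extra arguments appended at submission

    private JobArgs(List<String> options, List<String> positional) {
        this.options = options;
        this.positional = positional;
    }

    static JobArgs split(String[] args) {
        List<String> all = Arrays.asList(args);
        int separator = all.indexOf("--");
        if (separator < 0) {
            // No separator: treat every argument as one of the job's own options.
            return new JobArgs(all, Collections.emptyList());
        }
        return new JobArgs(all.subList(0, separator), all.subList(separator + 1, all.size()));
    }

    // Mirrors the job's behaviour: positional parameters are not expected, only reported.
    boolean hasUnexpectedParameters() {
        return !positional.isEmpty();
    }
}
```

For instance, `JobArgs.split(new String[] {"--parallelism", "4", "--", "-Xmx1g"})` yields the options `[--parallelism, 4]` and the positional list `[-Xmx1g]`, which `call()` would log as unexpected.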

Thanks,
Alexey
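The capacity passed to `AsyncDataStream.orderedWait` in `buildStreamExecutionEnvironment()` is `(ResourceInfo.IO_CAPACITY + parallelism - 1) / parallelism`, the usual integer idiom for rounding a division up. Presumably the intent is to keep the total number of in-flight requests across all subtasks close to `ResourceInfo.IO_CAPACITY` (whose value is not shown in the thread) while giving every subtask at least one slot. Below is a small stdlib sketch of the idiom. `AsyncCapacity.perSubtaskCapacity` is a hypothetical helper.

```java
// Hypothetical helper illustrating the ceiling-division idiom used for the async I/O capacity.
final class AsyncCapacity {
    private AsyncCapacity() {}

    /** Returns ceil(totalCapacity / parallelism) for positive inputs, using integer arithmetic only. */
    static int perSubtaskCapacity(int totalCapacity, int parallelism) {
        if (totalCapacity <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // Adding (parallelism - 1) before dividing rounds any remainder up.
        // Note: this can overflow for totalCapacity near Integer.MAX_VALUE;
        // (totalCapacity - 1) / parallelism + 1 gives the same result without that risk.
        return (totalCapacity + parallelism - 1) / parallelism;
    }
}
```

With a budget of 100 and a parallelism of 8, each subtask gets 13 slots (104 in total). With a budget of 1 and a parallelism of 16, each subtask still gets 1.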

Re: How to check checkpointing mode

Posted by Yun Gao <yu...@aliyun.com>.
Hi Alexey,

Logically, the setting in the code has the highest
priority.

Could you show the complete job creation code?
It seems unusual to enable checkpointing on an
anonymous StreamExecutionEnvironment.


Best,
Yun
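Yun's remark about an anonymous `StreamExecutionEnvironment` matters because settings applied to an instance that nothing keeps a reference to may never reach the job that is executed. Assuming, as in Flink 1.12, that each call to `getExecutionEnvironment()` creates a new environment, the pitfall can be modeled in plain Java. `FakeEnv` and its methods are hypothetical stand-ins, not Flink classes. In Alexey's full code the chained result is assigned to `see` and `call()` executes that same instance, so this pitfall does not apply there. On the real environment, `see.getCheckpointConfig().getCheckpointingMode()` returns the configured mode before `execute` is called.

```java
// Hypothetical model of the "anonymous environment" pitfall; FakeEnv is not a Flink class.
final class FakeEnv {
    // A freshly created environment starts with the default mode.
    String checkpointingMode = "EXACTLY_ONCE";

    FakeEnv enableCheckpointing(String mode) {
        this.checkpointingMode = mode;
        return this; // fluent setter, so calls can be chained
    }

    // Assumption: every call returns a brand-new environment instance.
    static FakeEnv getEnv() {
        return new FakeEnv();
    }

    // Pitfall: the configured instance is discarded and a second call gets a fresh one.
    static String modeWhenDiscarded() {
        FakeEnv.getEnv().enableCheckpointing("AT_LEAST_ONCE");
        FakeEnv env = FakeEnv.getEnv();
        return env.checkpointingMode;
    }

    // Safe pattern: keep one reference and execute that same instance.
    static String modeWhenKept() {
        FakeEnv env = FakeEnv.getEnv().enableCheckpointing("AT_LEAST_ONCE");
        return env.checkpointingMode;
    }
}
```

In the model, `modeWhenDiscarded()` still reports the default, while `modeWhenKept()` reports the configured mode.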

