Posted to dev@gobblin.apache.org by "phet (via GitHub)" <gi...@apache.org> on 2023/05/27 00:56:16 UTC

[GitHub] [gobblin] phet commented on a diff in pull request #3700: [GOBBLIN-1837] Implement multi-active, non blocking for leader host

phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1207456238


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc:
##########
@@ -23,6 +23,11 @@
     "type" : "string",
     "doc" : "flow execution id for the dag action",
     "compliance" : "NONE"
+  }, {
+    "name" : "dagAction",
+    "type" : "string",

Review Comment:
   I see reasonable arguments for an enum here, as there are only a small number of actions.  what do you see as pros/cons?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -137,12 +139,12 @@ public boolean deleteDagAction(String flowGroup, String flowName, String flowExe
       connection.commit();
       return result != 0;
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s, flow execution id:%s and dagAction: %s",
+          tableName, flowGroup, flowName, flowExecutionId, dagActionValue), e);

Review Comment:
   tip: if you used `DagAction` as a POJO, you could use its `.toString()` to the same effect--w/o duplication across these messages.  e.g.:
   ```
   IOE(String.format("...for table %s of flow with %s", tableName, dagAction), e);
   ```
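A minimal sketch of the tip above, assuming `DagAction` becomes a plain value class whose `toString()` renders the identifying fields once. The class below, the `DagActionStoreErrors` helper, and the exact message wording are hypothetical illustrations, not Gobblin's API.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical stand-in for DagActionStore.DagAction used as a value type/POJO. */
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId, "flowExecutionId");
    this.dagActionValue = Objects.requireNonNull(dagActionValue, "dagActionValue");
  }

  // Single place that decides how an action is described in messages.
  @Override
  public String toString() {
    return String.format("flow group: %s, flow name: %s, flow execution id: %s, dag action: %s",
        flowGroup, flowName, flowExecutionId, dagActionValue);
  }
}

final class DagActionStoreErrors {
  private DagActionStoreErrors() {}

  /** Wraps a store failure; the format string has exactly one %s per argument. */
  static IOException failure(String verb, String tableName, DagAction dagAction, Exception cause) {
    return new IOException(
        String.format("Failure to %s action for table %s of %s", verb, tableName, dagAction), cause);
  }
}
```

Each store method could then throw `DagActionStoreErrors.failure("delete", tableName, dagAction, e)` and the add/get/delete messages would stay in sync automatically.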



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "

Review Comment:
   nit: this is very conversational.  I find more regularized, even robotic text easier to parse and grok when "grepping" a large volume of log msgs.  suggestion:
   ```
   [flowGroup: 'foo-grp', flowName: 'bar-fn', execId: 1234] - multi-active LAUNCH(987654): FAILURE
   ```
   (where `987654` would be the trigger tstamp)
   
   later we might add other multi-active status messages, each with a type, like `CANCEL`, `RESUME`, `ADVANCE` (i.e. to the next job in the flow).  each could carry action-specific info in the parens next to it, identifying exactly which instance it is, e.g.
   ```
   [g, n, id] - multi-active ADVANCE(<<job-name>>): SUCCESS
   ```
   
   looking ahead to verification and debugging, the end result is that message filtering becomes trivial and flexible on so many dimensions, including:
   * for a particular flow group+name
   * for a particular flow group+name+id
   * for a particular flow group+name+id+LAUNCH(tstamp) (e.g. across hosts)
   * related to multi-active in any way
   * related to a certain multi-active "action", such as `RESUME`
   * specifically those that are `FAILURE` or `SUCCESS`
   * etc.
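The suggested log shape could come from a single helper, so every multi-active message keeps the same grep-friendly prefix. This is a sketch only: `MultiActiveLogFormat` and its parameters are hypothetical names, not existing Gobblin code.

```java
/** Hypothetical helper producing the regularized multi-active log line suggested above. */
final class MultiActiveLogFormat {
  enum Outcome { SUCCESS, FAILURE }

  private MultiActiveLogFormat() {}

  /**
   * @param actionType   e.g. "LAUNCH", "CANCEL", "RESUME", "ADVANCE"
   * @param actionDetail action-specific discriminator, e.g. the trigger timestamp for LAUNCH
   *                     or the job name for ADVANCE
   */
  static String format(String flowGroup, String flowName, String flowExecutionId,
      String actionType, String actionDetail, Outcome outcome) {
    return String.format("[flowGroup: '%s', flowName: '%s', execId: %s] - multi-active %s(%s): %s",
        flowGroup, flowName, flowExecutionId, actionType, actionDetail, outcome);
  }
}
```

With a fixed shape, `grep 'multi-active LAUNCH(987654)'` would collect every host's attempt at that one trigger, and `grep ': FAILURE$'` every failed action.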



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {

Review Comment:
   naming-wise: is this strictly limited to scheduling?
   
   anyway I'd encourage more evocative naming that suggests the underlying architectural role or pattern, e.g. `MultiActiveLeaseArbiter`.  ("broker" and "controller" carry different, design-pattern-specific connotations.)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -27,7 +27,9 @@
 public interface DagActionStore {
   enum DagActionValue {
     KILL,
-    RESUME
+    RESUME,
+    // TODO: potentially combine this enum with {@link SchedulerLeaseDeterminationStore.FlowActionType}
+    LAUNCH
   }
 
   @Getter

Review Comment:
   would it be equivalent to use the `@Data` lombok annotation, which would also synthesize the constructor?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -120,13 +121,14 @@ public void addDagAction(String flowGroup, String flowName, String flowExecution
       insertStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group: %s, flow name"

Review Comment:
   nit: "Failure to add" or just "Failure adding"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java:
##########
@@ -40,12 +44,19 @@ public class DagActionStoreChangeMonitorFactory implements Provider<DagActionSto
   private final Config config;
   private DagActionStore dagActionStore;
   private DagManager dagManager;
+  private FlowCatalog flowCatalog;
+  private boolean isMultiActiveSchedulerEnabled;
 
   @Inject
-  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore dagActionStore, DagManager dagManager) {
+  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore dagActionStore, DagManager dagManager,
+      FlowCatalog flowCatalog, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isMultiActiveSchedulerEnabled) {

Review Comment:
   is the mismatch in names (`WARM_STANDBY_ENABLED` vs. `isMultiActiveSchedulerEnabled`) intentional?  if so, please make sure there's a code comment explaining it somewhere, probably at the `InjectionNames` def



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -70,21 +73,23 @@ public DagAction(String flowGroup, String flowName, String flowExecutionId, DagA
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
+   * @param dagActionValue the value of the dag action
    * @throws IOException
    * @return true if we successfully delete one record, return false if the record does not exist
    */
-  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException;
+  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException;
 
   /***
    * Retrieve action value by the flow group, flow name and flow execution id from the {@link DagActionStore}.
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
+   * @param dagActionValue the value of the dag action
    * @throws IOException Exception in retrieving the {@link DagAction}.
    * @throws SpecNotFoundException If {@link DagAction} being retrieved is not present in store.
    */
-  DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
-                                                                                           SQLException;
+  DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue)

Review Comment:
   one thing I don't understand: a `DagAction` has exactly the four fields that are params to this method.  if that's right, doesn't this method just duplicate `exists`?
   
   merely wondering: is there a use case for getting any and all actions related to a particular flow execution?
   
   relatedly w/ `deleteDagAction` (above), couldn't that take just a single param of type `DagAction`?
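To illustrate the question: if `DagAction` has value semantics (`equals`/`hashCode` over all four fields), then looking it up by those same four fields can only confirm that it exists, and delete can take the `DagAction` itself. The in-memory store below is a hypothetical sketch, not the MySQL-backed `DagActionStore`.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical value type: equality is defined over all four identifying fields. */
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.dagActionValue = dagActionValue;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagAction)) return false;
    DagAction that = (DagAction) o;
    return Objects.equals(flowGroup, that.flowGroup) && Objects.equals(flowName, that.flowName)
        && Objects.equals(flowExecutionId, that.flowExecutionId) && dagActionValue == that.dagActionValue;
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, dagActionValue);
  }
}

/** Hypothetical in-memory store whose methods each take a single DagAction. */
final class InMemoryDagActionStore {
  private final Set<DagAction> actions = ConcurrentHashMap.newKeySet();

  void addDagAction(DagAction dagAction) {
    actions.add(dagAction);
  }

  boolean exists(DagAction dagAction) {
    return actions.contains(dagAction);
  }

  /** Same contract as deleteDagAction: true if a record was removed, false if it did not exist. */
  boolean deleteDagAction(DagAction dagAction) {
    return actions.remove(dagAction);
  }
}
```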



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -151,20 +153,21 @@ private DagAction getDagActionWithRetry(String flowGroup, String flowName, Strin
       getStatement.setString(++i, flowGroup);
       getStatement.setString(++i, flowName);
       getStatement.setString(++i, flowExecutionId);
+      getStatement.setString(++i, dagActionValue.toString());

Review Comment:
   tip: if you want to use `DagAction` as a value type/POJO, you could even add a method to encapsulate this repetitive setup:
   ```
   void prepareStatement(PreparedStatement ps, int nextVarIndex)
   ```
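A minimal sketch of that encapsulation, assuming `DagAction` holds the four fields bound in `getDagActionWithRetry`. The method name follows the suggestion above; returning the next free index instead of `void` is an added assumption so callers can keep binding further variables. JDBC bind indices are 1-based, matching the diff's `++i` pattern starting from `i = 0`.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical value type that binds its own fields into a statement. */
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.dagActionValue = dagActionValue;
  }

  /**
   * Binds flowGroup, flowName, flowExecutionId and dagActionValue to four consecutive bind
   * variables starting at nextVarIndex. Returns the index of the next unbound variable.
   */
  int prepareStatement(PreparedStatement ps, int nextVarIndex) throws SQLException {
    int i = nextVarIndex;
    ps.setString(i++, flowGroup);
    ps.setString(i++, flowName);
    ps.setString(i++, flowExecutionId);
    ps.setString(i++, dagActionValue.toString());
    return i;
  }
}
```

A caller would then write `dagAction.prepareStatement(getStatement, 1)` in place of the four `setString` lines, and the same call could serve the insert, delete, and select statements as long as their `?`s appear in the same order.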



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,7 +446,11 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
       Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      this.orchestrator.orchestrate(flowSpec);
+      String triggerTimestampMillis =
+          jobProps.containsKey(ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY)
+              ? jobProps.getProperty(ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY, "0L"):
+              jobProps.getProperty(ConfigurationKeys.SCHEDULER_TRIGGER_TIMESTAMP_MILLIS_KEY,"0L");
+      this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis));

Review Comment:
   not major, but seems worth raising the level of abstraction w/ a method like `extractTriggerTimestampMillis`
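A sketch of such a method. It also sidesteps a latent bug in the diff: `"0L"` is a Java literal suffix, not a numeric string, so `Long.parseLong("0L")` throws `NumberFormatException` whenever neither key is set. The two key constants below are placeholders standing in for the `ConfigurationKeys` entries, whose real values are not shown here.

```java
import java.util.Properties;

/** Hypothetical holder for the extraction logic; key names are placeholders. */
final class TriggerTimestamps {
  // Placeholders for ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY
  // and ConfigurationKeys.SCHEDULER_TRIGGER_TIMESTAMP_MILLIS_KEY.
  static final String ORIGINAL_TRIGGER_KEY = "placeholder.original.trigger.timestamp.millis";
  static final String TRIGGER_KEY = "placeholder.trigger.timestamp.millis";

  private TriggerTimestamps() {}

  /** Prefers the original trigger timestamp when present, else the trigger timestamp, else 0. */
  static long extractTriggerTimestampMillis(Properties jobProps) {
    // getProperty(key, default) already covers the containsKey check in the diff.
    String value = jobProps.getProperty(ORIGINAL_TRIGGER_KEY, jobProps.getProperty(TRIGGER_KEY, "0"));
    return Long.parseLong(value);
  }
}
```

`runJob` would then reduce to `this.orchestrator.orchestrate(flowSpec, jobProps, extractTriggerTimestampMillis(jobProps))`.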



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "
+            + "triggerTimestamp: %s that was " + (leaseAttemptSucceeded ? "" : "NOT") + "successful", flowGroup,

Review Comment:
   `"NOT"` => `"NOT "`
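For illustration, the message with the spacing corrected. Separately, if `_log` is an SLF4J logger, SLF4J substitutes `{}` placeholders, not `%s`, so this message would be logged with its `%s` markers intact unless it is built with `String.format` (as below) or switched to `{}`. `LeaseLogMessages` is a hypothetical helper.

```java
/** Hypothetical helper reproducing the PR's message with the "NOT " spacing fixed. */
final class LeaseLogMessages {
  private LeaseLogMessages() {}

  static String leaseAttemptMessage(String flowGroup, String flowName, String flowExecutionId,
      long triggerTimestampMillis, boolean leaseAttemptSucceeded) {
    // The trailing space in "NOT " keeps "NOT" and "successful" from running together.
    return String.format("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, "
        + "LAUNCH event for triggerTimestamp: %s that was %ssuccessful",
        flowGroup, flowName, flowExecutionId, triggerTimestampMillis, leaseAttemptSucceeded ? "" : "NOT ");
  }
}
```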



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -120,13 +121,14 @@ public void addDagAction(String flowGroup, String flowName, String flowExecution
       insertStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group: %s, flow name"
+              + ": %s, flow execution id: %s, and dag action: %s", tableName, flowGroup, flowName, flowExecutionId,
+          dagActionValue), e);
     }
   }
 
   @Override
-  public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException {
+  public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
       int i = 0;

Review Comment:
   don't you need to bind the `dagActionValue` as well, now that there's another "bind variable" in the statement (i.e. a fourth "?")?
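A sketch of the binding the question points at. The column names in `DELETE_STATEMENT` are assumptions, since the PR's actual statement is not shown in this diff; the point is only that four `?`s need four `setString` calls. An unbound `?` typically makes the JDBC driver throw `SQLException` when the statement executes.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class DagActionDeletes {
  // Hypothetical column names: four bind variables, one per identifying field.
  static final String DELETE_STATEMENT =
      "DELETE FROM %s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND dag_action = ?";

  private DagActionDeletes() {}

  /** Binds every '?' in DELETE_STATEMENT, in order, using the same ++i idiom as the PR. */
  static void bindDeleteParams(PreparedStatement deleteStatement, String flowGroup, String flowName,
      String flowExecutionId, String dagActionValue) throws SQLException {
    int i = 0;
    deleteStatement.setString(++i, flowGroup);
    deleteStatement.setString(++i, flowName);
    deleteStatement.setString(++i, flowExecutionId);
    deleteStatement.setString(++i, dagActionValue); // the fourth "?" added by this PR
  }
}
```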



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID

Review Comment:
   I respect using an enum for the tri-state result, but the three I had imagined are more like:
   * LEASE_IS_YOURS / LEASE_OBTAINED (acquired)
   * LEASED_TO_ANOTHER / LEASE_NOT_AVAILABLE
   * NO_LONGER_LEASING / LEASE_NOT_POSSIBLE (not necessary / there's nothing to lease)
   
   did you have the same three concepts in mind, just named differently--or a different idea here?
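One way to encode the three outcomes listed above, as a sketch. The enum constants take the first-listed names from the comment, and `nextStep` is a hypothetical dispatch showing what a caller might do with each status, not behavior from the PR.

```java
/** Sketch of the tri-state result described in the review. */
enum LeaseAttemptStatus {
  LEASE_OBTAINED,     // this host acquired the lease and should perform the flow action
  LEASED_TO_ANOTHER,  // another host holds a valid lease; recheck once it may have expired
  NO_LONGER_LEASING   // e.g. the action was already completed, so there is nothing to lease
}

final class LeaseOutcomeHandler {
  private LeaseOutcomeHandler() {}

  /** Hypothetical caller-side branching on each status. */
  static String nextStep(LeaseAttemptStatus status) {
    switch (status) {
      case LEASE_OBTAINED:
        return "perform action, then mark lease complete";
      case LEASED_TO_ANOTHER:
        return "recheck after the other host's lease expires";
      case NO_LONGER_LEASING:
        return "do nothing";
      default:
        throw new IllegalStateException("Unexpected status: " + status);
    }
  }
}
```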



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP

Review Comment:
   nit: I mentioned `ADVANCE` elsewhere, but `NEXT_HOP` is fine too.  as for `RETRY`, I believe `RESUME` is the terminology we've adopted pretty widely--or do you find precedent for `RETRY`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "
+            + "triggerTimestamp: %s that was " + (leaseAttemptSucceeded ? "" : "NOT") + "successful", flowGroup,
+            flowName, flowExecutionId, triggerTimestampMillis);
+        return;

Review Comment:
   especially as there's an `if/else` just after, why not connect this block to that, rather than adding a premature `return` here?  (early returns like this can be more error-prone for maintainers.)
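A reduced sketch of the suggested shape, assuming the `if/else` that follows chooses between handing the flow to the `DagManager` and some other submission path (its branches are not shown in this diff). `OrchestrationRouter`, `Route`, and the boolean parameters are hypothetical.

```java
/** Hypothetical reduction of the control flow: one if/else-if chain, no early return. */
final class OrchestrationRouter {
  enum Route { MULTI_ACTIVE_LEASE, DAG_MANAGER, DIRECT_SUBMIT }

  private OrchestrationRouter() {}

  static Route route(boolean isMultiActiveSchedulerEnabled, boolean isDagManagerEnabled) {
    final Route route;
    if (isMultiActiveSchedulerEnabled) {
      route = Route.MULTI_ACTIVE_LEASE;  // attempt the lease; do not forward to DagManager here
    } else if (isDagManagerEnabled) {
      route = Route.DAG_MANAGER;
    } else {
      route = Route.DIRECT_SUBMIT;
    }
    // Anything placed here runs on every branch; an early return above would skip it.
    return route;
  }
}
```

With a single chain, a statement added after it (metrics, cleanup) runs on all three paths; with the early `return`, a maintainer could add such a statement and silently skip it for the multi-active case.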



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP
+  }
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow's trigger event if one does not already
+   * exist in the store for the same trigger event. Regardless of the outcome it also reads the pursuant timestamp of
+   * the entry for that trigger event (it could have pre-existed in the table or been newly added by the previous
+   * write). Based on the transaction results, it will return @LeaseAttemptStatus to determine the next action.
+   * @param flowGroup
+   * @param flowName
+   * @param flowExecutionId
+   * @param triggerTimeMillis is the time this flow is supposed to be launched
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,

Review Comment:
   the name `attemptInsertAndGetPursuantTimestamp` seems misleading, since no timestamp appears to be returned to the caller.  that said, won't the participant who fails to acquire the lease want to know when the lease was granted, to use that in deciding when to try again?
   
   nit: "pursuing" is slightly closer to the sense of what's happening here, than is "pursuant".  the former is also likely to be more familiar naming to maintainers.
   
   alternatively what about simply naming this `acquireLease` or `tryAcquireLease`?
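A sketch of the `tryAcquireLease` shape, assuming the caller also wants the current lease's acquisition time back. `MultiActiveLeaseArbiter` borrows the name suggested earlier in this review; `LeaseAttempt`, `InMemoryLeaseArbiter`, the key format, and the fixed lease duration are hypothetical. An in-memory map cannot arbitrate between hosts, so this only illustrates the contract that a shared (e.g. MySQL-backed) store would implement.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical contract: try to lease one flow action for one trigger event. */
interface MultiActiveLeaseArbiter {
  LeaseAttempt tryAcquireLease(String flowActionKey, long triggerTimeMillis, long nowMillis);
}

/** Result carrying both the outcome and when the current lease was granted. */
final class LeaseAttempt {
  enum Status { LEASE_OBTAINED, LEASED_TO_ANOTHER }

  final Status status;
  final long leaseAcquiredAtMillis; // lets a losing caller compute when the lease may expire

  LeaseAttempt(Status status, long leaseAcquiredAtMillis) {
    this.status = status;
    this.leaseAcquiredAtMillis = leaseAcquiredAtMillis;
  }
}

/** Single-process illustration only; a real arbiter needs a store shared by all hosts. */
final class InMemoryLeaseArbiter implements MultiActiveLeaseArbiter {
  private final long leaseDurationMillis;
  private final Map<String, Long> leaseAcquiredAt = new HashMap<>();

  InMemoryLeaseArbiter(long leaseDurationMillis) {
    this.leaseDurationMillis = leaseDurationMillis;
  }

  @Override
  public synchronized LeaseAttempt tryAcquireLease(String flowActionKey, long triggerTimeMillis, long nowMillis) {
    String key = flowActionKey + "@" + triggerTimeMillis; // one lease per trigger event
    Long existing = leaseAcquiredAt.get(key);
    if (existing != null && nowMillis - existing < leaseDurationMillis) {
      // An unexpired lease exists: report when it was granted so the caller can recheck later.
      return new LeaseAttempt(LeaseAttempt.Status.LEASED_TO_ANOTHER, existing);
    }
    // No lease yet, or the previous one expired: grant it to this caller.
    leaseAcquiredAt.put(key, nowMillis);
    return new LeaseAttempt(LeaseAttempt.Status.LEASE_OBTAINED, nowMillis);
  }
}
```

Passing `nowMillis` in, rather than reading the clock inside, keeps the expiry logic deterministic to test.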
   



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP
+  }
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow's trigger event if one does not already
+   * exist in the store for the same trigger event. Regardless of the outcome it also reads the pursuant timestamp of
+   * the entry for that trigger event (it could have pre-existed in the table or been newly added by the previous
+   * write). Based on the transaction results, it will return @LeaseAttemptStatus to determine the next action.
+   * @param flowGroup
+   * @param flowName
+   * @param flowExecutionId
+   * @param triggerTimeMillis is the time this flow is supposed to be launched
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,
+      String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) throws IOException;
+
+  /**
+   * This method is used by `attemptInsertAndGetPursuantTimestamp` above to indicate the host has successfully completed

Review Comment:
   not having read further yet, the description here makes this sound like an internal impl detail.  does it need to be part of the public interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org