Posted to dev@gobblin.apache.org by "umustafi (via GitHub)" <gi...@apache.org> on 2023/02/09 00:47:50 UTC

[GitHub] [gobblin] umustafi opened a new pull request, #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

umustafi opened a new pull request, #3640:
URL: https://github.com/apache/gobblin/pull/3640

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [X] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1783 
   
   
   ### Description
   - [X] Here are some details about my PR, including screenshots (if applicable):
   We seek to improve the initialization time of the `GobblinServiceJobScheduler` upon restart or leadership change by batching the MySQL queries that fetch flow specs. Instead of making one MySQL get call for each flow execution id, which scales extremely poorly with the number of flows, we group the gets to reduce the number of calls and the downtime.
   
   This implementation adds two new functions to the `SpecStore` interface, `getSortedSpecURIs` and `getBatchedSpecs`, which we use to achieve the batching. Because both are generic enough to be useful in any class derived from `SpecStore`, we add them to the base class. Although this requires every child class to implement them, it lets any consumer of `SpecStore` use the functionality without caring about the specific implementation in use (as `GobblinServiceJobScheduler` does). Additionally, `getBatchedSpecs` requires an `offset`, or starting point, from which to fetch each batch, so the consumer has to do some bookkeeping of its position in the paginated gets; this again keeps the store's functionality separate from the consumer's use case. The entirety of the flow catalog is too large for the `GobblinServiceJobScheduler` to load into memory at once, so we use this batch functionality. 
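
A minimal sketch of the offset bookkeeping described above, under stated assumptions: `PagedStore`, `InMemoryPagedStore`, and `BatchLoader` are hypothetical names invented for illustration and are not the `SpecStore` API from this PR. The in-memory store stands in for MySQL and counts round trips.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a store that supports paginated reads (not the real SpecStore). */
interface PagedStore<T> {
  int size();

  /** Returns up to {@code limit} items starting at {@code offset}, in a stable order. */
  List<T> getBatch(int offset, int limit);
}

/** In-memory implementation used only for illustration. */
final class InMemoryPagedStore<T> implements PagedStore<T> {
  private final List<T> items;
  int batchCalls = 0; // counts round trips, standing in for MySQL queries

  InMemoryPagedStore(List<T> items) {
    this.items = new ArrayList<>(items);
  }

  @Override
  public int size() {
    return items.size();
  }

  @Override
  public List<T> getBatch(int offset, int limit) {
    batchCalls++;
    if (offset >= items.size()) {
      return new ArrayList<>();
    }
    int end = Math.min(items.size(), offset + limit);
    return new ArrayList<>(items.subList(offset, end));
  }
}

final class BatchLoader {
  /** Loads every item by walking the store in fixed-size pages; the caller owns the offset. */
  static <T> List<T> loadAll(PagedStore<T> store, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<T> loaded = new ArrayList<>();
    int total = store.size();
    for (int offset = 0; offset < total; offset += batchSize) {
      loaded.addAll(store.getBatch(offset, batchSize));
    }
    return loaded;
  }
}
```

With five items and a batch size of two, the loader issues three batch calls instead of five individual gets. The sketch ignores concurrent inserts and deletes, which can shift page boundaries in a real store.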
   
   ### Tests
   - [X] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   The main testing for this functionality will be empirical: we want to determine the batch size that minimizes the time needed for all the gets above. In general, the time should go down with a larger batch size, but there may be an inflection point where we see diminishing returns or where the memory required to hold a large batch is excessive. I propose testing with batch sizes of 100, 250, and 750 incrementally and using the emitted time metrics to settle on an appropriate size.
   
   ### Commits
   - [X] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] AndyJiang99 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "AndyJiang99 (via GitHub)" <gi...@apache.org>.
AndyJiang99 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1105121542


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java:
##########
@@ -84,6 +84,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";

Review Comment:
   1. Was there a reason `spec_json` was removed from the query string? 
   2. This query will error if OFFSET or LIMIT is set to a negative value, since MySQL cannot handle those, and this [if block](https://github.com/apache/gobblin/pull/3640/files#diff-900cc8e4e863a8057e0e808230a2f9c3c169048059d62a5d94ed9beb94360c12L283-L293) handles such cases. We'll need to add something similar here.
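
One way to guard the `LIMIT ? OFFSET ?` parameters before binding them is sketched below. `PageArgs` is a hypothetical helper, and rejecting negative values is only one possible policy (clamping a negative offset to 0 is another); the thread does not show which one the PR adopted.

```java
/** Hypothetical holder that validates pagination arguments before they reach the SQL statement. */
final class PageArgs {
  final int offset;
  final int limit;

  private PageArgs(int offset, int limit) {
    this.offset = offset;
    this.limit = limit;
  }

  /** Rejects negative values, which MySQL does not accept for LIMIT or OFFSET. */
  static PageArgs of(int offset, int limit) {
    if (offset < 0) {
      throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
    }
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be >= 0 but was " + limit);
    }
    return new PageArgs(offset, limit);
  }
}
```

A caller would build `PageArgs.of(offset, limit)` first and bind its fields to the prepared statement only if no exception was thrown.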





[GitHub] [gobblin] umustafi commented on pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#issuecomment-1442230139

   > Seems there is no change for spec monitor, I think for spec monitor, you also need to get modification time in the spec?
   
   Good catch, I changed the getSpec function for a single URI in SpecStore to also use modification time




[GitHub] [gobblin] umustafi commented on pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#issuecomment-1428533867

   > 
   
   1. The current implementation adds the scheduler, then the specConsumer, to the list of services. I considered switching the order, but the scheduler needs to be initialized before we consume specs and try to add them to the scheduler. I need to confirm whether services are initialized in that order or concurrently. The specConsumer starts consuming from the latestOffset, so this should not miss any specs. The offset won't advance unless the service is up and able to accept requests and our consumer is processing. 
   
   2. The problem can come up if we load flowSpecA with an old value and, while that batch is being processed, an API request updates the flow: the consumer calls onAdd with the newer value first, then the scheduler calls it with the old value. It's very rare, but we may want to add a modified timestamp to avoid it. This technically _could_ have happened before as well, although with individual gets there was a much smaller chance that the consumer processed a newer spec version between the get and the add. If we want to use modification time, we need a bigger change to store the modified time with the spec, perhaps in `DagManager` or the `Scheduler` itself.
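
The race in point 2 can be illustrated with a small "latest modification wins" guard. This is a sketch under assumptions, not the scheduler's code: `LatestWinsRegistry` and its string-valued specs are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry that keeps only the most recently modified value per URI. */
final class LatestWinsRegistry {
  private final Map<String, String> valueByUri = new HashMap<>();
  private final Map<String, Long> modifiedTimeByUri = new HashMap<>();

  /** Returns true if the value was stored, false if a newer version was already present. */
  synchronized boolean add(String uri, String value, long modifiedTime) {
    Long known = modifiedTimeByUri.get(uri);
    if (known != null && known > modifiedTime) {
      // Stale update, e.g. a batch that was read before a concurrent API update arrived.
      return false;
    }
    valueByUri.put(uri, value);
    modifiedTimeByUri.put(uri, modifiedTime);
    return true;
  }

  synchronized String get(String uri) {
    return valueByUri.get(uri);
  }
}
```

If the consumer adds the version modified at time 200 first and the batch loader then offers the version from time 100, the second call returns false and the newer value is kept.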




[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1103408096


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java:
##########
@@ -84,6 +84,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";

Review Comment:
   The `spec_uri` is the primary key [here](https://jarvis.corp.linkedin.com/codesearch/result/?name=MysqlBaseSpecStore.java&path=gobblin-elr%2Fgobblin-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fruntime%2Fspec_store&reponame=linkedin%2Fgobblin-elr#91) in `MysqlBaseSpecStore` and in `MysqlSpecStore`, so each `spec_uri` is unique and ordering on it is deterministic. Let me add a comment in the docstring of the paginate function. 





[GitHub] [gobblin] umustafi commented on pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#issuecomment-1444829515

   > Minor comments, also we seems to change all the get method return modification time in the flow spec as well. Hopefully you verified that this is a backward compatible change
   
   Tests pass (other than those that compare a user-generated spec to the retrieved one, which require some changes). Logically this should be backward compatible, since we added a property rather than removing one. Other Specs should also be able to add properties to the Config as needed. 




[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1114672807


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java:
##########
@@ -72,6 +73,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
 
   public static final String CONFIG_PREFIX = "mysqlBaseSpecStore";
   public static final String DEFAULT_TAG_VALUE = "";
+  public static final String modificationTimeKey = "modified_time";

Review Comment:
   Can we make this a Configuration Key or a key associated with the FlowSpec rather than the SpecStore? Since the property is part of the FlowSpec, that would encapsulate it better.





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117801844


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils

Review Comment:
   Duplicate code; can we extract it into one method?
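
A rough sketch of the extraction being asked for, assuming simplified stand-in types: `SimpleSpec`, `StartupScheduler`, and `addSpecOnStartup` are hypothetical names, and the real method would call `disableFlowRunImmediatelyOnStart` and `onAddSpec` instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified spec with only the fields this sketch needs. */
final class SimpleSpec {
  final String uri;
  final boolean runImmediately;

  SimpleSpec(String uri, boolean runImmediately) {
    this.uri = uri;
    this.runImmediately = runImmediately;
  }

  SimpleSpec withRunImmediatelyDisabled() {
    return new SimpleSpec(uri, false);
  }
}

final class StartupScheduler {
  final List<SimpleSpec> scheduled = new ArrayList<>();

  /** Single place for the "disable run-immediately, then add" logic shared by both loops. */
  void addSpecOnStartup(SimpleSpec spec) {
    SimpleSpec toAdd = spec.runImmediately ? spec.withRunImmediatelyDisabled() : spec;
    scheduled.add(toAdd);
  }

  /** The batch loop and the leftover-URI loop both delegate to the shared helper. */
  void scheduleAll(List<List<SimpleSpec>> batches, List<SimpleSpec> leftovers) {
    for (List<SimpleSpec> batch : batches) {
      for (SimpleSpec spec : batch) {
        addSpecOnStartup(spec);
      }
    }
    for (SimpleSpec spec : leftovers) {
      addSpecOnStartup(spec);
    }
  }
}
```

Keeping the branch in one helper means a later fix to the run-immediately handling applies to both the paginated path and the catch-up path.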



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);

Review Comment:
   Use `getSpecWrapper`? It's possible that it's an ad hoc flow that we delete after it executes. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   This might be a super corner case, but if the modification time is the same, you might want to check whether the flow is run-immediately, as the load method overwrites that value and we might skip the first run of the flow. 





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1109099163


##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java:
##########
@@ -325,19 +325,18 @@ public void testGetAllSpecPaginate() throws Exception {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
+    // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+    specs = this.specStore.getSpecsPaginated(1, 2);

Review Comment:
   Added tests for the edge cases: a negative or zero count/offset, and a start offset past the length of the store. 





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1119138633


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -343,8 +416,27 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+    String uriString = flowSpec.getUri().toString();
+    Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      // For run-immediately flows with a schedule the modified_time would remain the same
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) > modificationTime
+          || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {

Review Comment:
   Why `equals` rather than `==`? 
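
For context on the question above: both operands in the diff are boxed `Long` objects, and `==` on two `Long` references compares identity rather than numeric value. `Long.valueOf` is documented to cache only values from -128 to 127, so identity comparison is not reliable for millisecond timestamps. The helper below is a hypothetical illustration, not code from the PR.

```java
/** Hypothetical helper contrasting value comparison on boxed and primitive longs. */
final class BoxedLongCompare {
  /** Compares boxed values by numeric value; works for any magnitude. */
  static boolean sameTime(Long a, Long b) {
    return a != null && a.equals(b);
  }

  /** On primitives, == already compares numeric values. */
  static boolean sameTimePrimitive(long a, long b) {
    return a == b;
  }
}
```

Unboxing one side first (for example with `longValue()`) would also make `==` a value comparison, at the cost of a possible `NullPointerException` if the map lookup returns null.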



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+            addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        return new AddSpecResponse(response);
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);

Review Comment:
   I think you want to delete the entry when we remove the flow from the scheduler; otherwise you will have a slight memory leak as we keep receiving ad hoc flow requests.
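
A sketch of the cleanup this comment suggests, assuming the two maps are kept in step. The field names mirror the diff, but `FlowBookkeeping`, `onAdd`, and `onDelete` are hypothetical simplifications of the scheduler's callbacks.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping that removes the timestamp entry whenever the flow is removed. */
final class FlowBookkeeping {
  private final Map<String, String> scheduledFlowSpecs = new HashMap<>();
  private final Map<String, Long> lastUpdatedTimeForFlowSpec = new HashMap<>();

  void onAdd(String uri, String spec, long modificationTime) {
    scheduledFlowSpecs.put(uri, spec);
    lastUpdatedTimeForFlowSpec.put(uri, modificationTime);
  }

  /** Removes both entries so ad hoc flows do not accumulate timestamps after deletion. */
  void onDelete(String uri) {
    scheduledFlowSpecs.remove(uri);
    lastUpdatedTimeForFlowSpec.remove(uri);
  }

  int trackedTimestamps() {
    return lastUpdatedTimeForFlowSpec.size();
  }
}
```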





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1116096090


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {

Review Comment:
   Outside the scheduling period we should, as of now, always be processing the most recent update, but we can do the check anyway since it's small. 





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117782597


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java:
##########
@@ -65,6 +65,9 @@
 @SuppressFBWarnings(value="SE_BAD_FIELD",
     justification = "FindBugs complains about Config not being serializable, but the implementation of Config is serializable")
 public class FlowSpec implements Configurable, Spec {
+  // Key for Property associated with modified_time
+  public static final String modificationTimeKey = "modified_time";

Review Comment:
   can we treat this the same way as other Gobblin constants?
   `public static final String MODIFICATION_TIME_KEY = "modified_time"`
   



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
 
+  public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.average.get.spec.speed.while.loading.all.specs.millis";

Review Comment:
   I would consider shortening this metric name :) since metric platforms usually offer some slicing/dicing based on your delimiter.
   As written it is a very long metric name, and you may want to group more related metrics under `jobScheduler` in the future.
   So for example this can be `GOBBLIN_SERVICE_PREFIX + "jobScheduler.getSpecSpeedDuringStartupAvgMillis"`
   
   Something similar can be said about the metrics below.
   Also be careful not to add a dot at the start of the suffix that follows the prefix; otherwise I believe you'll get a metric name with two consecutive `.`
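
The double-dot concern can be sidestepped by joining name parts through a helper that normalizes the delimiter. This is a hypothetical sketch: `MetricNames.join` is not part of Gobblin, and the prefix used in the usage note is a placeholder because the actual value of `GOBBLIN_SERVICE_PREFIX` is not shown in this thread.

```java
/** Hypothetical helper that joins a metric prefix and suffix with exactly one dot. */
final class MetricNames {
  static String join(String prefix, String suffix) {
    // Drop a trailing dot on the prefix and a leading dot on the suffix before joining.
    String p = prefix.endsWith(".") ? prefix.substring(0, prefix.length() - 1) : prefix;
    String s = suffix.startsWith(".") ? suffix.substring(1) : suffix;
    return p + "." + s;
  }
}
```

For a placeholder prefix `"example.prefix."`, joining it with `".jobScheduler.getSpecSpeedDuringStartupAvgMillis"` or with `"jobScheduler.getSpecSpeedDuringStartupAvgMillis"` yields the same single-dotted name.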



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",

Review Comment:
   We should probably make this a warn log; based on the response, the caller will treat it as a successful add.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+            addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        return new AddSpecResponse(response);
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);

Review Comment:
   Do we still need to keep track of this data structure after service startup?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] AndyJiang99 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "AndyJiang99 (via GitHub)" <gi...@apache.org>.
AndyJiang99 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1105183287


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java:
##########
@@ -358,8 +360,35 @@ public int getSizeImpl() throws IOException {
   }
 
   @Override
-  public Collection<Spec> getSpecsImpl(int start, int count) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException();
+  public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize)
+      throws IOException {
+    if (startOffset < 0 || batchSize < 0) {
+      throw new IOException(String.format("Received negative offset or batch size value when they should be >= 0. "

Review Comment:
   Maybe IllegalArgumentException instead? 
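
As a rough illustration of the suggestion, the sketch below validates the pagination arguments with `IllegalArgumentException`, which signals a caller error rather than an I/O failure. The class `PaginationArgsSketch` and its `page` method are hypothetical stand-ins operating on an in-memory list; they are not the `FSSpecStore` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: an in-memory stand-in for a paginated getter.
final class PaginationArgsSketch {

  // Returns up to batchSize elements starting at startOffset.
  // Negative arguments are treated as a programming error by the caller.
  static <T> List<T> page(List<T> sorted, int startOffset, int batchSize) {
    if (startOffset < 0 || batchSize < 0) {
      throw new IllegalArgumentException(String.format(
          "Offset and batch size must be >= 0 but got offset=%d, batchSize=%d",
          startOffset, batchSize));
    }
    if (startOffset >= sorted.size()) {
      return Collections.emptyList(); // reading past the end yields an empty page
    }
    // Use long arithmetic so offset + batchSize cannot overflow int.
    int end = (int) Math.min((long) startOffset + batchSize, sorted.size());
    return new ArrayList<>(sorted.subList(startOffset, end));
  }
}
```

An offset or batch size of 0 is accepted here; only negative values are rejected.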



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java:
##########
@@ -303,19 +303,18 @@ public void testGetAllSpecPaginate() throws Exception {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
+    // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+    specs = this.specStore.getSpecsPaginated(1, 2);

Review Comment:
   Same comment as below: can we keep the -1 input for either OFFSET or COUNT, just to check that the functionality to handle such edge cases exists?



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java:
##########
@@ -325,19 +325,18 @@ public void testGetAllSpecPaginate() throws Exception {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
+    // Return all flowSpecs from index 1 to 3 - 1. Total of 2 flowSpecs, only return second two.
+    specs = this.specStore.getSpecsPaginated(1, 2);

Review Comment:
   Could we keep the test where the COUNT and/or OFFSET is negative, and assert that we're able to catch an error? That test was here to catch this corner case in case the implementation changes.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -133,10 +147,22 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
     this.helixManager = helixManager;
     this.orchestrator = orchestrator;
     this.scheduledFlowSpecs = Maps.newHashMap();
+    this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
     this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
     this.warmStandbyEnabled = warmStandbyEnabled;
     this.quotaManager = quotaManager;
+    // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
+    // of the scheduler. If one metric exists, then the others should as well.
+    MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+    if (metricContext.getGauges(filter).isEmpty()) {
+//      this.averageGetSpecTimeMillis =

Review Comment:
   Remove unnecessary comments here





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1115178711


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {
+      String uriString = flowSpec.getUri().toString();
+      // If spec does not exist in scheduler or have a modification time associated with it, assume it's the most recent
+      if (this.scheduledFlowSpecs.containsKey(uriString) && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+        if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   If the modification time is 0, which means multi-leader is not enabled, I believe you want to process it anyway.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +239,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
+        }
+      }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+    }
+
+    // Ensure no specs were more specs were added during the load time
+    int updatedNumSpecs = flowCatalog.get().getSize();

Review Comment:
   As we discussed before, if a spec is deleted, this won't solve the issue. Am I missing anything? Why not just use a set to track what has been handled?
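
One way to read the set-based suggestion is sketched below: record every spec URI that a batch delivered, then compare against the current URI list once loading finishes. Unlike a size comparison, this still detects a spec that was skipped because a delete shifted the offsets. The class `HandledSpecTrackerSketch` and its methods are hypothetical and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: tracks which spec URIs were seen during a paginated load.
final class HandledSpecTrackerSketch {
  private final Set<String> handledUris = new HashSet<>();

  // Returns true the first time a URI is seen and false if it was already handled,
  // so a spec delivered twice by overlapping pages can be skipped.
  boolean markHandled(String specUri) {
    return handledUris.add(specUri);
  }

  // Returns the URIs that exist in the catalog now but were never delivered by any batch.
  List<String> findMissed(List<String> currentCatalogUris) {
    List<String> missed = new ArrayList<>();
    for (String uri : currentCatalogUris) {
      if (!handledUris.contains(uri)) {
        missed.add(uri);
      }
    }
    return missed;
  }
}
```

For example, with specs a, b, c, d and a batch size of 2, deleting a after the first batch makes the second batch (offset 2) return only d. The catalog size check cannot tell that c was skipped, while `findMissed` reports it.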



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {

Review Comment:
   Can we do this comparison all the time, even outside the loading period? What's the reason for not doing that?





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117840401


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+            addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        return new AddSpecResponse(response);
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);

Review Comment:
   We don't have to, but there's no harm in having the check regardless.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1119143320


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -343,8 +416,27 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+    String uriString = flowSpec.getUri().toString();
+    Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      // For run-immediately flows with a schedule the modified_time would remain the same
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) > modificationTime
+          || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {

Review Comment:
   I'm comparing `Long` objects, not primitives, so `equals()` is needed to compare values; an `==` comparison checks whether the object references are equal, not the values.
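
The distinction can be shown with a small sketch. The class and method names are hypothetical. Relational operators such as `>` unbox both operands, whereas `==` on two `Long` references compares identity, so value equality needs `equals()` or explicit unboxing.

```java
// Hypothetical sketch of comparing boxed Long values safely.
final class BoxedLongCompareSketch {

  // Value comparison on boxed operands; does not depend on object identity.
  static boolean sameValue(Long a, Long b) {
    return a.equals(b);
  }

  // Equivalent alternative: unbox explicitly, then == compares primitive values.
  static boolean sameValueUnboxed(Long a, Long b) {
    return a.longValue() == b.longValue();
  }

  // Relational operators auto-unbox, so this compares values as well.
  static boolean isNewer(Long candidate, Long existing) {
    return candidate > existing;
  }
}
```

Writing `a == b` directly on two `Long` references may appear to work for small values because of boxing caches, which is why static analysis tools tend to flag it.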





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1119143320


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -343,8 +416,27 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+    String uriString = flowSpec.getUri().toString();
+    Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      // For run-immediately flows with a schedule the modified_time would remain the same
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) > modificationTime
+          || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {

Review Comment:
   I'm comparing `Long` objects, not primitives, so it gives a warning that an `==` comparison may check whether the object references are equal rather than the values.





[GitHub] [gobblin] Will-Lo merged pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo merged PR #3640:
URL: https://github.com/apache/gobblin/pull/3640




[GitHub] [gobblin] umustafi commented on pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#issuecomment-1428617727

   The service test looks flaky with a timeout; the tests pass locally. Fixed checkstyle (core tests), working on the module ones.




[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1105145875


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java:
##########
@@ -84,6 +84,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";

Review Comment:
   Ah, that's something we should specify in the expected function parameter values, and we should throw an error if we receive input outside our expected bounds. We shouldn't receive negative values (count = 0 or offset = 0 should be fine).





[GitHub] [gobblin] codecov-commenter commented on pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#issuecomment-1423459456

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3640](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (80d0bce) into [master](https://codecov.io/gh/apache/gobblin/commit/13faea46bd2f23999fb1bf9ea579296fb86d1e3d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (13faea4) will **decrease** coverage by `2.77%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3640      +/-   ##
   ============================================
   - Coverage     46.58%   43.81%   -2.77%     
   + Complexity    10681     2066    -8615     
   ============================================
     Files          2133      409    -1724     
     Lines         83573    17639   -65934     
     Branches       9295     2152    -7143     
   ============================================
   - Hits          38931     7729   -31202     
   + Misses        41076     9052   -32024     
   + Partials       3566      858    -2708     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | | |
   | [...che/gobblin/runtime/api/InstrumentedSpecStore.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0luc3RydW1lbnRlZFNwZWNTdG9yZS5qYXZh) | | |
   | [...java/org/apache/gobblin/runtime/api/SpecStore.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL1NwZWNTdG9yZS5qYXZh) | | |
   | [...apache/gobblin/runtime/metrics/RuntimeMetrics.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWV0cmljcy9SdW50aW1lTWV0cmljcy5qYXZh) | | |
   | [...ache/gobblin/runtime/spec\_catalog/FlowCatalog.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvc3BlY19jYXRhbG9nL0Zsb3dDYXRhbG9nLmphdmE=) | | |
   | [...apache/gobblin/runtime/spec\_store/FSSpecStore.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvc3BlY19zdG9yZS9GU1NwZWNTdG9yZS5qYXZh) | | |
   | [...gobblin/runtime/spec\_store/MysqlBaseSpecStore.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvc3BlY19zdG9yZS9NeXNxbEJhc2VTcGVjU3RvcmUuamF2YQ==) | | |
   | [.../modules/scheduler/GobblinServiceJobScheduler.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9zY2hlZHVsZXIvR29iYmxpblNlcnZpY2VKb2JTY2hlZHVsZXIuamF2YQ==) | | |
   | [...etention/policy/predicates/WhitelistPredicate.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L3JldGVudGlvbi9wb2xpY3kvcHJlZGljYXRlcy9XaGl0ZWxpc3RQcmVkaWNhdGUuamF2YQ==) | | |
   | [...gobblin/runtime/commit/DatasetStateCommitStep.java](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvY29tbWl0L0RhdGFzZXRTdGF0ZUNvbW1pdFN0ZXAuamF2YQ==) | | |
   | ... and [1719 more](https://codecov.io/gh/apache/gobblin/pull/3640?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [gobblin] AndyJiang99 commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "AndyJiang99 (via GitHub)" <gi...@apache.org>.
AndyJiang99 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1103146215


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java:
##########
@@ -84,6 +84,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";

Review Comment:
   I believe it's possible to have two specs with the same spec_uri. We should keep the modified_time ordering in the statement too, so that if two specs share a spec_uri, the ordering of the results from the same query stays the same every time.
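
The ordering concern can be sketched in memory: sorting on `spec_uri` alone leaves rows with equal URIs in an unspecified relative order, so successive pages may overlap or skip rows, while adding `modified_time` as a tie-breaker makes the page contents deterministic. The class `StableOrderingSketch` and its `Row` type are hypothetical. In SQL, the analogous change would presumably be adding `modified_time` as a second `ORDER BY` key, assuming the table has that column.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: deterministic pagination with a tie-breaker sort key.
final class StableOrderingSketch {

  static final class Row {
    final String specUri;
    final long modifiedTime;

    Row(String specUri, long modifiedTime) {
      this.specUri = specUri;
      this.modifiedTime = modifiedTime;
    }
  }

  // Primary key: specUri. Tie-breaker: modifiedTime, so equal URIs have a fixed order.
  static final Comparator<Row> ORDER =
      Comparator.comparing((Row r) -> r.specUri).thenComparingLong(r -> r.modifiedTime);

  // Returns the page [offset, offset + limit) of the rows in the deterministic order.
  static List<Row> page(List<Row> rows, int offset, int limit) {
    if (offset < 0 || limit < 0) {
      throw new IllegalArgumentException("offset and limit must be >= 0");
    }
    List<Row> sorted = new ArrayList<>(rows);
    sorted.sort(ORDER);
    int from = Math.min(offset, sorted.size());
    int to = (int) Math.min((long) from + limit, sorted.size());
    return new ArrayList<>(sorted.subList(from, to));
  }
}
```

Regardless of the order in which the rows arrive, the same offset and limit return the same rows in the same order, as long as no two rows share both keys.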





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1116132447


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {
+      String uriString = flowSpec.getUri().toString();
+      // If spec does not exist in scheduler or have a modification time associated with it, assume it's the most recent
+      if (this.scheduledFlowSpecs.containsKey(uriString) && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+        if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   Added another condition so that the spec is always processed if the modification time is 0.
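
A minimal sketch of the resulting decision, under the assumption that a modification time of 0 means "no timestamp available": such specs are always processed, as are specs with no recorded timestamp, and otherwise only strictly newer versions are processed. The class `StaleSpecCheckSketch` and its methods are hypothetical and simplify the scheduler logic shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "skip stale updates" decision.
final class StaleSpecCheckSketch {
  private final Map<String, Long> lastUpdatedTime = new HashMap<>();

  // Returns true if the spec should be (re)scheduled.
  boolean shouldProcess(String uri, long modificationTime) {
    Long last = lastUpdatedTime.get(uri);
    // 0 means no modification time was supplied; an unknown URI has nothing to compare against.
    if (modificationTime == 0L || last == null) {
      return true;
    }
    // Skip when the recorded version is the same age or newer.
    return last < modificationTime;
  }

  // Records the modification time of the version that was scheduled.
  void record(String uri, long modificationTime) {
    lastUpdatedTime.put(uri, modificationTime);
  }
}
```

This mirrors the `>=` comparison in the diff above; it does not model the later refinement for run-immediately flows.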





[GitHub] [gobblin] umustafi commented on a diff in pull request #3640: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117841094


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
 
+  public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.average.get.spec.speed.while.loading.all.specs.millis";

Review Comment:
   Good call about the naming. I updated all of them to have the prefix `jobScheduler` and removed the delimiters within the name. The prefix does not contain a `.` after `GobblinService`, so we need one before the suffix; otherwise they get concatenated.


