Posted to dev@gobblin.apache.org by "Will-Lo (via GitHub)" <gi...@apache.org> on 2023/04/04 23:43:52 UTC

[GitHub] [gobblin] Will-Lo opened a new pull request, #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Will-Lo opened a new pull request, #3667:
URL: https://github.com/apache/gobblin/pull/3667

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1806
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   This PR adds a `JobSummaryTimer` which is emitted right before the `JobCommitTimer`, after the datasets are written and published to the destination.
   
   Collection of the records/bytes depends on the DataWriter implementing `bytesWritten()` and `recordsWritten()`, which most writers do, including those that extend `InstrumentedDataWriter`. 
   
   Gobblin-as-a-Service ingests these JobSummaryTimer events and serializes them into the GaaSObservabilityEvent.
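The counts above depend on each writer keeping running totals as it writes. The sketch below is a minimal illustration of that idea. It assumes a simplified stand-in interface: `CountingWriter` and `InMemoryStringWriter` are hypothetical names, not Gobblin's `DataWriter` or `InstrumentedDataWriter`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the counting methods a writer exposes;
// this is NOT the real org.apache.gobblin.writer.DataWriter interface.
interface CountingWriter<D> {
  void write(D record);

  long recordsWritten();

  long bytesWritten();
}

// Hypothetical writer that appends to an in-memory list and keeps running totals,
// so the framework can read both counts after the task finishes.
class InMemoryStringWriter implements CountingWriter<String> {
  private final List<String> sink = new ArrayList<>();
  private long records = 0;
  private long bytes = 0;

  @Override
  public void write(String record) {
    sink.add(record);
    records++;
    // Count the UTF-8 encoded size of each record.
    bytes += record.getBytes(StandardCharsets.UTF_8).length;
  }

  @Override
  public long recordsWritten() {
    return records;
  }

  @Override
  public long bytesWritten() {
    return bytes;
  }
}
```

A writer that cannot measure bytes (for example, one that issues database statements) would need some other convention for `bytesWritten()`. That question is raised later in this thread.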
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   Added unit tests verifying that every job with writers emits a JobSummaryTimer, and covering the serialization/deserialization in the GaaSObservabilityEvent.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160832023


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -728,6 +729,39 @@ protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState>
    */
   protected void postProcessJobState(JobState jobState) {
     postProcessTaskStates(jobState.getTaskStates());
+    if (!GobblinMetrics.isEnabled(this.jobProps)) {
+      return;
+    }
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    Map<String, JobState.DatasetState> datasetStates = this.jobContext.getDatasetStatesByUrns();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    for (JobState.DatasetState datasetState : datasetStates.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED
+        || (datasetState.getState() == JobState.RunningState.FAILED && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED
+                || PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"))
+              && taskState.contains(ConfigurationKeys.WRITER_BYTES_WRITTEN)
+              && taskState.contains(ConfigurationKeys.WRITER_RECORDS_WRITTEN)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN);

Review Comment:
   is it truly necessary to check `contains` before getting?  can't we just get, supplying a default of `0L`?
   
   ... or are you concerned that one but not the other could be set, and want to avoid counting just one in isolation?  if so, add a comment for maintainers
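The two options in this comment count different things when a task reports only one metric. The sketch below contrasts them. Assumptions: `java.util.Properties` stands in for Gobblin's `TaskState`, and the key names are hypothetical.

```java
import java.util.List;
import java.util.Properties;

// Sketch contrasting two ways to sum per-task writer metrics.
// Properties stands in for TaskState; the key names are hypothetical.
class TaskMetricSums {
  static final String BYTES_KEY = "hypothetical.writer.bytes.written";
  static final String RECORDS_KEY = "hypothetical.writer.records.written";

  // Option 1: default each missing value to 0 independently.
  static long[] sumWithDefaults(List<Properties> tasks) {
    long bytes = 0;
    long records = 0;
    for (Properties task : tasks) {
      bytes += Long.parseLong(task.getProperty(BYTES_KEY, "0"));
      records += Long.parseLong(task.getProperty(RECORDS_KEY, "0"));
    }
    return new long[] {bytes, records};
  }

  // Option 2: count a task only when BOTH metrics are present, so a task that
  // reports just one of them cannot skew the bytes-per-record ratio.
  static long[] sumOnlyWhenBothPresent(List<Properties> tasks) {
    long bytes = 0;
    long records = 0;
    for (Properties task : tasks) {
      if (task.containsKey(BYTES_KEY) && task.containsKey(RECORDS_KEY)) {
        bytes += Long.parseLong(task.getProperty(BYTES_KEY));
        records += Long.parseLong(task.getProperty(RECORDS_KEY));
      }
    }
    return new long[] {bytes, records};
  }
}
```

Take one task that reports both metrics (100 bytes, 10 records) and another that reports only bytes (50). Option 1 yields 150 bytes and 10 records. Option 2 yields 100 bytes and 10 records.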



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -728,6 +729,39 @@ protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState>
    */
   protected void postProcessJobState(JobState jobState) {
     postProcessTaskStates(jobState.getTaskStates());
+    if (!GobblinMetrics.isEnabled(this.jobProps)) {
+      return;
+    }
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    Map<String, JobState.DatasetState> datasetStates = this.jobContext.getDatasetStatesByUrns();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    for (JobState.DatasetState datasetState : datasetStates.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED
+        || (datasetState.getState() == JobState.RunningState.FAILED && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED
+                || PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"))

Review Comment:
   not sure how many iterations we might expect (esp. since it's only evaluated in the non-commit case), but as a general principle: this reaches into the props inside an inner loop to pull out a value that's unchanged during execution.  best practice would be to read it just once at the top.
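The suggestion amounts to hoisting a loop-invariant read out of the nested loops. The sketch below shows the pattern. Assumptions: the key name and the `TaskResult` type are hypothetical stand-ins, and `Properties` stands in for the job props.

```java
import java.util.List;
import java.util.Properties;

// Sketch of hoisting a loop-invariant config read out of the inner loop.
// The key name and TaskResult type are hypothetical stand-ins.
class HoistedFlagExample {
  static final String COUNT_FAILED_KEY = "hypothetical.count.metrics.from.failed.tasks";

  static final class TaskResult {
    final boolean committed;
    final long recordsWritten;

    TaskResult(boolean committed, long recordsWritten) {
      this.committed = committed;
      this.recordsWritten = recordsWritten;
    }
  }

  static long countRecords(Properties jobProps, List<List<TaskResult>> tasksByDataset) {
    // Read once: the job properties do not change while we iterate.
    boolean countFailedTasks =
        Boolean.parseBoolean(jobProps.getProperty(COUNT_FAILED_KEY, "false"));
    long total = 0;
    for (List<TaskResult> datasetTasks : tasksByDataset) {
      for (TaskResult task : datasetTasks) {
        if (task.committed || countFailedTasks) {
          total += task.recordsWritten;
        }
      }
    }
    return total;
  }
}
```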



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to provide metrics for the dataset
+ * that can be reported as a single event in the commit phase.
+ */
+public class DatasetTaskSummary {
+  private final String datasetUrn;
+  private final long recordsWritten;
+  private final long bytesWritten;

Review Comment:
   what about `lombok.Data`... wouldn't that create a POJO just like what's here?
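For reference, here is roughly what Lombok's `@Data` generates for a class whose fields are all `final`: a constructor for those fields, getters, `equals`/`hashCode`, and `toString`. No setters are generated, since the fields are final. The code is written out by hand so it compiles without Lombok, and `DatasetTaskSummarySketch` is a hypothetical name. Lombok's actual `equals` also involves a `canEqual` check, which is omitted here.

```java
import java.util.Objects;

// Hand-written approximation of what @Data would produce for three final fields.
class DatasetTaskSummarySketch {
  private final String datasetUrn;
  private final long recordsWritten;
  private final long bytesWritten;

  // @RequiredArgsConstructor: one parameter per final field, in declaration order.
  DatasetTaskSummarySketch(String datasetUrn, long recordsWritten, long bytesWritten) {
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.bytesWritten = bytesWritten;
  }

  public String getDatasetUrn() {
    return datasetUrn;
  }

  public long getRecordsWritten() {
    return recordsWritten;
  }

  public long getBytesWritten() {
    return bytesWritten;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof DatasetTaskSummarySketch)) {
      return false;
    }
    DatasetTaskSummarySketch other = (DatasetTaskSummarySketch) o;
    return recordsWritten == other.recordsWritten
        && bytesWritten == other.bytesWritten
        && Objects.equals(datasetUrn, other.datasetUrn);
  }

  @Override
  public int hashCode() {
    return Objects.hash(datasetUrn, recordsWritten, bytesWritten);
  }

  // Mimics Lombok's "ClassName(field=value, ...)" toString format.
  @Override
  public String toString() {
    return "DatasetTaskSummarySketch(datasetUrn=" + datasetUrn
        + ", recordsWritten=" + recordsWritten
        + ", bytesWritten=" + bytesWritten + ")";
  }
}
```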



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to provide metrics for the dataset
+ * that can be reported as a single event in the commit phase.
+ */
+public class DatasetTaskSummary {

Review Comment:
   from the perspective of those reading these metrics, it would be nice to know whether the count reflects what's committed or a failure.  maybe a boolean?
   
   (I'm presuming in the case of `COMMIT_SUCCESSFUL_TASKS` that the two could be commingled)
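A per-summary commit flag lets consumers separate committed counts from counts recorded for a failed dataset commit. The sketch below shows this. Assumptions: `FlaggedSummary` and its field names are hypothetical, not the PR's final API.

```java
import java.util.List;

// Hypothetical summary type carrying a commit flag, so metric consumers can
// separate counts for committed datasets from counts for datasets whose commit
// failed (e.g. under a COMMIT_SUCCESSFUL_TASKS policy).
class FlaggedSummary {
  final String datasetUrn;
  final long recordsWritten;
  final boolean commitSucceeded;

  FlaggedSummary(String datasetUrn, long recordsWritten, boolean commitSucceeded) {
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.commitSucceeded = commitSucceeded;
  }

  // Total records across summaries whose dataset commit succeeded.
  static long committedRecords(List<FlaggedSummary> summaries) {
    return summaries.stream()
        .filter(s -> s.commitSucceeded)
        .mapToLong(s -> s.recordsWritten)
        .sum();
  }

  // Total records across summaries whose dataset commit failed.
  static long uncommittedRecords(List<FlaggedSummary> summaries) {
    return summaries.stream()
        .filter(s -> !s.commitSucceeded)
        .mapToLong(s -> s.recordsWritten)
        .sum();
  }
}
```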



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -93,6 +99,12 @@ private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final St
     Long jobOrchestratedTime = jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
     Long jobPlanningPhaseStartTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
     Long jobPlanningPhaseEndTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+    Type datasetTaskSummaryType = new TypeToken<ArrayList<DatasetTaskSummary>>(){}.getType();
+    List<DatasetTaskSummary> datasetTaskSummaries = jobState.contains(TimingEvent.DATASET_TASK_SUMMARIES) ?
+        GsonUtils.GSON_WITH_DATE_HANDLING.fromJson(jobState.getProp(TimingEvent.DATASET_TASK_SUMMARIES), datasetTaskSummaryType) : null;
+    List<DatasetMetric> datasetMetrics = datasetTaskSummaries != null ? datasetTaskSummaries.stream().map(summary ->
+        new DatasetMetric(summary.getDatasetUrn(), summary.getBytesWritten(), summary.getRecordsWritten())).collect(Collectors.toList()) : null;

Review Comment:
   any concern about tight coupling if we instead encapsulate this within a `static` conversion like `DatasetTaskSummary::toDatasetMetric`?
   
   (it seems preferable to me)
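One concrete benefit of encapsulating the conversion: in the diff above, `DatasetTaskSummary` is constructed as (urn, records, bytes), while `DatasetMetric` is constructed as (urn, bytes, records). A single conversion method is the only place that has to get that ordering right.

The sketch below illustrates this with hypothetical stand-in types: `TaskSummarySketch` for `DatasetTaskSummary`, and `MetricSketch` for the Avro-generated `DatasetMetric`. The null handling mirrors the producer code above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Avro-generated DatasetMetric:
// constructor takes bytes BEFORE records.
class MetricSketch {
  final String datasetUrn;
  final long bytesWritten;
  final long recordsWritten;

  MetricSketch(String datasetUrn, long bytesWritten, long recordsWritten) {
    this.datasetUrn = datasetUrn;
    this.bytesWritten = bytesWritten;
    this.recordsWritten = recordsWritten;
  }
}

// Hypothetical stand-in for DatasetTaskSummary: constructor takes records BEFORE bytes.
class TaskSummarySketch {
  final String datasetUrn;
  final long recordsWritten;
  final long bytesWritten;

  TaskSummarySketch(String datasetUrn, long recordsWritten, long bytesWritten) {
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.bytesWritten = bytesWritten;
  }

  // The one place that knows how the two argument orders line up.
  static MetricSketch toMetric(TaskSummarySketch summary) {
    return new MetricSketch(summary.datasetUrn, summary.bytesWritten, summary.recordsWritten);
  }

  // Mirrors the producer's null handling: no summaries in, no metrics out.
  static List<MetricSketch> toMetrics(List<TaskSummarySketch> summaries) {
    if (summaries == null) {
      return null;
    }
    return summaries.stream().map(TaskSummarySketch::toMetric).collect(Collectors.toList());
  }
}
```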



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -728,6 +729,39 @@ protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState>
    */
   protected void postProcessJobState(JobState jobState) {
     postProcessTaskStates(jobState.getTaskStates());
+    if (!GobblinMetrics.isEnabled(this.jobProps)) {
+      return;
+    }
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    Map<String, JobState.DatasetState> datasetStates = this.jobContext.getDatasetStatesByUrns();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    for (JobState.DatasetState datasetState : datasetStates.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED
+        || (datasetState.getState() == JobState.RunningState.FAILED && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED
+                || PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"))
+              && taskState.contains(ConfigurationKeys.WRITER_BYTES_WRITTEN)
+              && taskState.contains(ConfigurationKeys.WRITER_RECORDS_WRITTEN)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN);
+            totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN);
+          }
+        }
+        LOG.info("Reporting that " + totalRecordsWritten + " records and " + totalBytesWritten + " bytes were written for " + datasetState.getDatasetUrn());

Review Comment:
   I suggest a more parseable, more "grep-able" and less conversational format in case we ever want to audit events against logging.
   e.g.
   ```
   "DatasetMetrics for '%s' - (records: %d; bytes: %d)"
   ```
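The suggested format is easy both to emit and to parse back when auditing logs against events. Below is a minimal sketch, assuming `String.format` for emitting and a regex for parsing. The class name `DatasetMetricsLogLine` is hypothetical.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the grep-friendly log line suggested above, plus a matching parser
// that an audit script could use to compare log lines against emitted events.
class DatasetMetricsLogLine {
  static final String TEMPLATE = "DatasetMetrics for '%s' - (records: %d; bytes: %d)";

  // Matches the template anywhere in a line, so timestamps/prefixes are allowed.
  static final Pattern PATTERN =
      Pattern.compile("DatasetMetrics for '([^']*)' - \\(records: (-?\\d+); bytes: (-?\\d+)\\)");

  static String format(String datasetUrn, long records, long bytes) {
    // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
    return String.format(Locale.ROOT, TEMPLATE, datasetUrn, records, bytes);
  }

  // Returns {records, bytes}, or null if the line is not a DatasetMetrics line.
  static long[] parseCounts(String logLine) {
    Matcher m = PATTERN.matcher(logLine);
    if (!m.find()) {
      return null;
    }
    return new long[] {Long.parseLong(m.group(2)), Long.parseLong(m.group(3))};
  }
}
```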



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -58,6 +61,10 @@ public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception
         TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
         createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
+    List<DatasetTaskSummary> summaries = new ArrayList<>();
+    summaries.add(new DatasetTaskSummary("/testFolder", 100, 1000));
+    summaries.add(new DatasetTaskSummary("/testFolder2", 1000, 10000));

Review Comment:
   nit - since you both write and then re-read for verification, rather than hard-coding the same constants in both places, I'd save aside each `DatasetTaskSummary` here so you can access it down below when it comes to verification
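The test pattern suggested here is to build each expected object once and verify against those same objects. The sketch below is in plain Java, without TestNG. Assumptions: the nested `Summary` type and its comma-separated round trip are hypothetical stand-ins for `DatasetTaskSummary` and the Gson serialization the producer uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundTripTestSketch {
  static final class Summary {
    final String urn;
    final long records;
    final long bytes;

    Summary(String urn, long records, long bytes) {
      this.urn = urn;
      this.records = records;
      this.bytes = bytes;
    }

    // Hypothetical stand-in for JSON serialization; assumes URNs contain no commas.
    String serialize() {
      return urn + "," + records + "," + bytes;
    }

    static Summary deserialize(String s) {
      String[] parts = s.split(",");
      return new Summary(parts[0], Long.parseLong(parts[1]), Long.parseLong(parts[2]));
    }
  }

  // Constants live in exactly one place: the expected objects themselves.
  static void verifyRoundTrip() {
    Summary first = new Summary("/testFolder", 100, 1000);
    Summary second = new Summary("/testFolder2", 1000, 10000);
    List<Summary> expected = Arrays.asList(first, second);

    List<Summary> actual = new ArrayList<>();
    for (Summary s : expected) {
      actual.add(Summary.deserialize(s.serialize()));
    }

    if (actual.size() != expected.size()) {
      throw new AssertionError("size mismatch");
    }
    for (int i = 0; i < expected.size(); i++) {
      Summary e = expected.get(i);
      Summary a = actual.get(i);
      if (!e.urn.equals(a.urn) || e.records != a.records || e.bytes != a.bytes) {
        throw new AssertionError("mismatch at index " + i);
      }
    }
  }
}
```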



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"

Review Comment:
   which jobs is this applicable to?  e.g. could it work for retention-release?  what about for pulling record-by-record from a CRM system and writing a subset of the records fields to a relational DB?  
   
   how to measure the number of bytes "written" to the DB... maybe approximate from the char count of the stringified SQL statement?
   
   (the questions I'm unclear on are probably worth anticipating from those looking at the "doc" strings here)



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"
+              },
+              {
+                "name": "recordsWritten",
+                "type": "long",
+                "doc": "Number of records written for the dataset"

Review Comment:
   when we say "records", would that be files in the case of `CopySource` / distcp?  if so, let's describe accordingly in the doc string... and possibly even in the name (e.g. `entitiesWritten`)





[GitHub] [gobblin] codecov-commenter commented on pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#issuecomment-1496741617

   ## [Codecov](https://codecov.io/gh/apache/gobblin/pull/3667?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3667](https://codecov.io/gh/apache/gobblin/pull/3667?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e307b56) into [master](https://codecov.io/gh/apache/gobblin/commit/be6d46c2a117b1248d88d22cc64db5a12b802d16?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (be6d46c) will **decrease** coverage by `2.08%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3667      +/-   ##
   ============================================
   - Coverage     46.85%   44.78%   -2.08%     
   + Complexity    10749     2090    -8659     
   ============================================
     Files          2138      411    -1727     
     Lines         83989    17697   -66292     
     Branches       9331     2157    -7174     
   ============================================
   - Hits          39353     7925   -31428     
   + Misses        41054     8915   -32139     
   + Partials       3582      857    -2725     
   ```
   
   
   [see 1732 files with indirect coverage changes](https://codecov.io/gh/apache/gobblin/pull/3667/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160940122


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"
+              },
+              {
+                "name": "recordsWritten",
+                "type": "long",
+                "doc": "Number of records written for the dataset"

Review Comment:
   Yup, it's files written; will update to `entitiesWritten`





[GitHub] [gobblin] phet commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1163069107


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -208,12 +208,17 @@
               {
                 "name": "bytesWritten",
                 "type": "long",
-                "doc": "Number of bytes written for the dataset"
+                "doc": "Number of bytes written for the dataset, can be -1 if unsupported by the writer (e.g. jdbc writer)"
               },
               {
-                "name": "recordsWritten",
+                "name": "entitiesWritten",
                 "type": "long",
-                "doc": "Number of records written for the dataset"
+                "doc": "Number of entities written for the dataset by the Gobblin writer"

Review Comment:
   "(e.g. files or records)"



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -208,12 +208,17 @@
               {
                 "name": "bytesWritten",
                 "type": "long",
-                "doc": "Number of bytes written for the dataset"
+                "doc": "Number of bytes written for the dataset, can be -1 if unsupported by the writer (e.g. jdbc writer)"
               },
               {
-                "name": "recordsWritten",
+                "name": "entitiesWritten",
                 "type": "long",
-                "doc": "Number of records written for the dataset"
+                "doc": "Number of entities written for the dataset by the Gobblin writer"
+              },
+              {
+                "name": "datasetCommitSucceeded",

Review Comment:
   nit: do we really want to repeat 'dataset', as in `datasetsWritten[*].datasetCommitSucceeded` (e.g. we don't name it `datasetBytesWritten`)?  maybe `wasCommitted`(?)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java:
##########
@@ -17,30 +17,26 @@
 
 package org.apache.gobblin.runtime;
 
+import lombok.Data;
+
+import org.apache.gobblin.metrics.DatasetMetric;
+
+
 /**
  * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to provide metrics for the dataset
  * that can be reported as a single event in the commit phase.
  */
+@Data
 public class DatasetTaskSummary {
   private final String datasetUrn;
   private final long recordsWritten;
   private final long bytesWritten;
+  private final boolean datasetCommitSucceeded;
 
-  public DatasetTaskSummary(String datasetUrn, long recordsWritten, long bytesWritten) {
-    this.datasetUrn = datasetUrn;
-    this.recordsWritten = recordsWritten;
-    this.bytesWritten = bytesWritten;
-  }
-
-  public String getDatasetUrn() {
-    return datasetUrn;
-  }
-
-  public long getRecordsWritten() {
-    return recordsWritten;
-  }
-
-  public long getBytesWritten() {
-    return bytesWritten;
+  /**
+   * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
+   */
+  public static DatasetMetric toDatasetMetric(DatasetTaskSummary datasetTaskSummary) {

Review Comment:
   NBD, but why not an instance method?  (the invocation syntax inside `.map()` would remain unchanged)





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160940512


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"
+              },
+              {
+                "name": "recordsWritten",
+                "type": "long",
+                "doc": "Number of records written for the dataset"

Review Comment:
   Could also be rows for the mysql/espresso case





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1163461852


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java:
##########
@@ -17,30 +17,26 @@
 
 package org.apache.gobblin.runtime;
 
+import lombok.Data;
+
+import org.apache.gobblin.metrics.DatasetMetric;
+
+
 /**
  * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to provide metrics for the dataset
  * that can be reported as a single event in the commit phase.
  */
+@Data
 public class DatasetTaskSummary {
   private final String datasetUrn;
   private final long recordsWritten;
   private final long bytesWritten;
+  private final boolean datasetCommitSucceeded;
 
-  public DatasetTaskSummary(String datasetUrn, long recordsWritten, long bytesWritten) {
-    this.datasetUrn = datasetUrn;
-    this.recordsWritten = recordsWritten;
-    this.bytesWritten = bytesWritten;
-  }
-
-  public String getDatasetUrn() {
-    return datasetUrn;
-  }
-
-  public long getRecordsWritten() {
-    return recordsWritten;
-  }
-
-  public long getBytesWritten() {
-    return bytesWritten;
+  /**
+   * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
+   */
+  public static DatasetMetric toDatasetMetric(DatasetTaskSummary datasetTaskSummary) {

Review Comment:
   If it's an instance method, the syntax inside `map` would be different since it wouldn't be static anymore, right? It would look like `.map(summary -> summary.toDatasetMetric())`
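In Java, a `Type::method` reference works in both cases. It can refer to a static method that takes the element as its argument, or to an instance method that takes no arguments, where the element becomes the receiver. Either way it fits `Function<Type, R>`, so `.map(Type::method)` reads the same.

The sketch below shows both forms side by side. The types are hypothetical stand-ins. The two methods have different names on purpose: declaring a static `toMetric(Summary)` and an instance `toMetric()` in the same class would make `Summary::toMetric` ambiguous.

```java
import java.util.List;
import java.util.stream.Collectors;

class MethodRefDemo {
  static final class Summary {
    final String urn;

    Summary(String urn) {
      this.urn = urn;
    }

    // Static form: Summary::staticToMetric is a Function<Summary, String>.
    static String staticToMetric(Summary s) {
      return "metric:" + s.urn;
    }

    // Instance form: Summary::toMetric is ALSO a Function<Summary, String>,
    // with the stream element used as the receiver.
    String toMetric() {
      return "metric:" + urn;
    }
  }

  static List<String> viaStatic(List<Summary> in) {
    return in.stream().map(Summary::staticToMetric).collect(Collectors.toList());
  }

  static List<String> viaInstance(List<Summary> in) {
    return in.stream().map(Summary::toMetric).collect(Collectors.toList());
  }
}
```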





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160923357


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -728,6 +729,39 @@ protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState>
    */
   protected void postProcessJobState(JobState jobState) {
     postProcessTaskStates(jobState.getTaskStates());
+    if (!GobblinMetrics.isEnabled(this.jobProps)) {
+      return;
+    }
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    Map<String, JobState.DatasetState> datasetStates = this.jobContext.getDatasetStatesByUrns();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    for (JobState.DatasetState datasetState : datasetStates.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED
+        || (datasetState.getState() == JobState.RunningState.FAILED && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED
+                || PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"))
+              && taskState.contains(ConfigurationKeys.WRITER_BYTES_WRITTEN)
+              && taskState.contains(ConfigurationKeys.WRITER_RECORDS_WRITTEN)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN);

Review Comment:
   The Gobblin writer implementation expects either both or neither to be implemented, so I'll leave a comment noting that some writers omit these metrics





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160941804


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"

Review Comment:
   I'll add to the doc for `bytesWritten` that it can be set to -1 when unsupported by the writer.





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160947814


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"

Review Comment:
   As for which jobs this applies to: it's not usable for retention, but it should be compatible with any writer that records its records/bytes written, e.g. Distcp, HDFS -> MySQL. So it depends on the writer implementation, but almost every Gobblin writer implements `recordsWritten()` and `bytesWritten()`, since both are enforced by the `DataWriter` interface.
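To illustrate what "implementing `recordsWritten()` and `bytesWritten()`" means for a byte-oriented sink, here is a minimal sketch. `MetricReportingWriter` is a hypothetical, trimmed-down interface that mirrors only the two `DataWriter` methods discussed here, and `InMemoryLineWriter` is an illustrative writer, not a Gobblin class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical interface mirroring only the two DataWriter metric methods discussed above.
interface MetricReportingWriter<D> {
  void write(D record) throws IOException;
  long recordsWritten();
  long bytesWritten() throws IOException;
}

// Illustrative writer for a byte-oriented sink, where both metrics are cheap to track.
final class InMemoryLineWriter implements MetricReportingWriter<String> {
  private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
  private long records = 0;

  @Override
  public void write(String record) {
    // Each record is stored as one UTF-8 encoded line.
    byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
    sink.write(bytes, 0, bytes.length);
    records++;
  }

  @Override
  public long recordsWritten() {
    return records;
  }

  @Override
  public long bytesWritten() {
    // Bytes actually written to the sink, including the newline separators.
    return sink.size();
  }
}
```

For file-like sinks the byte count falls out of the output stream. Sinks such as a database have no equally natural byte measure, which is where the next review question comes in.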





[GitHub] [gobblin] phet commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160861533


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"

Review Comment:
   which jobs is this applicable to?  e.g. could it work for retention-release?  what about for pulling record-by-record from a CRM system and writing a subset of each record's fields to a relational DB?
   
   how to measure the number of bytes "written" to the DB... maybe approximate from the char count of the stringified SQL statement?
   
   (the questions I'm unclear on are ones to anticipate from users seeking clarity from these "doc" strings)
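The reviewer's idea of approximating DB "bytes written" from the stringified SQL can be sketched as below. This is a hypothetical helper, not something the PR adopted (the final schema doc reports -1 for the jdbc writer). It also shows that the character count and the UTF-8 encoded size differ for non-ASCII data, and that either number includes SQL syntax, not just payload, so it is a rough proxy at best.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper exploring the reviewer's suggestion; JDBC itself does not report payload size.
final class SqlByteEstimator {
  // Character count as returned by String.length() (UTF-16 code units).
  static long charCount(List<String> statements) {
    long total = 0;
    for (String s : statements) {
      total += s.length();
    }
    return total;
  }

  // UTF-8 encoded size of the statement text; drivers may still use different wire encodings.
  static long utf8Bytes(List<String> statements) {
    long total = 0;
    for (String s : statements) {
      total += s.getBytes(StandardCharsets.UTF_8).length;
    }
    return total;
  }
}
```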





[GitHub] [gobblin] Will-Lo commented on pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#issuecomment-1504378848

   Oh, I meant that I'll aggregate on the task level but let the client handle aggregation on the dataset level, since it is rare for a single pipeline to manage, say, tens of thousands of datasets.
   
   




[GitHub] [gobblin] Will-Lo merged pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo merged PR #3667:
URL: https://github.com/apache/gobblin/pull/3667




[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1163465018


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -208,12 +208,17 @@
               {
                 "name": "bytesWritten",
                 "type": "long",
-                "doc": "Number of bytes written for the dataset"
+                "doc": "Number of bytes written for the dataset, can be -1 if unsupported by the writer (e.g. jdbc writer)"
               },
               {
-                "name": "recordsWritten",
+                "name": "entitiesWritten",
                 "type": "long",
-                "doc": "Number of records written for the dataset"
+                "doc": "Number of entities written for the dataset by the Gobblin writer"
+              },
+              {
+                "name": "datasetCommitSucceeded",

Review Comment:
   I'm going to name it `successfullyCommitted` so Lombok doesn't generate `isWasCommitted`.
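The naming concern follows from Lombok's getter rule for primitive `boolean` fields: the getter is `is` plus the capitalized field name, except that a field already named `isX` (with an uppercase letter after `is`) keeps its name. The hypothetical helper below reproduces that rule by hand to show why `wasCommitted` would yield `isWasCommitted()`; it does not invoke Lombok.

```java
// Hypothetical helper reproducing Lombok's getter-name rule for primitive boolean fields.
final class LombokBooleanGetterName {
  static String getterFor(String fieldName) {
    // Fields like "isDone" keep their name: Lombok does not prepend a second "is".
    if (fieldName.length() > 2
        && fieldName.startsWith("is")
        && Character.isUpperCase(fieldName.charAt(2))) {
      return fieldName;
    }
    // Otherwise: "is" + field name with its first letter capitalized.
    return "is" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
  }
}
```

Under this rule `successfullyCommitted` produces `isSuccessfullyCommitted()`, which reads naturally at call sites.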





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160941473


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -188,6 +188,38 @@
           }
         }
       ]
-    }]
+    },
+    {
+      "name": "datasetsWritten",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "DatasetMetric",
+            "doc": "DatasetMetric contains bytes and records written by Gobblin writers for the dataset URN.",
+            "fields": [
+              {
+                "name": "datasetUrn",
+                "type": "string",
+                "doc": "URN of the dataset"
+              },
+              {
+                "name": "bytesWritten",
+                "type": "long",
+                "doc": "Number of bytes written for the dataset"

Review Comment:
   For the DB writers it's not supported: `JdbcWriter` supports records written, but `bytesWritten()` is implemented as below.
   ```
     /**
      * This is not supported for JDBC writer.
      * {@inheritDoc}
      * @see org.apache.gobblin.writer.DataWriter#bytesWritten()
      */
     @Override
     public long bytesWritten() throws IOException {
       return -1L;
     }
   ```
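Because `JdbcWriter.bytesWritten()` returns `-1L` as an "unsupported" sentinel, naively summing per-task values would produce meaningless totals (three such tasks would sum to -3). The sketch below shows one way an aggregator or downstream consumer might guard against that. `SentinelAwareSum` is hypothetical and does not describe what the PR itself does.

```java
import java.util.List;

// Hypothetical sentinel-aware aggregation: -1 means "unsupported by the writer",
// so it must not be summed like a real byte count.
final class SentinelAwareSum {
  static final long UNSUPPORTED = -1L;

  static long sumBytes(List<Long> perTaskBytes) {
    long total = 0;
    for (long bytes : perTaskBytes) {
      if (bytes < 0) {
        // Any unsupported contribution makes the total unknown; propagate the sentinel.
        return UNSUPPORTED;
      }
      total += bytes;
    }
    return total;
  }
}
```

Propagating the sentinel is a design choice. Skipping the unsupported tasks instead would return a partial sum that silently undercounts.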





[GitHub] [gobblin] Will-Lo commented on pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#issuecomment-1500676439

   Responding to the top-level comment, `would we wish to pre-aggregate that within the event itself`: I think aggregation is needed because large pipelines can have thousands of tasks, and serializing all of their states into a single event would produce large Kafka events, which we want to avoid. Also, I think most clients wouldn't care much about the inner details of every individual task/mapper; handling those correctly is mainly the Gobblin framework's concern.
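The aggregation argued for here, collapsing many task-level entries into one entry per dataset so that the event size scales with datasets rather than tasks, can be sketched with a stream collector. `TaskMetric` is a hypothetical per-task record standing in for the information Gobblin reads from each `TaskState`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical per-task metric; in Gobblin this information would come from each TaskState.
final class TaskMetric {
  final String datasetUrn;
  final long bytesWritten;

  TaskMetric(String datasetUrn, long bytesWritten) {
    this.datasetUrn = datasetUrn;
    this.bytesWritten = bytesWritten;
  }
}

final class DatasetRollup {
  // One map entry per dataset URN, however many tasks wrote to it (sorted for stable output).
  static Map<String, Long> bytesByDataset(List<TaskMetric> tasks) {
    return tasks.stream()
        .collect(Collectors.groupingBy(
            t -> t.datasetUrn, TreeMap::new, Collectors.summingLong(t -> t.bytesWritten)));
  }
}
```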




[GitHub] [gobblin] phet commented on pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#issuecomment-1503758833

   > Responding to the top level comment: `would we wish to pre-aggregate that within the event itself`, I think aggregation is needed because there can be thousands of tasks for large pipelines, which makes serializing all the states into an event lead to large Kafka events which we want to avoid. Also, I think most clients wouldn't care too much about the inner details of every individual task/mapper, it's mainly the concern of the Gobblin framework to deal with correctly.
   
   does this mean you'll add a summary field to aggregate measurements across all datasets?




[GitHub] [gobblin] phet commented on pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#issuecomment-1505582228

   > Oh, I meant that I'll aggregate on the task level but let the client handle aggregation on the dataset level, since it is rare to have a single pipeline manage say tens of thousands of datasets.
   
   yes, I agree, but think of it less in terms of efficiency (how many ops the aggregation takes) than of convenience: even if there are only 5, every downstream analysis that considers the job-level total still needs a piece of code to sum them per event.  I believe that's most analyses...
   
   so although I generally recommend against de-normalized data structures, I see the ready-to-use convenience outweighing the design principle.  consider the simple problem of counting, by user, who's copying > 1TB in a job.  it's trivially simple to filter by bytes... but only when there's a job-level sum.  if the event has only task-level aggregation, the friction increases considerably, whether in SQL or Kusto.
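The friction described above can be made concrete: without a job-level total, every consumer has to re-sum the per-dataset values before it can filter. The sketch below uses hypothetical, simplified event shapes (`JobEvent`, `DatasetBytes`, not the Avro-generated classes) and takes 1 TB as 2^40 bytes purely for illustration. A precomputed job-level field would replace `totalBytes()` with a single field read.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified shapes: a job event with a user and per-dataset byte counts.
final class DatasetBytes {
  final String datasetUrn;
  final long bytesWritten;

  DatasetBytes(String datasetUrn, long bytesWritten) {
    this.datasetUrn = datasetUrn;
    this.bytesWritten = bytesWritten;
  }
}

final class JobEvent {
  final String user;
  final List<DatasetBytes> datasets;

  JobEvent(String user, List<DatasetBytes> datasets) {
    this.user = user;
    this.datasets = datasets;
  }

  // The per-event sum every analysis must recompute when no job-level field exists.
  // Ignores the -1 "unsupported" sentinel for brevity.
  long totalBytes() {
    long total = 0;
    for (DatasetBytes d : datasets) {
      total += d.bytesWritten;
    }
    return total;
  }
}

final class LargeCopyReport {
  static final long ONE_TB = 1L << 40; // assumption: 1 TB taken as 2^40 bytes

  // Counts, per user, the jobs that wrote more than 1 TB in total.
  static Map<String, Long> largeJobsByUser(List<JobEvent> events) {
    Map<String, Long> counts = new TreeMap<>();
    for (JobEvent e : events) {
      if (e.totalBytes() > ONE_TB) {
        counts.merge(e.user, 1L, Long::sum);
      }
    }
    return counts;
  }
}
```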




[GitHub] [gobblin] phet commented on a diff in pull request #3667: [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1164358410


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java:
##########
@@ -17,30 +17,26 @@
 
 package org.apache.gobblin.runtime;
 
+import lombok.Data;
+
+import org.apache.gobblin.metrics.DatasetMetric;
+
+
 /**
  * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to provide metrics for the dataset
  * that can be reported as a single event in the commit phase.
  */
+@Data
 public class DatasetTaskSummary {
   private final String datasetUrn;
   private final long recordsWritten;
   private final long bytesWritten;
+  private final boolean datasetCommitSucceeded;
 
-  public DatasetTaskSummary(String datasetUrn, long recordsWritten, long bytesWritten) {
-    this.datasetUrn = datasetUrn;
-    this.recordsWritten = recordsWritten;
-    this.bytesWritten = bytesWritten;
-  }
-
-  public String getDatasetUrn() {
-    return datasetUrn;
-  }
-
-  public long getRecordsWritten() {
-    return recordsWritten;
-  }
-
-  public long getBytesWritten() {
-    return bytesWritten;
+  /**
+   * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
+   */
+  public static DatasetMetric toDatasetMetric(DatasetTaskSummary datasetTaskSummary) {

Review Comment:
   syntax should be no different - https://docs.oracle.com/javase/tutorial/java/javaOO/methodreferences.html
   e.g.
   ```
   String::compareToIgnoreCase
   String::concat
   ```
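Applied to this diff, the reviewer's point is that a static converter like `toDatasetMetric` can be passed as `DatasetTaskSummary::toDatasetMetric` wherever a `Function` is expected. The sketch uses hypothetical stand-ins (`DatasetTaskSummaryStub`, `DatasetMetricStub`; the real `DatasetMetric` is Avro-generated). Mapping `recordsWritten` to `entitiesWritten` and using `successfullyCommitted` follow the schema changes discussed in this thread, which is an assumption about the final field mapping.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for the Avro-generated DatasetMetric.
final class DatasetMetricStub {
  final String datasetUrn;
  final long bytesWritten;
  final long entitiesWritten;
  final boolean successfullyCommitted;

  DatasetMetricStub(String datasetUrn, long bytesWritten, long entitiesWritten, boolean successfullyCommitted) {
    this.datasetUrn = datasetUrn;
    this.bytesWritten = bytesWritten;
    this.entitiesWritten = entitiesWritten;
    this.successfullyCommitted = successfullyCommitted;
  }
}

// Simplified stand-in for DatasetTaskSummary, fields in the same order as the diff.
final class DatasetTaskSummaryStub {
  final String datasetUrn;
  final long recordsWritten;
  final long bytesWritten;
  final boolean datasetCommitSucceeded;

  DatasetTaskSummaryStub(String datasetUrn, long recordsWritten, long bytesWritten, boolean datasetCommitSucceeded) {
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.bytesWritten = bytesWritten;
    this.datasetCommitSucceeded = datasetCommitSucceeded;
  }

  // Static converter with the same shape as toDatasetMetric in the diff.
  static DatasetMetricStub toDatasetMetric(DatasetTaskSummaryStub s) {
    return new DatasetMetricStub(s.datasetUrn, s.bytesWritten, s.recordsWritten, s.datasetCommitSucceeded);
  }
}

final class MethodRefDemo {
  static List<DatasetMetricStub> convertAll(List<DatasetTaskSummaryStub> summaries) {
    // A static method reference satisfies Function<DatasetTaskSummaryStub, DatasetMetricStub>.
    return summaries.stream()
        .map(DatasetTaskSummaryStub::toDatasetMetric)
        .collect(Collectors.toList());
  }
}
```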


