Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2023/01/13 18:27:14 UTC

[GitHub] [gobblin] Will-Lo opened a new pull request, #3623: [GOBBLIN-1764] Emit observability event

Will-Lo opened a new pull request, #3623:
URL: https://github.com/apache/gobblin/pull/3623

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1764
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   GobblinTrackingEvents are not suitable for observability platforms at scale due to their frequency of emission and their lack of a standardized schema.
   
   GaaSObservabilityEvents are a new event type that provides a job summary from pipelines in GaaS. They differ from GobblinTrackingEvents in that they are emitted once per job pipeline, and they are intended to be easy to query and alert on.
   
   We want to emit this observability event from GaaS by deriving it from the job's status. Since this feature is experimental and a work in progress, not all of the fields are expected to be populated immediately.
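   
   The once-per-pipeline emission can be illustrated with a minimal, hypothetical sketch; this is not the `KafkaJobStatusMonitor` code. It assumes the monitor knows a job's previous and new status and calls the producer only when the job first enters a terminal status. `FinalStateEmitter`, `Status`, and `isTransitionToFinal` are illustrative names, and the producer is modeled as a plain `Consumer<String>`.
   
   ```java
   import java.util.EnumSet;
   import java.util.Optional;
   import java.util.Set;
   import java.util.function.Consumer;
   
   /** Hypothetical sketch: emit a summary event when a job enters a terminal status. */
   class FinalStateEmitter {
     // Simplified, hypothetical statuses; the real ExecutionStatus enum has more values
     enum Status { PENDING, ORCHESTRATED, RUNNING, COMPLETE, FAILED, CANCELLED }
   
     static final Set<Status> FINAL_STATUSES = EnumSet.of(Status.COMPLETE, Status.FAILED, Status.CANCELLED);
   
     private final Optional<Consumer<String>> eventProducer;
   
     FinalStateEmitter(Optional<Consumer<String>> eventProducer) {
       this.eventProducer = eventProducer;
     }
   
     /** True only when the job moves from no status or a non-final status into a final one. */
     static boolean isTransitionToFinal(Status previous, Status current) {
       boolean wasFinal = previous != null && FINAL_STATUSES.contains(previous);
       return !wasFinal && FINAL_STATUSES.contains(current);
     }
   
     /** Called for every status update; forwards an event only on entry into a final status. */
     void onStatusUpdate(String jobName, Status previous, Status current) {
       if (isTransitionToFinal(previous, current) && eventProducer.isPresent()) {
         eventProducer.get().accept(jobName + ":" + current);
       }
     }
   }
   ```
   
   A repeated `COMPLETE` update does not produce a second event, which is what keeps the volume at one event per job rather than one per tracking event.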
   
   This PR creates a template for a configurable GaaSObservabilityEventProducer; the emission logic can be implemented by any derived class, which defines the message-sending mechanism.
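   
   A minimal sketch of that template, under stated assumptions: the event is simplified to a `String` instead of the Avro `GaaSObservabilityEventExperimental` record, and all class names below are hypothetical. The base class owns event construction, each subclass supplies only `sendUnderlyingEvent`, and the concrete class is selected by name from configuration with a no-op default, loosely mirroring the `class.name` key and `NoopGaaSObservabilityEventProducer` default that appear in the diff.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Properties;
   
   /** Hypothetical sketch: the base class builds the event, subclasses send it. */
   abstract class ObservabilityEventProducer {
     // Key modeled on the PR's GaaSObservabilityEventProducer.class.name setting
     static final String PRODUCER_CLASS_KEY = "GaaSObservabilityEventProducer.class.name";
   
     /** Template method: derive the event from the job state, then hand it to the transport. */
     void emitObservabilityEvent(Properties jobState) {
       sendUnderlyingEvent(createEvent(jobState));
     }
   
     /** Transport hook implemented by each subclass, e.g. a Kafka-backed producer. */
     protected abstract void sendUnderlyingEvent(String event);
   
     // Stand-in for building the Avro event record from the job state
     private String createEvent(Properties jobState) {
       return jobState.getProperty("flowName", "") + "/" + jobState.getProperty("jobName", "");
     }
   
     /** Instantiates the configured subclass reflectively, defaulting to the no-op producer. */
     static ObservabilityEventProducer fromConfig(Properties config) throws ReflectiveOperationException {
       String className = config.getProperty(PRODUCER_CLASS_KEY, NoopObservabilityEventProducer.class.getName());
       return (ObservabilityEventProducer) Class.forName(className).getDeclaredConstructor().newInstance();
     }
   }
   
   /** Default producer that discards events. */
   class NoopObservabilityEventProducer extends ObservabilityEventProducer {
     @Override
     protected void sendUnderlyingEvent(String event) {
       // Intentionally does nothing
     }
   }
   
   /** Test-friendly producer that keeps sent events in memory. */
   class InMemoryObservabilityEventProducer extends ObservabilityEventProducer {
     final List<String> sent = new ArrayList<>();
   
     @Override
     protected void sendUnderlyingEvent(String event) {
       sent.add(event);
     }
   }
   ```
   
   A Kafka-backed subclass would implement the same hook with its own client; the in-memory variant is convenient for unit tests.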
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] arjun4084346 commented on pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#issuecomment-1385907611

   I read the description of the PR. Can you please elaborate on what benefit GaaSObservabilityEvents have over a GobblinTrackingEvent with name="GaaSObservabilityEvent"? How is it more easily queryable than a GobblinTrackingEvent? Is the frequency of emission the only benefit?




[GitHub] [gobblin] Will-Lo merged pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo merged PR #3623:
URL: https://github.com/apache/gobblin/pull/3623




[GitHub] [gobblin] codecov-commenter commented on pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#issuecomment-1382257855

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3623](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5d4a87e) into [master](https://codecov.io/gh/apache/gobblin/commit/5d1461f43d181b18675b52a7de7c411346b867cd?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5d1461f) will **increase** coverage by `1.46%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3623      +/-   ##
   ============================================
   + Coverage     46.54%   48.01%   +1.46%     
   + Complexity    10646     7799    -2847     
   ============================================
     Files          2129     1474     -655     
     Lines         83291    58095   -25196     
     Branches       9279     6684    -2595     
   ============================================
   - Hits          38768    27892   -10876     
   + Misses        40960    27588   -13372     
   + Partials       3563     2615     -948     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...n/runtime/troubleshooter/JobIssueEventHandler.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvdHJvdWJsZXNob290ZXIvSm9iSXNzdWVFdmVudEhhbmRsZXIuamF2YQ==) | `68.42% <0.00%> (-1.85%)` | :arrow_down: |
   | [...blin/data/management/copy/RecursivePathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvUmVjdXJzaXZlUGF0aEZpbmRlci5qYXZh) | `0.00% <0.00%> (-92.86%)` | :arrow_down: |
   | [...data/management/copy/extractor/EmptyExtractor.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvZXh0cmFjdG9yL0VtcHR5RXh0cmFjdG9yLmphdmE=) | `0.00% <0.00%> (-83.34%)` | :arrow_down: |
   | [...a/management/copy/hive/HiveLocationDescriptor.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9IaXZlTG9jYXRpb25EZXNjcmlwdG9yLmphdmE=) | `36.17% <0.00%> (-14.90%)` | :arrow_down: |
   | [...anagement/copy/hive/UnpartitionedTableFileSet.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9VbnBhcnRpdGlvbmVkVGFibGVGaWxlU2V0LmphdmE=) | `66.66% <0.00%> (-12.97%)` | :arrow_down: |
   | [...ata/management/copy/hive/HiveCopyEntityHelper.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9IaXZlQ29weUVudGl0eUhlbHBlci5qYXZh) | `50.56% <0.00%> (-11.59%)` | :arrow_down: |
   | [...ache/gobblin/util/commit/DeleteFileCommitStep.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vdXRpbC9jb21taXQvRGVsZXRlRmlsZUNvbW1pdFN0ZXAuamF2YQ==) | `34.04% <0.00%> (-8.52%)` | :arrow_down: |
   | [...a/management/copy/publisher/CopyDataPublisher.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcHVibGlzaGVyL0NvcHlEYXRhUHVibGlzaGVyLmphdmE=) | `67.54% <0.00%> (-7.95%)` | :arrow_down: |
   | [.../hive/filter/LookbackPartitionFilterGenerator.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9maWx0ZXIvTG9va2JhY2tQYXJ0aXRpb25GaWx0ZXJHZW5lcmF0b3IuamF2YQ==) | `78.94% <0.00%> (-5.27%)` | :arrow_down: |
   | [...e/gobblin/data/management/copy/hive/HiveUtils.java](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9IaXZlVXRpbHMuamF2YQ==) | `52.83% <0.00%> (-3.78%)` | :arrow_down: |
   | ... and [663 more](https://codecov.io/gh/apache/gobblin/pull/3623?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070030849


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -266,6 +277,11 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
 
       modifyStateIfRetryRequired(jobStatus);
       stateStore.put(storeName, tableName, jobStatus);
+
+      if (isStateTransitionToFinal(jobStatus, states) && eventProducer.isPresent()) {
+        log.info("I am here");

Review Comment:
   Oops, I was debugging something and it got caught 😂





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080573520


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; it needs a subsystem like Kafka, which supports at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   An argument could be made for this to be `final`.
   
   Update: below, I found out why you don't... perhaps document this for maintainers?





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070086617


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; it needs a subsystem like Kafka, which supports at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";

Review Comment:
   What does this field mean? Can you add a comment to explain it, or change it to a more meaningful name?
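   
   One way to address this, sketched with hypothetical names: give the constant a descriptive name and a comment stating what it counts. `SERVICE_PREFIX` stands in for `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX`, whose real value is not shown here; `metricName` is a stand-in that joins non-empty parts with `.`, similar in spirit to Dropwizard's `MetricRegistry.name`; and an `AtomicLong` replaces the `ContextAwareMeter`.
   
   ```java
   import java.util.StringJoiner;
   import java.util.concurrent.atomic.AtomicLong;
   
   /** Hypothetical sketch of a self-describing metric name for failed issue reads. */
   class IssueReadMetrics {
     // Assumed placeholder; the real GOBBLIN_SERVICE_PREFIX value may differ
     static final String SERVICE_PREFIX = "GobblinService";
     static final String PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
   
     /**
      * Counts how many times issues could not be read from the issue repository while
      * building an observability event. The event is still emitted, without its issue list.
      */
     static final String ISSUES_READ_FAILED_METRIC_NAME = PRODUCER_PREFIX + "getIssuesFailedCount";
   
     private final AtomicLong issuesReadFailed = new AtomicLong();
   
     /** Joins the non-null, non-empty parts with '.'. */
     static String metricName(String... parts) {
       StringJoiner joiner = new StringJoiner(".");
       for (String part : parts) {
         if (part != null && !part.isEmpty()) {
           joiner.add(part);
         }
       }
       return joiner.toString();
     }
   
     void markIssueReadFailure() {
       issuesReadFailed.incrementAndGet();
     }
   
     long issueReadFailures() {
       return issuesReadFailed.get();
     }
   }
   ```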



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -136,6 +140,12 @@ public <V> void onRetry(Attempt<V> attempt) {
             }
           }
         }));
+
+    if (instrumentationEnabled && ConfigUtils.getBoolean(config, GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED, false)) {

Review Comment:
   Why do we want to check instrumentationEnabled to emit the observability event? Isn't GAAS_OBSERVABILITY_EVENT_ENABLED enough in this case?
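   
   A sketch of keeping the two concerns separate, assuming the enabled flag alone decides whether a producer exists while `instrumentationEnabled` only controls whether metrics are recorded. All names are hypothetical, and an `AtomicLong` stands in for the meter.
   
   ```java
   import java.util.Optional;
   import java.util.Properties;
   import java.util.concurrent.atomic.AtomicLong;
   
   /** Hypothetical sketch: event emission and metrics are gated independently. */
   class ProducerGating {
     // Key modeled on GAAS_OBSERVABILITY_EVENT_ENABLED from the diff
     static final String ENABLED_KEY = "GaaSObservabilityEventProducer.enabled";
   
     static class Producer {
       final boolean instrumentationEnabled;
       // Only allocated when instrumentation is on; stands in for a ContextAwareMeter
       final AtomicLong issuesReadFailedMeter;
   
       Producer(boolean instrumentationEnabled) {
         this.instrumentationEnabled = instrumentationEnabled;
         this.issuesReadFailedMeter = instrumentationEnabled ? new AtomicLong() : null;
       }
   
       /** Records a failed issue read only when metrics are enabled. */
       void markIssuesReadFailed() {
         if (instrumentationEnabled) {
           issuesReadFailedMeter.incrementAndGet();
         }
       }
     }
   
     /** The enabled flag alone decides whether events are produced. */
     static Optional<Producer> create(Properties config, boolean instrumentationEnabled) {
       boolean enabled = Boolean.parseBoolean(config.getProperty(ENABLED_KEY, "false"));
       return enabled ? Optional.of(new Producer(instrumentationEnabled)) : Optional.empty();
     }
   }
   ```
   
   With this split, turning instrumentation off silences metrics without silently disabling the observability events themselves.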



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; it needs a subsystem like Kafka, which supports at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is a combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),

Review Comment:
   Why specify the whole class name here? 
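   
   Since `org.apache.gobblin.metrics.Issue` is already imported, the simple name `Issue` should resolve to it; a fully qualified name is only needed when two types named `Issue` are both in scope. The hypothetical sketch below models that situation with nested classes, so each `Issue` is referenced through its enclosing name, and it converts severity by enum constant name, as the diff does with `IssueSeverity.valueOf(issue.getSeverity().toString())`. The enum constants are assumptions.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical sketch of mapping between two types that share the simple name Issue. */
   class IssueMapping {
     /** Stand-in for the troubleshooter-side issue type. */
     static final class Troubleshooter {
       enum Severity { DEBUG, INFO, WARN, ERROR, FATAL }
   
       static final class Issue {
         final long epochSecond;
         final Severity severity;
         final String code;
   
         Issue(long epochSecond, Severity severity, String code) {
           this.epochSecond = epochSecond;
           this.severity = severity;
           this.code = code;
         }
       }
     }
   
     /** Stand-in for the Avro-generated metrics-side issue type. */
     static final class Metrics {
       enum IssueSeverity { DEBUG, INFO, WARN, ERROR, FATAL }
   
       static final class Issue {
         final long epochSecond;
         final IssueSeverity severity;
         final String code;
   
         Issue(long epochSecond, IssueSeverity severity, String code) {
           this.epochSecond = epochSecond;
           this.severity = severity;
           this.code = code;
         }
       }
     }
   
     /** Copies fields and converts severity by enum constant name. */
     static List<Metrics.Issue> toMetricsIssues(List<Troubleshooter.Issue> source) {
       List<Metrics.Issue> result = new ArrayList<>();
       for (Troubleshooter.Issue issue : source) {
         // valueOf throws IllegalArgumentException if the two enums ever diverge
         Metrics.IssueSeverity severity = Metrics.IssueSeverity.valueOf(issue.severity.name());
         result.add(new Metrics.Issue(issue.epochSecond, severity, issue.code));
       }
       return result;
     }
   }
   ```
   
   Converting by name keeps the two enums decoupled, at the cost of a runtime failure if their constants drift apart.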



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; it needs a subsystem like Kafka, which supports at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is a combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());
+    } catch (Exception e) {
+      // If issues cannot be fetched, increment metric but continue to try to emit the event
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent due to ", e);
+      getIssuesFailedMeter.mark();
+    }
+    JobStatus status = convertExecutionStatusTojobState(jobState, ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
+    builder.setTimestamp(System.currentTimeMillis())
+        .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  private static JobStatus convertExecutionStatusTojobState(State state, ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields
+        if (state.contains(TimingEvent.JOB_END_TIME)) {
+          return JobStatus.EXECUTION_FAILURE;
+        }

Review Comment:
   Should we return a compilation failure in the else block?
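A minimal sketch of what the suggested else branch could look like. It uses hypothetical, simplified stand-ins for Gobblin's `ExecutionStatus`, the observability `JobStatus` enum and the `TimingEvent.JOB_END_TIME` key (a plain `Map` replaces `State`). It is not the PR's implementation. Note that, per the TODO in the diff, a FAILED job without an end time could also be a submission failure, so this branch may over-report compilation failures until those cases are split out.

```java
import java.util.HashMap;
import java.util.Map;

class JobStatusConversionSketch {
  // Hypothetical, simplified stand-ins for Gobblin's ExecutionStatus and the
  // observability JobStatus enum; the real enums define more values.
  enum ExecutionStatus { COMPLETE, FAILED, CANCELLED }

  enum JobStatus { SUCCEEDED, EXECUTION_FAILURE, COMPILATION_FAILURE, CANCELLED }

  // Hypothetical key standing in for TimingEvent.JOB_END_TIME.
  static final String JOB_END_TIME = "jobEndTime";

  static JobStatus toJobStatus(Map<String, String> jobState, ExecutionStatus status) {
    switch (status) {
      case COMPLETE:
        return JobStatus.SUCCEEDED;
      case FAILED:
        // An end time means the job actually ran before failing.
        if (jobState.containsKey(JOB_END_TIME)) {
          return JobStatus.EXECUTION_FAILURE;
        }
        // The suggested else branch: no end time, so the job never executed.
        // Submission failures would also land here until they are split out.
        return JobStatus.COMPILATION_FAILURE;
      case CANCELLED:
        return JobStatus.CANCELLED;
      default:
        throw new IllegalArgumentException("Unhandled status: " + status);
    }
  }

  public static void main(String[] args) {
    Map<String, String> ranThenFailed = new HashMap<>();
    ranThenFailed.put(JOB_END_TIME, "100");
    System.out.println(toJobStatus(ranThenFailed, ExecutionStatus.FAILED));    // EXECUTION_FAILURE
    System.out.println(toJobStatus(new HashMap<>(), ExecutionStatus.FAILED)); // COMPILATION_FAILURE
  }
}
```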



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1073873739


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class; we need a subsystem like Kafka, which supports at-least-once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";

Review Comment:
   I'm going to rename it to `ISSUES_READ_FAILED_METRIC_NAME`.
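In the later revision the rename also changes the constant's value, from `"GaaSObservability.producer.getIssuesFailedCount"` to `GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount"`, so the emitted metric name changes too and anything keyed on the old name would need updating. The sketch below shows how the full name resolves. It re-implements the joining rule of Dropwizard's `MetricRegistry.name(String, String...)` (non-empty parts joined with `.`) so it runs without dependencies; the value `"GobblinService"` for `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX` is an assumption to verify.

```java
class MetricNameSketch {
  // Assumed value of ServiceMetricNames.GOBBLIN_SERVICE_PREFIX; verify against the real constant.
  static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
  static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
  static final String ISSUES_READ_FAILED_METRIC_NAME =
      GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";

  // Same joining rule as Dropwizard's MetricRegistry.name(String, String...):
  // null and empty parts are skipped, the rest are joined with '.'.
  static String name(String first, String... rest) {
    StringBuilder builder = new StringBuilder();
    append(builder, first);
    for (String part : rest) {
      append(builder, part);
    }
    return builder.toString();
  }

  private static void append(StringBuilder builder, String part) {
    if (part != null && !part.isEmpty()) {
      if (builder.length() > 0) {
        builder.append('.');
      }
      builder.append(part);
    }
  }

  public static void main(String[] args) {
    // Prints GobblinService.GaaSObservabilityEventProducer.getIssuesFailedCount
    System.out.println(name(GOBBLIN_SERVICE_PREFIX, ISSUES_READ_FAILED_METRIC_NAME));
  }
}
```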





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080652649


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class; we need a subsystem like Kafka, which supports at-least-once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   I can still mark it as `final`, but that wouldn't stop me from mutating the `jobState` (not that it needs to be mutated, though).
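The two uses of `final` discussed here do different things: on the method it only prevents subclasses from overriding `emitObservabilityEvent`, and on the parameter it only prevents reassigning the local reference. Neither makes the passed-in object immutable. A small self-contained illustration, using `java.util.Properties` as a stand-in for Gobblin's `State`:

```java
import java.util.Properties;

class FinalSemanticsSketch {
  static class Producer {
    // `final` on the method: subclasses cannot override emit().
    // `final` on the parameter: the local reference cannot be reassigned,
    // but the object it refers to can still be mutated.
    public final void emit(final Properties jobState) {
      jobState.setProperty("touched", "true"); // compiles: mutating the object is allowed
      // jobState = new Properties();         // would not compile: reassigning a final parameter
    }
  }

  // class OverridingProducer extends Producer {
  //   @Override public void emit(Properties p) {} // would not compile: emit() is final
  // }

  public static void main(String[] args) {
    Properties jobState = new Properties();
    new Producer().emit(jobState);
    System.out.println(jobState.getProperty("touched")); // true
  }
}
```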





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070103059


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class; we need a subsystem like Kafka, which supports at-least-once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is a combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());

Review Comment:
   seems ripe for abstracting entirely within a conversion function
   [I mean L95-L96]
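One way to read this suggestion: move the lambda body into a named static conversion method and map with a method reference, so the `try` block only fetches and converts. The sketch below is self-contained, so the troubleshooter `Issue`, the Avro-generated metrics `Issue` and both severity enums are replaced by hypothetical minimal classes; `toMetricsIssue` and `toMetricsIssues` are illustrative names, not existing Gobblin methods. Like the PR, it assumes the two severity enums share constant names.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class IssueConversionSketch {
  // Hypothetical stand-ins for the troubleshooter and metrics IssueSeverity enums.
  enum TroubleshooterSeverity { INFO, WARN, ERROR }
  enum MetricsSeverity { INFO, WARN, ERROR }

  // Hypothetical stand-in for org.apache.gobblin.runtime.troubleshooter.Issue.
  static final class TroubleshooterIssue {
    final ZonedDateTime time; final TroubleshooterSeverity severity;
    final String code, summary, details; final Map<String, String> properties;

    TroubleshooterIssue(ZonedDateTime time, TroubleshooterSeverity severity, String code,
        String summary, String details, Map<String, String> properties) {
      this.time = time; this.severity = severity; this.code = code;
      this.summary = summary; this.details = details; this.properties = properties;
    }
  }

  // Hypothetical stand-in for the Avro-generated org.apache.gobblin.metrics.Issue.
  static final class MetricsIssue {
    final long timestamp; final MetricsSeverity severity;
    final String code, summary, details; final Map<String, String> properties;

    MetricsIssue(long timestamp, MetricsSeverity severity, String code,
        String summary, String details, Map<String, String> properties) {
      this.timestamp = timestamp; this.severity = severity; this.code = code;
      this.summary = summary; this.details = details; this.properties = properties;
    }
  }

  /** Converts one troubleshooter issue into its observability-event form. */
  static MetricsIssue toMetricsIssue(TroubleshooterIssue issue) {
    return new MetricsIssue(
        issue.time.toEpochSecond(), // seconds, matching the PR's toEpochSecond() call
        MetricsSeverity.valueOf(issue.severity.name()), // assumes identical constant names
        issue.code, issue.summary, issue.details, issue.properties);
  }

  static List<MetricsIssue> toMetricsIssues(List<TroubleshooterIssue> issues) {
    return issues.stream().map(IssueConversionSketch::toMetricsIssue).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    TroubleshooterIssue issue = new TroubleshooterIssue(
        ZonedDateTime.of(2023, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
        TroubleshooterSeverity.INFO, "issueCode", "issueSummary", null, Collections.emptyMap());
    System.out.println(toMetricsIssues(Collections.singletonList(issue)).get(0).timestamp); // 1672531200
  }
}
```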





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080707399


##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityProducer producer = new MockGaaSObservabilityProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity severity) {
+    return Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+
+
+  public class MockGaaSObservabilityProducer extends GaaSObservabilityEventProducer {

Review Comment:
   Ah, good catch. I'll move it to its own file.
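In the diff, `MockGaaSObservabilityProducer` is declared as a non-static inner class of `GaaSObservabilityProducerTest`, so it can only be created with an instance of that test class. As a separate top-level class it becomes reusable, which is what the later `KafkaAvroJobStatusMonitorTest` change relies on when it constructs a `MockGaaSObservabilityEventProducer`. The sketch below shows the recording-mock idea with a hypothetical generic base class standing in for `GaaSObservabilityEventProducer`; it uses a static nested class only so the snippet stays in one file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RecordingProducerSketch {
  // Hypothetical minimal base standing in for GaaSObservabilityEventProducer.
  abstract static class EventProducer<E> {
    public void emitObservabilityEvent(E event) {
      sendUnderlyingEvent(event);
    }

    protected abstract void sendUnderlyingEvent(E event);
  }

  // Static nested here; in the repo it would be a top-level class in its own file.
  // Either way it needs no enclosing test instance, so any test can construct it.
  static class MockEventProducer<E> extends EventProducer<E> {
    private final List<E> emittedEvents = new ArrayList<>();

    @Override
    protected void sendUnderlyingEvent(E event) {
      emittedEvents.add(event); // record the event instead of sending it to Kafka
    }

    public List<E> getTestEmittedEvents() {
      return Collections.unmodifiableList(emittedEvents);
    }
  }

  public static void main(String[] args) {
    MockEventProducer<String> producer = new MockEventProducer<>();
    producer.emitObservabilityEvent("job1 SUCCEEDED");
    System.out.println(producer.getTestEmittedEvents()); // [job1 SUCCEEDED]
  }
}
```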





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080567642


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
     Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
-  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
-  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+  @Test (dependsOnMethods = "testProcessProgressingMessageWhenNoPreviousStatus")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic6");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1, 4),
-        createJobSLAKilledEvent(),
-        createJobOrchestratedEvent(2, 4),
-        createJobStartSLAKilledEvent(),
-        // Verify that kill event will not retry
-        createJobOrchestratedEvent(3, 4),
-        createJobCancelledEvent()
+        createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
-
     try {
       Thread.sleep(1000);
-    } catch(InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
-      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
-      this::convertMessageAndMetadataToDecodableKafkaRecord);
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
-    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.shutDown();
+  }
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+  @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+  public void testObservabilityEventSingleEmission() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobCancelledEvent(),
+        createJobSucceededEvent() // This event should be ignored
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);

Review Comment:
   perhaps not now... but definitely if you need to add further tests (as you fill in details of the observability event) - let's explore whether this repetitive boilerplate could live behind a reusable abstraction.
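As a starting point for that abstraction, here is a sketch of a hypothetical helper (`submitAndAwait` is an illustrative name, not an existing Gobblin method) that factors out the "submit each event, report, then sleep and restore the interrupt flag" block repeated in these tests. It is written against plain functional interfaces so it has no Gobblin dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class KafkaTestSupportSketch {
  private KafkaTestSupportSketch() {}

  /**
   * Hypothetical helper: submits each event, flushes the reporter after every event,
   * then waits so the consumer side has time to observe them.
   */
  static <E> void submitAndAwait(List<E> events, Consumer<E> submit, Runnable report, long waitMillis) {
    for (E event : events) {
      submit.accept(event);
      report.run();
    }
    try {
      Thread.sleep(waitMillis);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag, as the tests do
    }
  }

  public static void main(String[] args) {
    List<String> submitted = new ArrayList<>();
    int[] reports = {0};
    submitAndAwait(Arrays.asList("FLOW_COMPILED", "JOB_SUCCEEDED"), submitted::add, () -> reports[0]++, 10);
    System.out.println(submitted + " reports=" + reports[0]); // [FLOW_COMPILED, JOB_SUCCEEDED] reports=2
  }
}
```

In the test class this would presumably be called as `submitAndAwait(ImmutableList.of(createFlowCompiledEvent(), createJobSucceededEvent()), context::submitEvent, kafkaReporter::report, 1000)`, assuming `context.submitEvent` accepts the events those factory methods return. The producer and monitor setup that follows could be wrapped the same way.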



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -358,6 +370,66 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {

Review Comment:
   sorry, can't recall: was this in the prior revision or just introduced?
   
   I'm not clear whether it belongs in this PR or is unrelated...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class; we need a subsystem like Kafka, which supports at-least-once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   an argument could be made for this to be `final`
   
   update: below, I found out why you don't... perhaps document for maintainers?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+public class NoopGaaSObservabilityEventProducer extends GaaSObservabilityEventProducer {

Review Comment:
   good use of the Null Object Pattern!
   
   any javadoc?
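A sketch of the Null Object idea with the kind of Javadoc being asked for. Between revisions, the `GaaSObservabilityEventProducer.enabled` key was dropped and `DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS` now points at `NoopGaaSObservabilityEventProducer`, so callers can always hold a producer and call it unconditionally. The base class and the counting producer below are hypothetical minimal stand-ins, not the PR's classes.

```java
import java.util.Arrays;
import java.util.List;

class NullObjectSketch {
  // Hypothetical minimal stand-in for GaaSObservabilityEventProducer.
  abstract static class ObservabilityEventProducer {
    public void emitObservabilityEvent(String jobState) {
      sendUnderlyingEvent("observability event for " + jobState);
    }

    protected abstract void sendUnderlyingEvent(String event);
  }

  /**
   * A producer that accepts observability events and silently discards them.
   *
   * <p>Configured as the default producer class, so callers can always hold a
   * non-null producer and call emitObservabilityEvent() unconditionally instead
   * of checking whether emission is enabled.
   */
  static final class NoopObservabilityEventProducer extends ObservabilityEventProducer {
    @Override
    protected void sendUnderlyingEvent(String event) {
      // Intentionally empty: the event is dropped.
    }
  }

  // Hypothetical "real" producer that counts events instead of sending them to e.g. Kafka.
  static final class CountingObservabilityEventProducer extends ObservabilityEventProducer {
    int sent = 0;

    @Override
    protected void sendUnderlyingEvent(String event) {
      sent++;
    }
  }

  public static void main(String[] args) {
    CountingObservabilityEventProducer counting = new CountingObservabilityEventProducer();
    List<ObservabilityEventProducer> producers =
        Arrays.asList(new NoopObservabilityEventProducer(), counting);
    for (ObservabilityEventProducer producer : producers) {
      producer.emitObservabilityEvent("job1"); // no null or "enabled" check needed
    }
    System.out.println(counting.sent); // 1
  }
}
```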



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityProducer producer = new MockGaaSObservabilityProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity severity) {
+    return Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+
+
+  public class MockGaaSObservabilityProducer extends GaaSObservabilityEventProducer {

Review Comment:
   didn't I see this impl above?  couldn't it be shared there (e.g. if you were to make this `public static`)?
   
   (or even put it into its own file)
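   
   A small self-contained demonstration of why a `static` nested class is easier to share. A factory that instantiates a producer reflectively from its class name needs a no-arg constructor, and a non-static inner class has none, because its constructor implicitly takes the enclosing instance. The class names are hypothetical.
   
   ```java
   import java.lang.reflect.Constructor;
   
   public class NestedClassReflectionSketch {
   
     /** Static nested class: has an ordinary no-arg constructor. */
     public static class StaticMockProducer {
       public String name() { return "static"; }
     }
   
     /** Inner (non-static) class: its constructor implicitly takes the enclosing instance. */
     public class InnerMockProducer {
       public String name() { return "inner"; }
     }
   
     /** Instantiates a class by its binary name via the no-arg constructor, as a config-driven factory might. */
     static Object instantiate(String className) throws ReflectiveOperationException {
       return Class.forName(className).getDeclaredConstructor().newInstance();
     }
   
     public static void main(String[] args) throws Exception {
       // getName() yields the binary name, e.g. "NestedClassReflectionSketch$StaticMockProducer".
       Object ok = instantiate(StaticMockProducer.class.getName());
       System.out.println(((StaticMockProducer) ok).name()); // static
   
       try {
         instantiate(InnerMockProducer.class.getName());
       } catch (NoSuchMethodException e) {
         // The only constructor is InnerMockProducer(NestedClassReflectionSketch).
         Constructor<?>[] ctors = InnerMockProducer.class.getDeclaredConstructors();
         System.out.println(ctors[0].getParameterCount()); // 1
       }
     }
   }
   ```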



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -527,13 +637,14 @@ private GobblinTrackingEvent createGTE(String eventName, Map<String, String> cus
     return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
   }
 
-  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws IOException, ReflectiveOperationException {
+  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+      GaaSObservabilityEventProducer eventProducer) throws IOException, ReflectiveOperationException {

Review Comment:
   your choice, but rather than updating so many calls to now take a no-op version, I might overload this method with a two-param form that merely forwards to the three-param one here after constructing the no-op instance.
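   
   The overload could look roughly like this. The producer and monitor types are hypothetical minimal stand-ins (and `Config` is reduced to a `String`) so the sketch compiles on its own; only the forwarding shape is the point.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   public class OverloadForwardingSketch {
   
     /** Hypothetical stand-in for the producer type. */
     interface EventProducer { String describe(); }
   
     static class NoopEventProducer implements EventProducer {
       public String describe() { return "noop"; }
     }
   
     /** Hypothetical stand-in for MockKafkaAvroJobStatusMonitor. */
     static class MockMonitor {
       final AtomicBoolean throwToggle;
       final String config;
       final EventProducer producer;
   
       MockMonitor(AtomicBoolean throwToggle, String config, EventProducer producer) {
         this.throwToggle = throwToggle;
         this.config = config;
         this.producer = producer;
       }
     }
   
     /** Two-param form: existing call sites stay unchanged and get the no-op producer. */
     static MockMonitor createMockMonitor(AtomicBoolean throwToggle, String additionalConfig) {
       return createMockMonitor(throwToggle, additionalConfig, new NoopEventProducer());
     }
   
     /** Three-param form: only tests that care about observability events pass a producer. */
     static MockMonitor createMockMonitor(AtomicBoolean throwToggle, String additionalConfig, EventProducer producer) {
       return new MockMonitor(throwToggle, additionalConfig, producer);
     }
   
     public static void main(String[] args) {
       MockMonitor monitor = createMockMonitor(new AtomicBoolean(false), "cfg");
       System.out.println(monitor.producer.describe()); // noop
     }
   }
   ```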



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+    if (prevStates.size() == 0) {
+      return finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && !finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));

Review Comment:
   agreed... that's what I meant by perhaps not possible.  so the difference would be purely stylistic... either form looks fine



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());

Review Comment:
   still thinking the same... why initialize on every invocation, rather than once when the class is loaded?
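   
   A sketch of hoisting the set into a constant that is built once, at class-initialization time. The `ExecutionStatus` enum is a local stand-in with a subset of the real values, states are reduced to `Map<String, String>`, and `EVENT_NAME_FIELD` is a hypothetical substitute for `JobStatusRetriever.EVENT_NAME_FIELD`. The logic otherwise mirrors `isStateTransitionToFinal` from the diff.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   public class FinalStatesSketch {
   
     /** Local stand-in for org.apache.gobblin.service.ExecutionStatus (subset of values). */
     enum ExecutionStatus { PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }
   
     /** Hypothetical key standing in for JobStatusRetriever.EVENT_NAME_FIELD. */
     static final String EVENT_NAME_FIELD = "eventName";
   
     /**
      * Built once, when the class is initialized, instead of on every call.
      * Backed by a HashSet so contains(null) returns false; Set.of(...) would throw instead.
      */
     private static final Set<String> FINAL_STATES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
         ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name())));
   
     /** True only when the current state is final and the most recent previous state (if any) was not. */
     static boolean isStateTransitionToFinal(Map<String, String> currentState, List<Map<String, String>> prevStates) {
       if (!FINAL_STATES.contains(currentState.get(EVENT_NAME_FIELD))) {
         return false;
       }
       if (prevStates.isEmpty()) {
         return true;
       }
       String previous = prevStates.get(prevStates.size() - 1).get(EVENT_NAME_FIELD);
       return !FINAL_STATES.contains(previous);
     }
   
     public static void main(String[] args) {
       Map<String, String> running = Map.of(EVENT_NAME_FIELD, "RUNNING");
       Map<String, String> complete = Map.of(EVENT_NAME_FIELD, "COMPLETE");
       System.out.println(isStateTransitionToFinal(complete, List.of(running)));  // true
       System.out.println(isStateTransitionToFinal(complete, List.of(complete))); // false
     }
   }
   ```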



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072965138


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java:
##########
@@ -50,6 +51,7 @@ public class JobIssueEventHandler {
   private static final Logger issueLogger =
       LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
 
+  @Getter

Review Comment:
   With your suggestion to pass the class in as a parameter, we no longer need this :) 



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java:
##########
@@ -50,6 +51,7 @@ public class JobIssueEventHandler {
   private static final Logger issueLogger =
       LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
 
+  @Getter

Review Comment:
   With your suggestion to pass the Producer in as a parameter, we no longer need this :) 





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080626419


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());

Review Comment:
   I think we have `FlowStatusGenerator.FINISHED_STATUSES`, which is essentially the same list; maybe reuse that one?





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1073002292


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());
+    } catch (Exception e) {
+      // If issues cannot be fetched, increment metric but continue to try to emit the event
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent due to ", e);
+      getIssuesFailedMeter.mark();
+    }
+    JobStatus status = convertExecutionStatusTojobState(jobState, ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
+    builder.setTimestamp(System.currentTimeMillis())
+        .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  private static JobStatus convertExecutionStatusTojobState(State state, ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields
+        if (state.contains(TimingEvent.JOB_END_TIME)) {
+          return JobStatus.EXECUTION_FAILURE;
+        }

Review Comment:
   But to avoid returning null, I will mark it as a compilation failure.
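   
   A minimal sketch of that fallback, in which every branch returns a non-null value. `JobStatus` and `ExecutionStatus` are local stand-in enums (the `JobStatus.CANCELLED` value and its mapping are hypothetical, since that branch is not shown in the diff), and `hasJobEndTime` stands in for `state.contains(TimingEvent.JOB_END_TIME)`.
   
   ```java
   public class StatusConversionSketch {
   
     /** Local stand-in for org.apache.gobblin.service.ExecutionStatus (subset of values). */
     enum ExecutionStatus { RUNNING, COMPLETE, FAILED, CANCELLED }
   
     /** Local stand-in for the event's JobStatus; CANCELLED here is hypothetical. */
     enum JobStatus { SUCCEEDED, EXECUTION_FAILURE, COMPILATION_FAILURE, CANCELLED }
   
     /** Maps a final execution status to a job status without ever returning null. */
     static JobStatus toJobStatus(ExecutionStatus executionStatus, boolean hasJobEndTime) {
       switch (executionStatus) {
         case COMPLETE:
           return JobStatus.SUCCEEDED;
         case FAILED:
           // An end time means the job got as far as executing; otherwise report a
           // compilation failure rather than null (submission failures are not yet distinguished).
           return hasJobEndTime ? JobStatus.EXECUTION_FAILURE : JobStatus.COMPILATION_FAILURE;
         case CANCELLED:
           return JobStatus.CANCELLED;
         default:
           throw new IllegalArgumentException("Not a final status: " + executionStatus);
       }
     }
   
     public static void main(String[] args) {
       System.out.println(toJobStatus(ExecutionStatus.FAILED, true));  // EXECUTION_FAILURE
       System.out.println(toJobStatus(ExecutionStatus.FAILED, false)); // COMPILATION_FAILURE
     }
   }
   ```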





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1081029179


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   rather, I saw where the non-final comes in: the no-op version overrides this with an empty impl so that it skips even creating the event, in addition to doing so with `submitUnderlyingEvent`





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070071056


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -89,6 +97,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
+  private static Queue<GaaSObservabilityEventExperimental> queue = new LinkedList<>();

Review Comment:
   took me a minute to see how this gets wired in so that the events arrive on it... now I found the static inner class.  this strikes me as brittle, esp. with `queue` being `static` and shared between all instances.  still I understand the difficulty you face when the system under test internally instantiates its own `MockGaaSObservabilityEventProducer` from a class name.  hence, I'd accept those downsides, and just concentrate on readability.
   
   either add a clearer comment up here saying it will be used in this unconventional way, for the benefit of testing.  or my own pref might be to create a small singleton, known to both this test class and the mock, which documents the arrangement, while encapsulating the queue. 
   e.g.:
   ```
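   /** why this odd arrangement ... */
   static class QueueSharingSingleton {
     private static QueueSharingSingleton INSTANCE = null;
     private final Queue<GaaSObservabilityEventExperimental> queue = new LinkedList<>();
   
     public static synchronized QueueSharingSingleton getInstance() {
       if (INSTANCE == null) {
         INSTANCE = new QueueSharingSingleton();
       }
       return INSTANCE;
     }
   
     private QueueSharingSingleton() {}
   
     public synchronized void putMessage(GaaSObservabilityEventExperimental msg) { queue.add(msg); }
     // returns a snapshot copy, leaving the queue itself untouched
     public synchronized List<GaaSObservabilityEventExperimental> peekAllMessages() { return new ArrayList<>(queue); }
   }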
   ```
   then initialize the above as:
   ```
   private QueueSharingSingleton queueSharing = QueueSharingSingleton.getInstance();
   ```
   (and do the same in the mock)
   
   this could even be used by other test classes that should produce observability events.
   





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072966969


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -219,7 +229,8 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore,
+      Optional<GaaSObservabilityEventProducer> eventProducer)

Review Comment:
   I think I'm going to go down neither the route of nulls nor Optionals, and instead make a NoopEventSubmitter, which will act as the default class from the factory and does no work.
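   
   A rough sketch of that arrangement: the factory reads the producer class name from configuration and falls back to the no-op class when the key is absent, so callers never see a null or an `Optional`. The key string mirrors `GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY` from the later revision of `GaaSObservabilityEventProducer`; the simplified classes, the `Properties`-based config, and the no-arg constructors are assumptions (the real producers take a `State`, an issue repository, and an instrumentation flag).
   
   ```java
   import java.util.Properties;
   
   public class ProducerFactorySketch {
   
     static final String PRODUCER_CLASS_KEY = "GaaSObservabilityEventProducer.class.name";
   
     /** Simplified stand-in for the abstract producer. */
     public abstract static class Producer {
       public abstract void emit(String jobName);
     }
   
     /** Null Object default: does no work. */
     public static class NoopProducer extends Producer {
       @Override
       public void emit(String jobName) { }
     }
   
     /** Would normally send to a transport such as Kafka; here it only counts calls. */
     public static class CountingProducer extends Producer {
       public int emitted = 0;
   
       @Override
       public void emit(String jobName) { emitted++; }
     }
   
     static final String DEFAULT_PRODUCER_CLASS = NoopProducer.class.getName();
   
     /** Instantiates the configured producer reflectively, defaulting to the no-op one. */
     static Producer createProducer(Properties props) throws ReflectiveOperationException {
       String className = props.getProperty(PRODUCER_CLASS_KEY, DEFAULT_PRODUCER_CLASS);
       return Class.forName(className).asSubclass(Producer.class).getDeclaredConstructor().newInstance();
     }
   
     public static void main(String[] args) throws Exception {
       Producer byDefault = createProducer(new Properties());
       System.out.println(byDefault.getClass().getSimpleName()); // NoopProducer
   
       Properties props = new Properties();
       props.setProperty(PRODUCER_CLASS_KEY, CountingProducer.class.getName());
       Producer configured = createProducer(props);
       configured.emit("job1"); // no null check needed at the call site
       System.out.println(((CountingProducer) configured).emitted); // 1
     }
   }
   ```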





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072534871


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+    if (prevStates.size() == 0) {
+      return finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && !finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));

Review Comment:
   I think it's impossible for a state to flip-flop because of the way we perform `mergeState`, which follows an ordered list of execution statuses:
   ```
     private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
         .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
             ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
             ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
   ```
   I think there is a risk if there is a COMPLETE event followed by a FAILED or CANCELLED event, but I believe the first "final" event state we run into is the one we want users to accept.
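   
   To illustrate the "first final state" point: under the rule in `isStateTransitionToFinal` (emit only when the current status is final and the previous one was not), a hypothetical stream in which COMPLETE is followed by FAILED yields exactly one event, for COMPLETE. The sketch replays statuses as plain strings and does not model `mergeState` itself.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   public class EmitOnceSketch {
   
     // HashSet (rather than Set.of) so that contains(null) returns false instead of throwing.
     private static final Set<String> FINAL_STATES = new HashSet<>(Arrays.asList("COMPLETE", "FAILED", "CANCELLED"));
   
     /** Same rule as isStateTransitionToFinal, with statuses reduced to plain strings. */
     static boolean isTransitionToFinal(String current, List<String> previous) {
       if (!FINAL_STATES.contains(current)) {
         return false;
       }
       return previous.isEmpty() || !FINAL_STATES.contains(previous.get(previous.size() - 1));
     }
   
     /** Replays statuses in arrival order and returns those for which an event would be emitted. */
     static List<String> emittedFor(List<String> statuses) {
       List<String> seen = new ArrayList<>();
       List<String> emitted = new ArrayList<>();
       for (String status : statuses) {
         if (isTransitionToFinal(status, seen)) {
           emitted.add(status);
         }
         seen.add(status);
       }
       return emitted;
     }
   
     public static void main(String[] args) {
       System.out.println(emittedFor(Arrays.asList("PENDING", "RUNNING", "COMPLETE", "FAILED"))); // [COMPLETE]
     }
   }
   ```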





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080707760


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -527,13 +637,14 @@ private GobblinTrackingEvent createGTE(String eventName, Map<String, String> cus
     return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
   }
 
-  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws IOException, ReflectiveOperationException {
+  MockKafkaAvroJobStatusMonitor createMockKafkaAvroJobStatusMonitor(AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+      GaaSObservabilityEventProducer eventProducer) throws IOException, ReflectiveOperationException {

Review Comment:
   Yeah, I realized that afterwards. Since I've already made the changes, I think I'll leave this for now, but I'll do that if any additional configuration is needed in future tests for this file.





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072965455


##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+  Queue<GaaSObservabilityEventExperimental> emittedEvents = new LinkedList<>();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    GaaSObservabilityEventProducer producer = new MockGaaSObservabilityProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    GaaSObservabilityEventExperimental event = emittedEvents.poll();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity severity) {
+    return Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+
+
+  public class MockGaaSObservabilityProducer extends GaaSObservabilityEventProducer {
+    public MockGaaSObservabilityProducer(State state, MultiContextIssueRepository issueRepository) {
+      super(state, issueRepository);
+    }
+    // Send the events to the class test queue, so tests should not run concurrently
+    @Override
+    protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) {
+      emittedEvents.add(event);

Review Comment:
   No longer needed; the tests should now support concurrent execution.
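A minimal sketch of instance-scoped collection, assuming the mock only records events for later assertions. Each producer owns its own thread-safe list, so tests that create separate producers cannot see each other's events. `ObservabilityEvent`, `EventProducer` and `CollectingEventProducer` are hypothetical stand-ins for `GaaSObservabilityEventExperimental`, `GaaSObservabilityEventProducer` and the mock. The real constructors also take a `State` and a `MultiContextIssueRepository`, which are omitted here. The accessor name mirrors `getTestEmittedEvents()` from the updated monitor test.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for GaaSObservabilityEventExperimental.
final class ObservabilityEvent {
  final String flowGroup;
  final String flowName;

  ObservabilityEvent(String flowGroup, String flowName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }
}

// Hypothetical stand-in for the abstract producer: subclasses choose the transport.
abstract class EventProducer {
  public void emit(ObservabilityEvent event) {
    sendUnderlyingEvent(event);
  }

  protected abstract void sendUnderlyingEvent(ObservabilityEvent event);
}

// Test double: state lives on the instance, not in a static field, so tests
// that each build their own producer stay isolated even when run in parallel.
class CollectingEventProducer extends EventProducer {
  private final List<ObservabilityEvent> emitted = new CopyOnWriteArrayList<>();

  @Override
  protected void sendUnderlyingEvent(ObservabilityEvent event) {
    emitted.add(event);
  }

  /** Returns an unmodifiable snapshot of the events emitted by this instance. */
  public List<ObservabilityEvent> getTestEmittedEvents() {
    return List.copyOf(emitted);
  }
}
```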



[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072966717


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -136,6 +140,12 @@ public <V> void onRetry(Attempt<V> attempt) {
             }
           }
         }));
+
+    if (instrumentationEnabled && ConfigUtils.getBoolean(config, GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED, false)) {

Review Comment:
   Yes, I think I'll use a better way of determining whether to emit the event: a `NoopGaaSObservabilityEventSubmitter`, which will be the default class and does no work.
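A no-op default like this is the null object pattern. Below is a rough sketch with hypothetical class names standing in for the Gobblin types; the real no-op class would extend `GaaSObservabilityEventProducer` and take its constructor arguments. The monitor always holds a producer and calls it unconditionally, and disabling emission just means wiring in the no-op implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for GaaSObservabilityEventProducer; the real class
// derives the event from a Gobblin State and an issue repository.
abstract class ObservabilityEventProducer {
  public final void emitObservabilityEvent(String jobStatus) {
    sendUnderlyingEvent(jobStatus);
  }

  protected abstract void sendUnderlyingEvent(String jobStatus);
}

// Null-object default: always present, never emits anything.
class NoopObservabilityEventProducer extends ObservabilityEventProducer {
  @Override
  protected void sendUnderlyingEvent(String jobStatus) {
    // intentionally does nothing
  }
}

// Hypothetical counting producer, only to show the caller code is identical.
class CountingObservabilityEventProducer extends ObservabilityEventProducer {
  final AtomicInteger sent = new AtomicInteger();

  @Override
  protected void sendUnderlyingEvent(String jobStatus) {
    sent.incrementAndGet();
  }
}

class JobStatusMonitorSketch {
  private final ObservabilityEventProducer producer;

  JobStatusMonitorSketch(ObservabilityEventProducer producer) {
    this.producer = producer;
  }

  void onFinalStatus(String jobStatus) {
    // No "enabled" flag and no Optional check: the no-op producer covers the disabled case.
    producer.emitObservabilityEvent(jobStatus);
  }
}
```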



[GitHub] [gobblin] homatthew commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
homatthew commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070031403


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -219,7 +229,8 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore,
+      Optional<GaaSObservabilityEventProducer> eventProducer)

Review Comment:
   Curious about our philosophy for using `Optional` as a parameter. Why not use nulls + overloading? Java optionals are best used as a return value; as an input, I think they make the code less readable for developers (there are plenty of debates about this online).
   
   Although, I have seen the pattern used in some other parts of Gobblin, so clearly past developers felt it was okay. 
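For comparison, the nulls-plus-overloading style might look roughly like this. A `List<String>` stands in for both the state store and the event producer; these are simplifications for illustration, not the real `addJobStatusToStateStore` signature.

```java
import java.util.ArrayList;
import java.util.List;

final class NullableParamSketch {
  private NullableParamSketch() {}

  /** Convenience overload for callers that never emit observability events. */
  static void addJobStatusToStateStore(String jobStatus, List<String> stateStore) {
    addJobStatusToStateStore(jobStatus, stateStore, null);
  }

  /**
   * @param eventSink may be null; only this javadoc records that, so every
   *                  implementation must remember to check it
   */
  static void addJobStatusToStateStore(String jobStatus, List<String> stateStore, List<String> eventSink) {
    stateStore.add(jobStatus);
    if (eventSink != null) {
      eventSink.add(jobStatus);
    }
  }
}
```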



[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070057390


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -219,7 +229,8 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore,
+      Optional<GaaSObservabilityEventProducer> eventProducer)

Review Comment:
   I've not heard that debate....
   
   overall, the idea w/ `Optional` is to encode possible absence in the type system where it can be checked statically and automatically.  `null` is notoriously furtive... take for instance this very impl. (here), which does not check `stateStore != null`--but should it?  the practice I follow is that when that ought to be checked, the expectation should be stated, not in javadoc or other code comments, but via `Optional`.
   
   again, I've not heard specific concern w/ use as a parameter type.  quite the opposite, in fact: the use is noted to bring clear benefit.
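The same method with possible absence encoded in the type might look like the sketch below, using the same hypothetical `List<String>` stand-ins. It uses `java.util.Optional`; this excerpt does not show which `Optional` the PR imports.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class OptionalParamSketch {
  private OptionalParamSketch() {}

  // Possible absence is part of the signature: callers must pass Optional.empty()
  // explicitly, and the body cannot reach the sink without unwrapping it.
  static void addJobStatusToStateStore(String jobStatus, List<String> stateStore,
      Optional<List<String>> eventSink) {
    stateStore.add(jobStatus);
    eventSink.ifPresent(sink -> sink.add(jobStatus));
  }
}
```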



[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1072967326


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS, running in the JobStatusMonitor, that emits a GaaSObservabilityEvent after each job in a flow.
+ * This is an abstract class: a subclass must provide a delivery mechanism that supports at-least-once delivery, such as Kafka, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent derived from the final state of a GaaS job pipeline, which is an ordered combination of GTE job states
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());
+    } catch (Exception e) {
+      // If issues cannot be fetched, increment metric but continue to try to emit the event
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent due to ", e);
+      getIssuesFailedMeter.mark();
+    }
+    JobStatus status = convertExecutionStatusTojobState(jobState, ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
+    builder.setTimestamp(System.currentTimeMillis())
+        .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  private static JobStatus convertExecutionStatusTojobState(State state, ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields
+        if (state.contains(TimingEvent.JOB_END_TIME)) {
+          return JobStatus.EXECUTION_FAILURE;
+        }

Review Comment:
   It could currently be a submission failure, so it's hard to differentiate. I think I'll put that logic in a separate PR to break this one up.
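A sketch of the end-time heuristic visible in `convertExecutionStatusTojobState`, with hypothetical enums in place of `ExecutionStatus` and the Avro `JobStatus`. Only two mappings are grounded in the diff and its test: `COMPLETE` maps to `SUCCEEDED`, and `FAILED` with an end time maps to `EXECUTION_FAILURE`. The `CANCELLED` constant, the `SUBMISSION_FAILURE` fallback and the key name are assumptions. As the comment says, the heuristic stays ambiguous until submission failures can be told apart.

```java
import java.util.Map;

// Hypothetical stand-ins for ExecutionStatus and the Avro JobStatus enum.
enum ExecStatus { COMPLETE, FAILED, CANCELLED }
enum ObsJobStatus { SUCCEEDED, EXECUTION_FAILURE, SUBMISSION_FAILURE, CANCELLED }

final class StatusMapper {
  // Hypothetical key standing in for TimingEvent.JOB_END_TIME.
  static final String JOB_END_TIME = "jobEndTime";

  private StatusMapper() {}

  static ObsJobStatus toJobStatus(ExecStatus status, Map<String, String> jobState) {
    switch (status) {
      case COMPLETE:
        return ObsJobStatus.SUCCEEDED;
      case CANCELLED:
        return ObsJobStatus.CANCELLED;
      case FAILED:
        // Heuristic: an end time suggests the job ran and then failed.
        // Assumed fallback: no end time is treated as a submission failure.
        return jobState.containsKey(JOB_END_TIME)
            ? ObsJobStatus.EXECUTION_FAILURE
            : ObsJobStatus.SUBMISSION_FAILURE;
      default:
        throw new IllegalArgumentException("Not a final status: " + status);
    }
  }
}
```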



[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1073005648


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java:
##########
@@ -65,9 +65,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
-      JobIssueEventHandler jobIssueEventHandler)
+      JobIssueEventHandler jobIssueEventHandler, boolean instrumentationEnabled)

Review Comment:
   I'm going to keep `instrumentationEnabled` only for updating the meters/gauges; whether the class is configured to emit events will be handled separately.



[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080655252


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
     Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
-  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
-  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+  @Test (dependsOnMethods = "testProcessProgressingMessageWhenNoPreviousStatus")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic6");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1, 4),
-        createJobSLAKilledEvent(),
-        createJobOrchestratedEvent(2, 4),
-        createJobStartSLAKilledEvent(),
-        // Verify that kill event will not retry
-        createJobOrchestratedEvent(3, 4),
-        createJobCancelledEvent()
+        createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
-
     try {
       Thread.sleep(1000);
-    } catch(InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
-      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
-      this::convertMessageAndMetadataToDecodableKafkaRecord);
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
-    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSObservabilityEventExperimental> iterator = emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.shutDown();
+  }
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+  @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+  public void testObservabilityEventSingleEmission() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobCancelledEvent(),
+        createJobSucceededEvent() // This event should be ignored
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);

Review Comment:
   Yeah, I'll definitely need to cut down on the boilerplate for future tests.



[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070071056


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -89,6 +97,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
+  private static Queue<GaaSObservabilityEventExperimental> queue = new LinkedList<>();

Review Comment:
   took me a minute to see how this gets wired in so that the events arrive on it... now I found the static inner class.  this strikes me as brittle, esp. with `queue` being `static` and shared between all instances.  still I understand the difficulty you face when the system under test internally instantiates its own `MockGaaSObservabilityEventProducer` from a class name.  hence, I'd accept those downsides, and just concentrate on readability.
   
   either add a clearer comment up here saying it will be used in this unconventional way, for the benefit of testing.  or my own pref might be to create a small singleton, known to both this test class and the mock, which documents the arrangement, while encapsulating the queue.  this could even be used by other test classes that should produce observability events.
   
   e.g.:
   ```
   /** why this odd arrangement ... */
   static class QueueSharingSingleton {
     private Queue<GaaSObservabilityEventExperimental> queue = new LinkedList<>();
     private static QueueSharingSingleton INSTANCE = null;
   
     public static QueueSharingSingleton getInstance() { ... }
   
     private QueueSharingSingleton() {}
   
     public void putMessage(GaaSObservabilityEventExperimental msg) { queue.add(msg); }
     public List<GaaSObservabilityEventExperimental> peekAllMessages() { ... }
   }
   ```
   then initialize the above as:
   ```
   private QueueSharingSingleton queueSharing = QueueSharingSingleton.getInstance();
   ```
   (and do the same in the mock)
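A compilable version of that singleton, assuming a plain `String` stands in for `GaaSObservabilityEventExperimental`. It replaces the lazy `INSTANCE = null` check with the initialization-on-demand holder idiom, which is lazy and thread-safe without explicit locking. It also uses a `ConcurrentLinkedQueue`, since the producer and the test may run on different threads. The `clear()` method is a hypothetical addition: the instance lives for the whole JVM, so tests would likely need to reset it between runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Shares emitted events between a test class and a mock producer that the
 * system under test instantiates reflectively from a class name, so neither
 * side can hand the other a direct reference.
 */
final class QueueSharingSingleton {
  // The holder class is loaded (and INSTANCE created) on first getInstance() call.
  private static final class Holder {
    static final QueueSharingSingleton INSTANCE = new QueueSharingSingleton();
  }

  private final Queue<String> queue = new ConcurrentLinkedQueue<>();

  private QueueSharingSingleton() {}

  static QueueSharingSingleton getInstance() {
    return Holder.INSTANCE;
  }

  void putMessage(String msg) {
    queue.add(msg);
  }

  /** Returns a snapshot of all queued messages without removing them. */
  List<String> peekAllMessages() {
    return new ArrayList<>(queue);
  }

  /** Hypothetical reset hook: empties the process-wide queue between tests. */
  void clear() {
    queue.clear();
  }
}
```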



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -70,14 +70,20 @@
     },
     {
       "name": "jobStartTime",
-      "type": "long",
-      "doc": "Start time of the job in millis since Epoch",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Start time of the job in millis since Epoch, null if the job was never ran",

Review Comment:
   minor, grammar: "was run", not "was ran"
   
   (a second time below too)



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -618,8 +670,20 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
         int n = ++numFakeExceptionsFromParseJobStatus;
         throw new RuntimeException(String.format("BOOM! Failure [%d] w/ event at %d", n, event.getTimestamp()));
       } else {
+
         return super.parseJobStatus(event);
       }
     }
   }
+
+  public static class MockGaaSObservabilityEventProducer extends GaaSObservabilityEventProducer {

Review Comment:
   please add a javadoc here to clarify its purpose for future maintainers



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -447,6 +456,45 @@ public void testProcessMessageForCancelledAndKilledEvent() throws IOException, R
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    Config config = ConfigFactory.empty()
+        .withValue(GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED, ConfigValueFactory.fromAnyRef(true))
+        .withValue(GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS, ConfigValueFactory.fromAnyRef(MockGaaSObservabilityEventProducer.class.getName()));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), config);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    Assert.assertEquals(queue.size(), 1);
+    GaaSObservabilityEventExperimental event = queue.poll();
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);

Review Comment:
   would be better to verify more of the event (probably in a helper method): its flow group and name, job name, etc.
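A minimal sketch of such a helper, assuming a hypothetical `EventUnderTest` whose getters mirror the ones the tests already call (`getFlowGroup()`, `getFlowName()`, `getJobName()`, `getJobStatus()`). In the real test, each check would more likely be a TestNG `Assert.assertEquals(actual, expected, message)` call; throwing a plain `AssertionError` keeps this sketch dependency-free.

```java
import java.util.Objects;

// Hypothetical stand-in for GaaSObservabilityEventExperimental (job status as a String here).
final class EventUnderTest {
  private final String flowGroup;
  private final String flowName;
  private final String jobName;
  private final String jobStatus;

  EventUnderTest(String flowGroup, String flowName, String jobName, String jobStatus) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.jobName = jobName;
    this.jobStatus = jobStatus;
  }

  String getFlowGroup() { return flowGroup; }
  String getFlowName() { return flowName; }
  String getJobName() { return jobName; }
  String getJobStatus() { return jobStatus; }
}

final class EventAssertions {
  private EventAssertions() {}

  /** Checks every identifying field so a mismatch names the field that differs. */
  static void assertEventMatches(EventUnderTest event, String flowGroup, String flowName,
      String jobName, String jobStatus) {
    assertField("flowGroup", flowGroup, event.getFlowGroup());
    assertField("flowName", flowName, event.getFlowName());
    assertField("jobName", jobName, event.getJobName());
    assertField("jobStatus", jobStatus, event.getJobStatus());
  }

  private static void assertField(String field, Object expected, Object actual) {
    if (!Objects.equals(expected, actual)) {
      throw new AssertionError(field + ": expected <" + expected + "> but was <" + actual + ">");
    }
  }
}
```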
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java:
##########
@@ -65,9 +65,9 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
   private Meter messageParseFailures;
 
   public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
-      JobIssueEventHandler jobIssueEventHandler)
+      JobIssueEventHandler jobIssueEventHandler, boolean instrumentationEnabled)

Review Comment:
   "instrumentation" is vague, and even so it suggests something different, such as measuring how fast the class handles messages. why not call it what it is: `shouldEmitObservabilityEvents`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());

Review Comment:
   may be better as a class-level `static` constant
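Hoisting the set to a class-level constant might look like this. The body of `isStateTransitionToFinal` is not shown in this hunk, so the semantics here are an assumption: a transition counts as final when the current status is final and no earlier status was. That assumption matches the later test, which expects a `COMPLETE` arriving after `CANCELLED` to be ignored. `Set.of` (Java 9+) replaces Guava's `ImmutableSet` to keep the sketch self-contained. Unlike `ImmutableSet`, `Set.of` throws on `contains(null)`, which is why the sketch guards against a missing status. The `"eventName"` key is a hypothetical stand-in for `JobStatusRetriever.EVENT_NAME_FIELD`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FinalStateCheck {
  // Hypothetical stand-in for JobStatusRetriever.EVENT_NAME_FIELD.
  static final String EVENT_NAME_FIELD = "eventName";

  // Built once when the class loads, rather than on every call.
  private static final Set<String> FINAL_STATES = Set.of("COMPLETE", "CANCELLED", "FAILED");

  private FinalStateCheck() {}

  private static boolean isFinal(Map<String, String> state) {
    String status = state.get(EVENT_NAME_FIELD);
    // Set.of(...) rejects null lookups, so a missing status is handled explicitly.
    return status != null && FINAL_STATES.contains(status);
  }

  // Assumed semantics: current status is final and no previous status was final.
  static boolean isStateTransitionToFinal(Map<String, String> currentState,
      List<Map<String, String>> prevStates) {
    return isFinal(currentState) && prevStates.stream().noneMatch(FinalStateCheck::isFinal);
  }
}
```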



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -136,6 +140,12 @@ public <V> void onRetry(Attempt<V> attempt) {
             }
           }
         }));
+
+    if (instrumentationEnabled && ConfigUtils.getBoolean(config, GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED, false)) {
+      this.eventProducer = Optional.of(GaaSObservabilityEventProducer.getEventProducer(config,  jobIssueEventHandler.getIssueRepository()));

Review Comment:
   slightly unconventional to construct it internally from a class name, rather than to make the event producer a ctor param. using reflection internally makes the tests more challenging to write, as it appears you've found. of course, there's a place for reflection, just at a higher level than this class.
   
   I do see how you need the issue repo from the issue handler to create the observability event producer.  how about whereever that handler gets created and injected, that's where you'd inject an optional event producer?
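   A minimal sketch of the suggested constructor injection, using hypothetical stand-ins (`ObservabilityEventProducer`, `JobStatusMonitor`) rather than the real Gobblin classes, and `java.util.Optional` for the optional producer. The monitor receives the producer instead of building it reflectively from a class name, so a test can pass a lambda directly.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   
   final class InjectionExample {
     // Hypothetical stand-in for GaaSObservabilityEventProducer.
     interface ObservabilityEventProducer {
       void emit(String jobState);
     }
   
     // Hypothetical stand-in for KafkaJobStatusMonitor: the producer is a ctor param
     // instead of being constructed reflectively from a class name in config.
     static final class JobStatusMonitor {
       private final Optional<ObservabilityEventProducer> eventProducer;
   
       JobStatusMonitor(Optional<ObservabilityEventProducer> eventProducer) {
         this.eventProducer = eventProducer;
       }
   
       void onFinalJobStatus(String jobState) {
         // Does nothing when observability events are disabled (empty Optional).
         eventProducer.ifPresent(producer -> producer.emit(jobState));
       }
     }
   
     private InjectionExample() {}
   
     public static void main(String[] args) {
       List<String> emitted = new ArrayList<>();
       // A test (or the wiring code) supplies the producer directly; no reflection needed here.
       ObservabilityEventProducer capturing = emitted::add;
       new JobStatusMonitor(Optional.of(capturing)).onFinalJobStatus("COMPLETE");
       new JobStatusMonitor(Optional.empty()).onFinalJobStatus("FAILED");
       System.out.println(emitted); // [COMPLETE]
     }
   }
   ```
   
   Any reflective construction from the configured class name would then live next to where the `JobIssueEventHandler` (and its issue repository) is created.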



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS, running in the JobStatusMonitor, which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; a subclass must use a subsystem such as Kafka, which supports at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());

Review Comment:
   seems ripe for extracting entirely into a conversion function
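   A sketch of what such a conversion function might look like. The types below are hypothetical stand-ins for the troubleshooter `Issue`/`IssueSeverity` and the metrics-side `Issue`/`IssueSeverity`; the enum constants are an assumed subset, and the name-based severity mapping mirrors the `valueOf(...toString())` call in the diff.
   
   ```java
   import java.time.Instant;
   import java.time.ZoneOffset;
   import java.time.ZonedDateTime;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   final class IssueConversionExample {
     // Hypothetical stand-ins for the two IssueSeverity enums (assumed to share constant names).
     enum TroubleshooterSeverity { INFO, WARN, ERROR }
     enum MetricsSeverity { INFO, WARN, ERROR }
   
     // Hypothetical stand-in for the troubleshooter Issue.
     static final class TroubleshooterIssue {
       final ZonedDateTime time;
       final TroubleshooterSeverity severity;
       final String code;
       final String summary;
       final String details;
       final Map<String, String> properties;
   
       TroubleshooterIssue(ZonedDateTime time, TroubleshooterSeverity severity, String code,
           String summary, String details, Map<String, String> properties) {
         this.time = time;
         this.severity = severity;
         this.code = code;
         this.summary = summary;
         this.details = details;
         this.properties = properties;
       }
     }
   
     // Hypothetical stand-in for the metrics-side Issue carried in the event.
     static final class MetricsIssue {
       final long timeEpochSeconds;
       final MetricsSeverity severity;
       final String code;
       final String summary;
       final String details;
       final Map<String, String> properties;
   
       MetricsIssue(long timeEpochSeconds, MetricsSeverity severity, String code,
           String summary, String details, Map<String, String> properties) {
         this.timeEpochSeconds = timeEpochSeconds;
         this.severity = severity;
         this.code = code;
         this.summary = summary;
         this.details = details;
         this.properties = properties;
       }
     }
   
     private IssueConversionExample() {}
   
     /** Single place that owns the mapping between the two Issue representations. */
     static MetricsIssue toMetricsIssue(TroubleshooterIssue issue) {
       return new MetricsIssue(
           issue.time.toEpochSecond(),
           // Mapping by name throws IllegalArgumentException if the enums ever diverge.
           MetricsSeverity.valueOf(issue.severity.name()),
           issue.code, issue.summary, issue.details, issue.properties);
     }
   
     static List<MetricsIssue> toMetricsIssues(List<TroubleshooterIssue> issues) {
       return issues.stream().map(IssueConversionExample::toMetricsIssue).collect(Collectors.toList());
     }
   
     public static void main(String[] args) {
       ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochSecond(100), ZoneOffset.UTC);
       TroubleshooterIssue issue = new TroubleshooterIssue(
           time, TroubleshooterSeverity.WARN, "code", "summary", "details", Collections.emptyMap());
       MetricsIssue converted = toMetricsIssues(Collections.singletonList(issue)).get(0);
       System.out.println(converted.timeEpochSeconds + " " + converted.severity); // 100 WARN
     }
   }
   ```
   
   The stream pipeline in `createGaaSObservabilityEvent` would then shrink to a single `map(...)` call, and the mapping becomes testable on its own.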



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";

Review Comment:
   nit: I'd suffix these with `_KEY`, since they're not, say, a class, but a key for specifying the class
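   A small illustration of the suggested naming. The `_KEY` names are hypothetical renames, and `java.util.Properties` stands in for the Typesafe `Config` used in the diff; `com.example.MyEventProducer` is a made-up class name.
   
   ```java
   import java.util.Properties;
   
   final class ConfigKeyNamingExample {
     static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
     // Hypothetical renames: the _KEY suffix marks these as lookup keys, not the values they hold.
     static final String GAAS_OBSERVABILITY_EVENT_ENABLED_KEY =
         GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
     static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
         GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   
     private ConfigKeyNamingExample() {}
   
     public static void main(String[] args) {
       Properties props = new Properties();
       props.setProperty(GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY, "com.example.MyEventProducer");
       // Reads naturally: the value stored under the class key is a class name.
       boolean enabled = Boolean.parseBoolean(props.getProperty(GAAS_OBSERVABILITY_EVENT_ENABLED_KEY, "false"));
       String producerClassName = props.getProperty(GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY);
       System.out.println(enabled + " " + producerClassName); // false com.example.MyEventProducer
     }
   }
   ```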



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java:
##########
@@ -50,6 +51,7 @@ public class JobIssueEventHandler {
   private static final Logger issueLogger =
       LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
 
+  @Getter

Review Comment:
   any expectations to document here, such as only read-only use... or doesn't matter?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";

Review Comment:
   why not the same prefix... I see later it's not a config key, but even so, why deviate?
   
   as far as naming this var, shall we say something about `METRIC_NAME` or `METRIC_KEY`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;

Review Comment:
   if it's visible for testing, let's add a `@VisibleForTesting` annotation



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+  Queue<GaaSObservabilityEventExperimental> emittedEvents = new LinkedList<>();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, "testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    GaaSObservabilityEventProducer producer = new MockGaaSObservabilityProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, "1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    GaaSObservabilityEventExperimental event = emittedEvents.poll();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity severity) {
+    return Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+
+
+  public class MockGaaSObservabilityProducer extends GaaSObservabilityEventProducer {
+    public MockGaaSObservabilityProducer(State state, MultiContextIssueRepository issueRepository) {
+      super(state, issueRepository);
+    }
+    // Send the events to the class test queue, so tests should not run concurrently
+    @Override
+    protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event) {
+      emittedEvents.add(event);

Review Comment:
   unlike the other test, the queue is non-`static`... so does it support concurrent test exec?
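   One way to sidestep the question, sketched with hypothetical stand-ins rather than the real test classes. As far as I know, TestNG reuses a single test-class instance across its test methods by default, so a non-`static` field is still shared between methods. Giving each producer its own queue (here a `ConcurrentLinkedQueue`), and having each test build its own producer, removes that shared state.
   
   ```java
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   final class CapturingProducerExample {
     // Hypothetical stand-in for GaaSObservabilityEventProducer's template method and send hook.
     abstract static class EventProducer<E> {
       void emit(E event) {
         sendUnderlyingEvent(event);
       }
   
       protected abstract void sendUnderlyingEvent(E event);
     }
   
     // Each producer owns its queue, so tests that each build their own producer
     // never see each other's events, even when test methods run in parallel.
     static final class CapturingProducer<E> extends EventProducer<E> {
       private final Queue<E> emitted = new ConcurrentLinkedQueue<>();
   
       @Override
       protected void sendUnderlyingEvent(E event) {
         emitted.add(event);
       }
   
       Queue<E> getEmitted() {
         return emitted;
       }
     }
   
     private CapturingProducerExample() {}
   
     public static void main(String[] args) throws InterruptedException {
       CapturingProducer<String> a = new CapturingProducer<>();
       CapturingProducer<String> b = new CapturingProducer<>();
       Thread ta = new Thread(() -> a.emit("jobA"));
       Thread tb = new Thread(() -> b.emit("jobB"));
       ta.start();
       tb.start();
       ta.join();
       tb.join();
       System.out.println(a.getEmitted() + " " + b.getEmitted()); // [jobA] [jobB]
     }
   }
   ```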



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+    if (prevStates.size() == 0) {
+      return finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && !finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));

Review Comment:
   though marginally more processing, it more clearly captures the intent and handles a corner case (if even possible) of the state flip-flopping from final to non-final and back to final:
   ```
   // collect once: a Stream can only be consumed a single time
   List<Boolean> statesFinality = Stream.concat(prevStates.stream(), Stream.of(currentState))
       .map(KafkaJobStatusMonitor::isFinalState)
       .collect(Collectors.toList());
   // exactly one final state, and it's the current (last) one
   return statesFinality.stream().filter(Boolean::booleanValue).count() == 1
       && Iterables.getLast(statesFinality);
   ```
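   A runnable version of that check, sketched over plain status strings instead of Gobblin `State` objects. The status names mirror `ExecutionStatus`; `isFinalState` and `isFirstTransitionToFinal` are hypothetical names. The final-state set is held as a class-level `static` constant, in line with the earlier comment on `isStateTransitionToFinal`.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;
   
   final class FirstFinalTransitionExample {
     // Class-level constant: built once instead of on every call.
     private static final Set<String> FINAL_STATES =
         Collections.unmodifiableSet(new HashSet<>(Arrays.asList("COMPLETE", "CANCELLED", "FAILED")));
   
     private FirstFinalTransitionExample() {}
   
     static boolean isFinalState(String status) {
       return status != null && FINAL_STATES.contains(status);
     }
   
     /** True only if the current status is final and no earlier status was final. */
     static boolean isFirstTransitionToFinal(String currentStatus, List<String> prevStatuses) {
       // Collect once: a Stream can only be consumed a single time.
       List<Boolean> finality = Stream.concat(prevStatuses.stream(), Stream.of(currentStatus))
           .map(FirstFinalTransitionExample::isFinalState)
           .collect(Collectors.toList());
       long finalCount = finality.stream().filter(Boolean::booleanValue).count();
       return finalCount == 1 && finality.get(finality.size() - 1);
     }
   
     public static void main(String[] args) {
       System.out.println(isFirstTransitionToFinal("COMPLETE", Arrays.asList("PENDING", "RUNNING"))); // true
       // Flip-flop final -> non-final -> final: not a first transition, so no second event.
       System.out.println(isFirstTransitionToFinal("COMPLETE", Arrays.asList("FAILED", "RUNNING"))); // false
       System.out.println(isFirstTransitionToFinal("FAILED", Collections.emptyList())); // true
     }
   }
   ```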



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {

Review Comment:
   nit: impl seems more like `isNewStateTransitionToFinal` or `isFirstStateTransitionToFinal`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1081035824


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -618,6 +728,7 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
         int n = ++numFakeExceptionsFromParseJobStatus;
         throw new RuntimeException(String.format("BOOM! Failure [%d] w/ event at %d", n, event.getTimestamp()));
       } else {
+

Review Comment:
   stray newline?





[GitHub] [gobblin] homatthew commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
homatthew commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070151228


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -219,7 +229,8 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
+  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore,
+      Optional<GaaSObservabilityEventProducer> eventProducer)

Review Comment:
   I don't have a strong preference for one or the other as long as the team agrees on a logically consistent pattern. This topic has come up in my past roles, so I figured I'd ask out of curiosity. Good points about the API being more self-descriptive, as opposed to an annotation like `@Nullable`, which is more hidden.
   
   The argument against `Optional` is as follows:
   - Without `Optional`, there are 2 possible states: `null` or an actual reference
   - With `Optional`, there are 3 possible states: `null`, `Optional.empty()`, or a present value (e.g. `Optional.of(...)`)
   
   It's reasonable to try your best not to pass `null` as a parameter (e.g. by wrapping with `Optional.ofNullable(...)`). But it does add visual clutter from a readability perspective that doesn't add much value when method overloading would be totally sufficient.
   
   This is especially true when there is only one optional parameter. Here, we don't have a bunch of permutations of options that the method caller could use (which would probably be a code smell on its own).
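   A minimal sketch of the overloading alternative, assuming hypothetical stand-ins (`EventProducer`, a `List<String>` as the state store) rather than the real `addJobStatusToStateStore` signature. It is simplified to emit on every status rather than only on a transition to a final state.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;
   
   final class OverloadingExample {
     // Hypothetical stand-in for GaaSObservabilityEventProducer.
     interface EventProducer {
       void emit(String jobStatus);
     }
   
     // Hypothetical stand-in for the state store.
     private final List<String> stateStore = new ArrayList<>();
   
     // Callers without a producer use this overload: no Optional, no null.
     void addJobStatusToStateStore(String jobStatus) {
       stateStore.add(jobStatus);
     }
   
     // Callers with a producer use this one; the producer must be non-null.
     void addJobStatusToStateStore(String jobStatus, EventProducer eventProducer) {
       Objects.requireNonNull(eventProducer, "eventProducer");
       addJobStatusToStateStore(jobStatus);
       eventProducer.emit(jobStatus);
     }
   
     List<String> getStateStore() {
       return stateStore;
     }
   
     public static void main(String[] args) {
       OverloadingExample monitor = new OverloadingExample();
       List<String> emitted = new ArrayList<>();
       monitor.addJobStatusToStateStore("RUNNING");
       monitor.addJobStatusToStateStore("COMPLETE", emitted::add);
       System.out.println(monitor.getStateStore() + " " + emitted); // [RUNNING, COMPLETE] [COMPLETE]
     }
   }
   ```
   
   The trade-off: each call site states plainly whether it emits events, at the cost of one extra method signature.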
   
   Some debates in case you want to go down the rabbit hole / get nerd sniped:
   https://rules.sonarsource.com/java/RSPEC-3553
   https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
   http://dolszewski.com/java/java-8-optional-use-cases/
   https://stackoverflow.com/questions/26327957/should-java-8-getters-return-optional-type/26328555#26328555
   
   https://docs.oracle.com/javase/10/docs/api/java/util/Optional.html
   ```
   Optional is primarily intended for use as a method return type where there is a clear need to represent "no result," and where using null is likely to cause errors. A variable whose type is Optional should never itself be null; it should always point to an Optional instance.
   ```
   





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1073002292


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_ENABLED = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String ISSUE_READ_ERROR_COUNT =  "GaaSObservability.producer.getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository) {
+    this.metricContext = Instrumented.getMetricContext(state, getClass());
+    getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ISSUE_READ_ERROR_COUNT));
+    this.state = state;
+    this.issueRepository = issueRepository;
+  }
+
+  public void emitObservabilityEvent(State jobState) {
+    GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+    sendUnderlyingEvent(event);
+  }
+
+  /**
+   * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+   * @param event
+   */
+  abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+  /**
+   * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is a combination of GTE job states in an ordered fashion
+   * @param jobState
+   * @return GaaSObservabilityEvent
+   */
+  private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(State jobState) {
+    Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+    Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+    GaaSObservabilityEventExperimental.Builder builder = GaaSObservabilityEventExperimental.newBuilder();
+    List<Issue> issueList = null;
+    try {
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+                  IssueSeverity.valueOf(issue.getSeverity().toString()), issue.getCode(), issue.getSummary(), issue.getDetails(), issue.getProperties())).collect(Collectors.toList());
+    } catch (Exception e) {
+      // If issues cannot be fetched, increment metric but continue to try to emit the event
+      log.error("Could not fetch issues while creating GaaSObservabilityEvent due to ", e);
+      getIssuesFailedMeter.mark();
+    }
+    JobStatus status = convertExecutionStatusTojobState(jobState, ExecutionStatus.valueOf(jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD)));
+    builder.setTimestamp(System.currentTimeMillis())
+        .setFlowName(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+        .setFlowGroup(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+        .setFlowExecutionId(jobState.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+        .setJobName(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD))
+        .setExecutorUrl(jobState.getProp(TimingEvent.METADATA_MESSAGE))
+        .setJobStartTime(jobStartTime)
+        .setJobEndTime(jobEndTime)
+        .setIssues(issueList)
+        .setJobStatus(status)
+        // TODO: Populate the below fields in a separate PR
+        .setExecutionUserUrn(null)
+        .setExecutorId("")
+        .setLastFlowModificationTime(0)
+        .setFlowGraphEdgeId("")
+        .setJobOrchestratedTime(null); // TODO: Investigate why TimingEvent.JOB_ORCHESTRATED_TIME is never propagated to the JobStatus
+    return builder.build();
+  }
+
+  private static JobStatus convertExecutionStatusTojobState(State state, ExecutionStatus executionStatus) {
+    switch (executionStatus) {
+      case FAILED:
+        // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields
+        if (state.contains(TimingEvent.JOB_END_TIME)) {
+          return JobStatus.EXECUTION_FAILURE;
+        }

Review Comment:
   But to prevent a null I will mark as submission failure
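The fallback described in this comment could look roughly like the following. This is a minimal sketch under stated assumptions, not the PR's code: ExecStatus, ObservedStatus, and the "jobEndTime" key are hypothetical stand-ins for ExecutionStatus, JobStatus, and TimingEvent.JOB_END_TIME, and the COMPLETE and CANCELLED mappings are assumed. Only the FAILED branch follows the thread: an end time means EXECUTION_FAILURE, otherwise SUBMISSION_FAILURE instead of null.

```java
import java.util.Map;

/**
 * Hypothetical sketch: map a FAILED execution status to an observability job status,
 * falling back to SUBMISSION_FAILURE when no end time was recorded.
 */
final class FailedStatusMappingSketch {

  /** Stand-in for ExecutionStatus (subset only). */
  enum ExecStatus { COMPLETE, FAILED, CANCELLED }

  /** Stand-in for the event's JobStatus; constants other than the two failure values are assumed. */
  enum ObservedStatus { SUCCEEDED, EXECUTION_FAILURE, SUBMISSION_FAILURE, CANCELLED }

  /** Assumed key name, standing in for TimingEvent.JOB_END_TIME. */
  static final String JOB_END_TIME = "jobEndTime";

  private FailedStatusMappingSketch() {}

  static ObservedStatus toObservedStatus(Map<String, String> jobState, ExecStatus status) {
    switch (status) {
      case FAILED:
        // An end time implies the job ran and then failed. Without one,
        // report SUBMISSION_FAILURE rather than returning null.
        return jobState.containsKey(JOB_END_TIME)
            ? ObservedStatus.EXECUTION_FAILURE
            : ObservedStatus.SUBMISSION_FAILURE;
      case COMPLETE:
        return ObservedStatus.SUCCEEDED;
      case CANCELLED:
        return ObservedStatus.CANCELLED;
      default:
        throw new IllegalArgumentException("Unhandled status: " + status);
    }
  }
}
```

Returning a concrete value from every branch means the caller never has to handle a missing job status.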



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1073001902


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+      issueList = issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+              issue -> new org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),

Review Comment:
   Ah, I think I was doing a conversion and had both classes. It's no longer needed, though; good catch.
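When two classes share a simple name (here, the troubleshooter's issue type and the event's Issue), only one can be imported, so the other needs a fully qualified name; once the conversion no longer needs both, the qualification can go. A minimal sketch of such a conversion, assuming hypothetical SourceIssue and TargetIssue stand-ins whose severity enums have matching constants (these are not the Gobblin classes):

```java
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: converting between two parallel issue types. */
final class IssueConversionSketch {

  /** Stand-ins for the two severity enums; the constant set is assumed to match. */
  enum SourceSeverity { DEBUG, INFO, WARN, ERROR, FATAL }
  enum TargetSeverity { DEBUG, INFO, WARN, ERROR, FATAL }

  /** Stand-in for the troubleshooter-side issue. */
  static final class SourceIssue {
    final ZonedDateTime time;
    final SourceSeverity severity;
    final String code;
    final String summary;

    SourceIssue(ZonedDateTime time, SourceSeverity severity, String code, String summary) {
      this.time = time;
      this.severity = severity;
      this.code = code;
      this.summary = summary;
    }
  }

  /** Stand-in for the event-side issue record. */
  static final class TargetIssue {
    final long timestamp;
    final TargetSeverity severity;
    final String code;
    final String summary;

    TargetIssue(long timestamp, TargetSeverity severity, String code, String summary) {
      this.timestamp = timestamp;
      this.severity = severity;
      this.code = code;
      this.summary = summary;
    }
  }

  private IssueConversionSketch() {}

  static List<TargetIssue> convert(List<SourceIssue> issues) {
    return issues.stream()
        // Severities are mapped by constant name, like valueOf(toString()) in the diff.
        .map(i -> new TargetIssue(i.time.toEpochSecond(),
            TargetSeverity.valueOf(i.severity.name()), i.code, i.summary))
        .collect(Collectors.toList());
  }
}
```

Mapping by name throws IllegalArgumentException if the two enums ever diverge. The sketch keeps toEpochSecond() from the diff, so its timestamps are in seconds; the thread does not say which unit the event schema expects.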





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1081029179


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class: a subclass must emit the event through a subsystem such as Kafka that supports at-least-once delivery.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   I thought you wanted the no-op version to skip creating the event entirely, in which case you would override this method with an empty implementation, in addition to publishEvent.
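A minimal sketch of the design suggested here, using hypothetical stand-in types (Event, Producer, InMemoryProducer, NoopProducer) rather than the Gobblin classes; emitObservabilityEvent and publishEvent follow the names used in the thread, and the "jobName" key is assumed. The no-op subclass overrides the entry point with an empty body as well as the publish hook, so it never builds the event:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a base producer and a no-op subclass that skips event creation. */
final class NoopProducerSketch {

  /** Simplified stand-in for the observability event. */
  static final class Event {
    final String jobName;

    Event(String jobName) {
      this.jobName = jobName;
    }
  }

  /** Base class: builds the event, then hands it to a transport-specific publisher. */
  abstract static class Producer {
    int eventsBuilt = 0; // counts how often the build step ran

    void emitObservabilityEvent(Map<String, String> jobState) {
      publishEvent(createEvent(jobState));
    }

    Event createEvent(Map<String, String> jobState) {
      eventsBuilt++;
      return new Event(jobState.get("jobName")); // "jobName" is an assumed key
    }

    protected abstract void publishEvent(Event event);
  }

  /** Keeps events in memory, standing in for a Kafka-backed subclass. */
  static final class InMemoryProducer extends Producer {
    final List<Event> published = new ArrayList<>();

    @Override
    protected void publishEvent(Event event) {
      published.add(event);
    }
  }

  /** Overrides both methods, so the event is never built or sent. */
  static final class NoopProducer extends Producer {
    @Override
    void emitObservabilityEvent(Map<String, String> jobState) {
      // Intentionally empty: skip building the event entirely.
    }

    @Override
    protected void publishEvent(Event event) {
      // Intentionally empty.
    }
  }

  private NoopProducerSketch() {}
}
```

Overriding only publishEvent would still run createEvent, which in the producer under review includes the issue-repository lookup; overriding both avoids that cost.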





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

Will-Lo commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080592247


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -358,6 +370,66 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {

Review Comment:
   After adding more tests, I realized their dependsOnMethods ordering was wrong, so I fixed it throughout the file. Sorry for the confusion.





[GitHub] [gobblin] phet commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1081033861


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -358,6 +370,66 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {

Review Comment:
   no worries, makes sense





[GitHub] [gobblin] homatthew commented on a diff in pull request #3623: [GOBBLIN-1764] Emit observability event

homatthew commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070022305


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -266,6 +277,11 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
 
       modifyStateIfRetryRequired(jobStatus);
       stateStore.put(storeName, tableName, jobStatus);
+
+      if (isStateTransitionToFinal(jobStatus, states) && eventProducer.isPresent()) {
+        log.info("I am here");

Review Comment:
   Did you mean to leave this here? 😆
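Aside from the leftover log line, the condition in this hunk emits the event only when a job first reaches a final state and a producer is configured. A minimal sketch of that gate, assuming a simplified Status enum, an Optional previous status, and a Consumer standing in for the producer; isStateTransitionToFinal here is a hypothetical reimplementation, not the PR's method, and which statuses count as final is an assumption:

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch: emit once, on the transition into a final state. */
final class FinalTransitionSketch {

  /** Stand-in for ExecutionStatus; the set of final values is assumed. */
  enum Status { PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

  static final Set<Status> FINAL_STATES = EnumSet.of(Status.COMPLETE, Status.FAILED, Status.CANCELLED);

  private FinalTransitionSketch() {}

  /** True only when moving from a non-final (or unknown) status into a final one. */
  static boolean isStateTransitionToFinal(Status current, Optional<Status> previous) {
    boolean wasFinal = previous.isPresent() && FINAL_STATES.contains(previous.get());
    return FINAL_STATES.contains(current) && !wasFinal;
  }

  /** Emits through the producer only on a final transition and only if one is configured. */
  static boolean maybeEmit(Status current, Optional<Status> previous, Optional<Consumer<Status>> producer) {
    if (isStateTransitionToFinal(current, previous) && producer.isPresent()) {
      producer.get().accept(current);
      return true;
    }
    return false;
  }
}
```

In this sketch a repeated FAILED update is not a transition, so a duplicate status message does not produce a second event.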


