Posted to notifications@nemo.apache.org by GitBox <gi...@apache.org> on 2021/08/12 08:17:14 UTC

[GitHub] [incubator-nemo] Lemarais opened a new pull request #317: Record Metrics associated with stream processing

Lemarais opened a new pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317


   JIRA: [NEMO-###: TITLE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-###)
   
   **Major changes:**
   - 
   
   **Minor changes to note:**
   - record the number of processed tuples per task periodically.
   - record the number of serialized bytes per task periodically.
   - send a latencymark from the source vertex and record the latency at every task.
   
   **Tests for the changes:**
   - 
   
   **Other comments:**
   - 
   
   Closes #GITHUB_PR_NUMBER
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@nemo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r708014142



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       Storing the JSON file starts from [here](https://github.com/apache/incubator-nemo/blob/65440c1a9acc5fdc4b7705de61610499ed2f6b84/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java#L217), but I don't know how it works internally.







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917796959


   Kudos, SonarCloud Quality Gate passed! ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-913202351


   @taegeonum Sorry for the delay. I've addressed all comments. Could you check it again?





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705809356



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
   First of all, the pair of createdTaskId and timestamp identifies the starting point. For example:

            ---------- B ----------
           /                       \
          A ---------- C ---------- E
           \                       /
            ---------- D ----------

   A to E are tasks and dashes are edges; A is the source and E is the sink.

   A Latencymark is created at task A with a timestamp and createdTaskId, and the lastTaskId is needed to follow each path.
   There are three paths in total by which a Latencymark can reach task E: A-B-E, A-C-E, and A-D-E. I want to know the latency of each path, so I made the mark hold the ID of the previous task. Task E therefore receives three latencymarks, whose lastTaskId values are B, C, and D, respectively. The lastTaskId has no meaning by itself, but I expected that following it would tell us the latency of each path. This part may be controversial, and I think it is fine to remove it if it is deemed unnecessary.
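
A minimal sketch of the per-path idea described above, assuming a simplified stand-in for the mark. `PathLatencySketch`, `Mark`, `forwardedBy`, and `SinkRecorder` are hypothetical names used only for illustration and are not the classes in this PR; the timestamps are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: hypothetical names, not the PR's actual Latencymark API. */
final class PathLatencySketch {

  /** Simplified stand-in for a latency mark. */
  static final class Mark {
    final String createdTaskId;
    final long createdTimestamp;
    final String lastTaskId;

    Mark(final String createdTaskId, final long createdTimestamp, final String lastTaskId) {
      this.createdTaskId = createdTaskId;
      this.createdTimestamp = createdTimestamp;
      this.lastTaskId = lastTaskId;
    }

    /** Returns a copy stamped with the id of the task that forwards it downstream. */
    Mark forwardedBy(final String taskId) {
      return new Mark(createdTaskId, createdTimestamp, taskId);
    }
  }

  /** Records, at a receiving task, one latency per upstream task the mark came through. */
  static final class SinkRecorder {
    private final Map<String, Long> latencyByLastTask = new LinkedHashMap<>();

    void onLatencymark(final Mark mark, final long nowMillis) {
      latencyByLastTask.put(mark.lastTaskId, nowMillis - mark.createdTimestamp);
    }

    Map<String, Long> latencyByLastTask() {
      return latencyByLastTask;
    }
  }

  public static void main(final String[] args) {
    final Mark fromA = new Mark("A", 1000L, "");          // created at source task A
    final SinkRecorder sinkE = new SinkRecorder();
    sinkE.onLatencymark(fromA.forwardedBy("B"), 1030L);   // path A-B-E
    sinkE.onLatencymark(fromA.forwardedBy("C"), 1010L);   // path A-C-E
    sinkE.onLatencymark(fromA.forwardedBy("D"), 1050L);   // path A-D-E
    System.out.println(sinkE.latencyByLastTask());        // prints {B=30, C=10, D=50}
  }
}
```

Keying by lastTaskId is enough in this example because every path into E passes through a different predecessor. A longer pipeline would need the full list of visited tasks to tell paths apart.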







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-913101243


   
   





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-897456660


   Kudos, SonarCloud Quality Gate passed! ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![No Duplication information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo-16px.png 'No Duplication information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=duplicated_lines_density&view=list) No Duplication information
   
   





[GitHub] [incubator-nemo] wonook commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
wonook commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-942209976


   @taegeonum If all your comments have been addressed or answered, I think we can move on. What do you think?





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-927179268


   I added a conditional statement to prevent sending duplicate latencymarks. createdTimestamp and createdTaskId are used to detect duplicates. All unused methods were removed.
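
One way such a duplicate check could look, as a sketch only: the comment above does not show the actual condition, so the set-based approach and the names `LatencymarkDedupSketch`, `MarkKey`, and `shouldForward` are assumptions for illustration.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Illustration only: hypothetical duplicate filter, not the code merged in this PR. */
final class LatencymarkDedupSketch {

  /** Identity of a mark: the task that created it and when it was created. */
  static final class MarkKey {
    private final String createdTaskId;
    private final long createdTimestamp;

    MarkKey(final String createdTaskId, final long createdTimestamp) {
      this.createdTaskId = createdTaskId;
      this.createdTimestamp = createdTimestamp;
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof MarkKey)) {
        return false;
      }
      final MarkKey other = (MarkKey) o;
      return createdTimestamp == other.createdTimestamp
        && createdTaskId.equals(other.createdTaskId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(createdTaskId, createdTimestamp);
    }
  }

  // Keys already forwarded. This set grows without bound in the sketch;
  // a real implementation would need some eviction policy.
  private final Set<MarkKey> forwarded = new HashSet<>();

  /** Returns true the first time a (createdTaskId, createdTimestamp) pair is seen. */
  boolean shouldForward(final String createdTaskId, final long createdTimestamp) {
    return forwarded.add(new MarkKey(createdTaskId, createdTimestamp));
  }

  public static void main(final String[] args) {
    final LatencymarkDedupSketch filter = new LatencymarkDedupSketch();
    System.out.println(filter.shouldForward("A", 1000L));  // first arrival: true
    System.out.println(filter.shouldForward("A", 1000L));  // same mark via another edge: false
    System.out.println(filter.shouldForward("A", 2000L));  // a later mark from A: true
  }
}
```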





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917886749


   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705903886



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       This method is called only [here](https://github.com/apache/incubator-nemo/blob/60715bd1bcabea0ec9abac1bec57959116962324/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java#L155), and it runs on a separate thread. I'll add a comment about this context.
   That said, creating an additional thread for every task is too inefficient. To avoid that, I would have to store the created taskExecutors in Executor, which is already implemented in a [not-yet-merged PR](https://github.com/apache/incubator-nemo/pull/314/files). Is it okay to implement the same Map in this PR?
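
For reference, a rough sketch of the alternative discussed above: one scheduler thread shared by all tasks, with per-task counters kept in a map. `SharedMetricReporterSketch`, `register`, and `flush` are hypothetical names, and printing stands in for sending metrics to the runtime master; this is not how Executor or TaskExecutor is actually structured.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Illustration only: a single periodic reporter shared by every task in an executor. */
final class SharedMetricReporterSketch {
  private final Map<String, AtomicLong> tupleCounters = new ConcurrentHashMap<>();
  private final ScheduledExecutorService scheduler =
    Executors.newSingleThreadScheduledExecutor();

  /** Registers a task and returns the counter that the task increments per tuple. */
  AtomicLong register(final String taskId) {
    return tupleCounters.computeIfAbsent(taskId, id -> new AtomicLong());
  }

  void unregister(final String taskId) {
    tupleCounters.remove(taskId);
  }

  /** Takes each task's count since the previous flush and resets it to zero. */
  Map<String, Long> flush() {
    final Map<String, Long> snapshot = new HashMap<>();
    tupleCounters.forEach((taskId, counter) -> snapshot.put(taskId, counter.getAndSet(0L)));
    return snapshot;
  }

  /** Starts periodic reporting on the single shared thread. */
  void start(final long periodMillis) {
    scheduler.scheduleAtFixedRate(
      () -> System.out.println("stream metrics: " + flush()),
      periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    scheduler.shutdownNow();
  }

  public static void main(final String[] args) throws InterruptedException {
    final SharedMetricReporterSketch reporter = new SharedMetricReporterSketch();
    final AtomicLong task1 = reporter.register("task-1");
    final AtomicLong task2 = reporter.register("task-2");
    reporter.start(100L);
    task1.addAndGet(3L);       // task-1 processed three tuples
    task2.incrementAndGet();   // task-2 processed one tuple
    Thread.sleep(250L);
    reporter.stop();
  }
}
```

With this shape the thread count stays at one regardless of how many tasks are registered, at the cost of the shared map that the comment above asks about.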
   







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-916642880


   
   





[GitHub] [incubator-nemo] taegeonum merged pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum merged pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317


   





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [MINOR] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690016081



##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
##########
@@ -93,6 +94,13 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       Is this `checkAndInvokeBundle()` call necessary?

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
##########
@@ -160,6 +161,15 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       Is this `checkAndInvokeBundle()` call necessary?
   
   

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +110,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new ArrayList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {
+    return this.latencymarks;
+  }
+
+  private void addLatencymark(final LatencyMetric latencyMetric) {
+    this.latencymarks.putIfAbsent(latencyMetric.getLatencymark().getLastTaskId(), new ArrayList<>());

Review comment:
       Why `ArrayList` rather than `LinkedList`?
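
For context on the question above: both list types support the append-only use seen in `addLatencymark`, and appending to an `ArrayList` is amortized constant time. The sketch below is only an illustration, not the PR's code. `GroupedMetricsSketch` is a hypothetical name, and `String` values stand in for `LatencyMetric`. It shows the same grouping written with `Map.computeIfAbsent`, which keeps the choice of list type in a single place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified holder; String values stand in for the LatencyMetric type in the diff.
final class GroupedMetricsSketch {
  private final Map<String, List<String>> byTask = new HashMap<>();

  void add(final String lastTaskId, final String metric) {
    // One lookup instead of putIfAbsent followed by get; the list type is chosen only here.
    byTask.computeIfAbsent(lastTaskId, key -> new ArrayList<>()).add(metric);
  }

  List<String> get(final String lastTaskId) {
    // Returns an empty list for task ids that were never recorded.
    return byTask.getOrDefault(lastTaskId, new ArrayList<>());
  }
}
```

Switching to another `List` implementation would then be a one-line change in `add`.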

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
##########
@@ -93,6 +94,13 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       Is this `checkAndFinishBundle()` call necessary?

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       What does `task id of upstream task` mean?

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task
+   */
+  public String getLastTaskId() {
+    return lastTaskId;
+  }
+
+  public void setLastTaskId(final String currTaskId) {
+    lastTaskId = currTaskId;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Latencymark latencymark = (Latencymark) o;
+    return (timestamp == latencymark.timestamp)
+      && (createdtaskId.equals(latencymark.createdtaskId)
+      && (lastTaskId.equals(latencymark.lastTaskId)));
+  }
+
+
+  @Override
+  public String toString() {
+    return String.valueOf("Latencymark(" + createdtaskId + ", " + timestamp + ")");

Review comment:
       `String.valueOf` is unnecessary
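
A minimal sketch of the simplification the comment points at, assuming nothing beyond the two fields printed in the diff. `LatencymarkSketch` is a hypothetical stand-in for the class under review. String concatenation already produces a `String`, so the wrapper adds nothing.

```java
// Hypothetical stand-in for the reviewed class; only the fields used by toString() are kept.
final class LatencymarkSketch {
  private final String createdTaskId;
  private final long timestamp;

  LatencymarkSketch(final String createdTaskId, final long timestamp) {
    this.createdTaskId = createdTaskId;
    this.timestamp = timestamp;
  }

  @Override
  public String toString() {
    // The concatenation expression is already a String; no String.valueOf wrapper is needed.
    return "Latencymark(" + createdTaskId + ", " + timestamp + ")";
  }
}
```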

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       Why does it contain taskId? 

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -166,6 +167,13 @@ public void onWatermark(final Watermark watermark) throws RuntimeException {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       Is this `checkAndFinishBundle()` call necessary?
   
   

##########
File path: compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
##########
@@ -61,6 +62,11 @@ public void onWatermark(final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {

Review comment:
       There are lots of duplicate `onLatencymark` code blocks. It would be good to refactor them.

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -166,6 +167,13 @@ public void onWatermark(final Watermark watermark) throws RuntimeException {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       Is this `checkAndInvokeBundle()` call necessary?
   
   

##########
File path: conf/src/main/java/org/apache/nemo/conf/JobConf.java
##########
@@ -250,6 +250,25 @@
   public final class ExecutorJSONContents implements Name<String> {
   }
 
+  ///////////////////////// Metric Configurations
+  /**
+   * Period how often stream metrics are recorded. the unit of period is millisecond.
+   * -1 indicates that metrics are not recorded periodically.
+   */
+  @NamedParameter(doc = "Period how often stream-related metrics are recorded. the unit of period is millisecond.",
+    short_name = "stream_metric_period", default_value = "-1")
+  public final class StreamMetricPeriod implements Name<Integer> {
+  }
+
+  /**
+   * Period how often latencymarks are sent from source vertex. the unit of period is millisecond.
+   * -1 indicates that latencymarks are not sent.
+   */
+  @NamedParameter(doc = "Period how often latencymarks are sent from source vertex. the unit of period is millisecond.",

Review comment:
       the -> The

##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
##########
@@ -57,6 +58,8 @@
    */
   void onWatermark(Watermark watermark);
 

Review comment:
       Please add comments. 

##########
File path: compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
##########
@@ -64,6 +65,11 @@ public void onWatermark(final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {

Review comment:
       There are lots of duplicate `onLatencymark` code blocks. It would be good to refactor them.
   
   

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for latency.
+ */
+public class LatencyMetric implements Serializable {

Review comment:
       Make this class `final`.

##########
File path: conf/src/main/java/org/apache/nemo/conf/JobConf.java
##########
@@ -250,6 +250,25 @@
   public final class ExecutorJSONContents implements Name<String> {
   }
 
+  ///////////////////////// Metric Configurations
+  /**
+   * Period how often stream metrics are recorded. the unit of period is millisecond.
+   * -1 indicates that metrics are not recorded periodically.
+   */
+  @NamedParameter(doc = "Period how often stream-related metrics are recorded. the unit of period is millisecond.",

Review comment:
       the -> The

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
##########
@@ -160,6 +161,15 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       Is this `checkAndFinishBundle()` call necessary?
   
   

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
##########
@@ -84,6 +85,9 @@ public Object decode() throws IOException {
         final WatermarkWithIndex watermarkWithIndex =
           (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
         return watermarkWithIndex;
+      } else if (isWatermark == 0x02) {

Review comment:
       We need to rename the variable `isWatermark`, since it now also distinguishes latencymarks.
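
One possible shape for such a rename, offered only as a sketch. `EventTypeSketch`, `eventType`, and the `DATA` and `WATERMARK` values are assumptions made for illustration; only the `0x02` latencymark branch is visible in the diff above. The idea is to read the marker byte into a neutrally named variable and compare it against named constants.

```java
// Hypothetical marker constants; only 0x02 (latencymark) is confirmed by the diff above.
final class EventTypeSketch {
  static final int DATA = 0x00;        // assumed value
  static final int WATERMARK = 0x01;   // assumed value
  static final int LATENCYMARK = 0x02; // value taken from the diff

  private EventTypeSketch() { }

  // Maps the marker byte read from the stream to the kind of event that follows it.
  static String describe(final int eventType) {
    switch (eventType) {
      case DATA:
        return "data";
      case WATERMARK:
        return "watermark";
      case LATENCYMARK:
        return "latencymark";
      default:
        throw new IllegalArgumentException("Unknown event type: " + eventType);
    }
  }
}
```

With a name like `eventType`, adding further punctuation kinds later would not make the variable name misleading again.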

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
##########
@@ -304,6 +305,27 @@ public long getNumEncodedBytes() {
       }
       return numEncodedBytes;
     }
+
+    @Override
+    public long getCurrNumSerializedBytes() {
+      if (serializedCountingStream == null) {
+        return numSerializedBytes;
+      }
+      return numSerializedBytes + serializedCountingStream.getCount();
+    }
+
+    @Override
+    public long getCurrNumEncodedBytes() {

Review comment:
       What is the difference between `getNumEncodedBytes` and `getCurrNumEncodedBytes`?
   
   

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Metric associated with stream. it is periodically recorded.
+ */
+public class StreamMetric implements Serializable {

Review comment:
       Make this class `final`.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for latency.

Review comment:
       What is the purpose of this class?

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
##########
@@ -304,6 +305,27 @@ public long getNumEncodedBytes() {
       }
       return numEncodedBytes;
     }
+
+    @Override
+    public long getCurrNumSerializedBytes() {

Review comment:
       What is the difference between `getNumSerializedBytes` and `getCurrNumSerializedBytes`? And why do we need this method?

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -261,6 +292,12 @@ public final String getId() {
   public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
     LOG.debug("metric {} has just arrived!", metricField);
     switch (metricField) {
+      case "streamMetric":

Review comment:
       Hard-coded strings don't look good. Maybe we need to refactor this in the future.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Metric associated with stream. it is periodically recorded.

Review comment:
       Metric -> Metrics. it -> It 

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -34,6 +36,8 @@
   private String containerId = "";
   private int scheduleAttempt = -1;
   private List<StateTransitionEvent<TaskState.State>> stateTransitionEvents = new ArrayList<>();
+  private final Map<String, List<StreamMetric>> streamMetrics = new HashMap<>();

Review comment:
       What is the key of this map? 

##########
File path: compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
##########
@@ -61,6 +62,11 @@ public void onWatermark(final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {

Review comment:
       Perhaps we can create an abstract class that contains the code block at lines 61-68 and inherit from it here.
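
A rough sketch of the abstract-class idea, under stated assumptions. `Mark`, `Collector`, `SimpleTransform`, `ForwardingTransform`, `UpperCaseTransform`, and `RecordingCollector` are hypothetical, trimmed-down stand-ins, not Nemo's actual `Latencymark`, `OutputCollector`, or `Transform` types. It also assumes that the duplicated `onLatencymark` bodies simply forward the mark downstream, which the diff above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Latencymark.
final class Mark {
  final long timestamp;
  Mark(final long timestamp) { this.timestamp = timestamp; }
}

// Hypothetical stand-in for OutputCollector.
interface Collector<O> {
  void emit(O output);
  void emitLatencymark(Mark mark);
}

// Hypothetical stand-in for Transform.
interface SimpleTransform<I, O> {
  void prepare(Collector<O> collector);
  void onData(I element);
  void onLatencymark(Mark mark);
}

// Base class that forwards latencymarks unchanged, so subclasses implement only onData.
abstract class ForwardingTransform<I, O> implements SimpleTransform<I, O> {
  private Collector<O> collector;

  @Override
  public void prepare(final Collector<O> outputCollector) { this.collector = outputCollector; }

  protected final Collector<O> getCollector() { return collector; }

  @Override
  public final void onLatencymark(final Mark mark) { collector.emitLatencymark(mark); }
}

// Example subclass: no onLatencymark override is needed.
final class UpperCaseTransform extends ForwardingTransform<String, String> {
  @Override
  public void onData(final String element) { getCollector().emit(element.toUpperCase()); }
}

// Test helper that records everything emitted to it.
final class RecordingCollector implements Collector<String> {
  final List<String> data = new ArrayList<>();
  final List<Mark> marks = new ArrayList<>();

  @Override
  public void emit(final String output) { data.add(output); }

  @Override
  public void emitLatencymark(final Mark mark) { marks.add(mark); }
}
```

Transforms that must do extra work around the mark, such as the bundle handling in the Beam transforms, would need a non-final hook instead.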

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
##########
@@ -131,4 +132,33 @@ public void emitWatermark(final Watermark watermark) {
       }
     }
   }
+
+  @Override
+  public void emitLatencymark(final Latencymark latencymark) {

Review comment:
       Lots of duplicated code with `emitWatermark`. This needs refactoring.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -377,6 +450,22 @@ private void doExecute() {
     }
   }
 
+  /**
+   * Send data-processing metrics.
+   */
+  public void sendMetrics() {
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "boundedSourceReadTime",

Review comment:
       Maybe we need to create a `Metrics` class with static variables instead of using hard-coded strings.
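
A minimal sketch of what such a constants holder might look like. `MetricFields` and `MetricDispatchSketch` are hypothetical names, and only the two field strings that appear in the diffs above (`"streamMetric"` and `"boundedSourceReadTime"`) are included. Because the constants are compile-time `String` constants, they can be used directly as `switch` labels on the receiving side.

```java
// Hypothetical constants holder; only these two field names appear in the diffs above.
final class MetricFields {
  static final String STREAM_METRIC = "streamMetric";
  static final String BOUNDED_SOURCE_READ_TIME = "boundedSourceReadTime";

  private MetricFields() { }
}

// Hypothetical receiver showing that sender and receiver can share the same constants.
final class MetricDispatchSketch {
  private MetricDispatchSketch() { }

  static String handle(final String metricField) {
    switch (metricField) {
      case MetricFields.STREAM_METRIC:
        return "stream";
      case MetricFields.BOUNDED_SOURCE_READ_TIME:
        return "readTime";
      default:
        return "unknown";
    }
  }
}
```

A typo in a field name then becomes a compile error rather than a silently dropped metric.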







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899318465


   Kudos, SonarCloud Quality Gate passed!
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   
   [No Coverage information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317)  
   [No Duplication information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690056935



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       I considered that there may be multiple stages that have a source vertex, so I added the `createdtaskId` field.
   A latencymark does not record only the latency from source to sink. It records the latency at every task along the path, so I added the `lastTaskId` field to track the path.

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       A latencymark tracks its path from source to sink, so this field tells where the mark came from: on the path from source to sink, it holds the task id of the previous task.
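
The per-hop tracking described in these replies can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation. `MarkSketch` and `TaskSketch` are hypothetical names, the current time is passed in explicitly, and the source is assumed to stamp its own id as the first hop. Each task records the latency since the mark was created, keyed by the task the mark arrived from, then stamps its own id before forwarding.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down mark that mirrors the three fields of Latencymark in the diff.
final class MarkSketch {
  final String createdTaskId;
  final long timestamp;
  String lastTaskId;

  MarkSketch(final String createdTaskId, final long timestamp) {
    this.createdTaskId = createdTaskId;
    this.timestamp = timestamp;
    // Assumption: the source stamps itself as the first hop before emitting.
    this.lastTaskId = createdTaskId;
  }
}

// Hypothetical task that records latency for every mark it receives.
final class TaskSketch {
  final String taskId;
  // Latencies observed at this task, keyed by the upstream task the mark arrived from.
  final Map<String, List<Long>> latencies = new HashMap<>();

  TaskSketch(final String taskId) { this.taskId = taskId; }

  MarkSketch onLatencymark(final MarkSketch mark, final long now) {
    // Latency is measured from the mark's creation time at the source.
    latencies.computeIfAbsent(mark.lastTaskId, key -> new ArrayList<>()).add(now - mark.timestamp);
    // Stamp this task as the last hop before forwarding downstream.
    mark.lastTaskId = taskId;
    return mark;
  }
}
```

In this sketch `createdTaskId` never changes, which is what lets marks from different source tasks be told apart downstream.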







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917874270


   Kudos, SonarCloud Quality Gate passed!
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   
   [No Coverage information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317)  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r706973929



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       done







[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707882330



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       What happens in the following situation?
   A ---> B ---> C ---> F
   |             |      ^
   v             v      |
   D ----------> E -----+
   
   There are three paths: 
   1) A->B->C->F 
   2) A->B->C->E->F
   3) A->D->E->F
   
   How can we distinguish 2) from 3)?
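
For reference, the three paths can be checked mechanically. This is a minimal sketch, assuming the edges of the example are A->B, A->D, B->C, C->F, C->E, D->E, and E->F; `DagPaths`, `exampleDag`, and `allPaths` are hypothetical names and are not part of Nemo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DagPaths {
  /** Edges of the example DAG (an assumption read off the three listed paths). */
  static Map<String, List<String>> exampleDag() {
    final Map<String, List<String>> edges = new LinkedHashMap<>();
    edges.put("A", Arrays.asList("B", "D"));
    edges.put("B", Collections.singletonList("C"));
    edges.put("C", Arrays.asList("F", "E"));
    edges.put("D", Collections.singletonList("E"));
    edges.put("E", Collections.singletonList("F"));
    return edges;
  }

  /** Depth-first enumeration of all paths from {@code from} to {@code to}. */
  static List<String> allPaths(final Map<String, List<String>> edges,
                               final String from, final String to) {
    final List<String> result = new ArrayList<>();
    walk(edges, from, to, from, result);
    return result;
  }

  private static void walk(final Map<String, List<String>> edges, final String current,
                           final String to, final String pathSoFar, final List<String> result) {
    if (current.equals(to)) {
      result.add(pathSoFar);
      return;
    }
    // Vertices without outgoing edges simply end the walk.
    for (final String next : edges.getOrDefault(current, Collections.emptyList())) {
      walk(edges, next, to, pathSoFar + "->" + next, result);
    }
  }
}
```

`DagPaths.allPaths(DagPaths.exampleDag(), "A", "F")` yields exactly the three paths listed above. Paths 2) and 3) share the final hop E->F, so a mark that only remembers its immediate upstream task looks identical on both when it arrives at F; the two differ only in the hop into E (from C versus from D).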

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       I cannot find the usage of `getCreatedtaskId()`. 

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       Why don't you change the name, for example to `getPreviousTaskId`?







[GitHub] [incubator-nemo] Lemarais edited a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais edited a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-925584649


   > @Lemarais ping :)
   
   Sorry for the late reply. This was delayed for personal reasons. I will revise the PR and commit the changes by 9/25.





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r703959364



##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -299,6 +300,12 @@ public final void emit(final WindowedValue<KV<K, OutputT>> output) {
       oc.emit(output);
     }
 
+    /** Emit latencymark. */
+    @Override

Review comment:
       Isn't this code unnecessary, since AbstractDoFnTransform already has the same code block?

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;
+  private long timestamp;

Review comment:
       Why not final?
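
As an illustration of the suggestion, here is a minimal sketch of an immutable metric holder. It is a simplified, hypothetical variant (`ImmutableLatencyMetric` and its accessors are made-up names, and it stores two plain timestamps instead of a `Latencymark`), not the class in this PR.

```java
import java.io.Serializable;

/** Hypothetical immutable variant of a latency metric; not the class in this PR. */
final class ImmutableLatencyMetric implements Serializable {
  private static final long serialVersionUID = 1L;

  // Both fields are assigned exactly once in the constructor, so they can be final.
  private final long markCreatedTimestamp;
  private final long recordedTimestamp;

  ImmutableLatencyMetric(final long markCreatedTimestamp, final long recordedTimestamp) {
    this.markCreatedTimestamp = markCreatedTimestamp;
    this.recordedTimestamp = recordedTimestamp;
  }

  long getRecordedTimestamp() {
    return recordedTimestamp;
  }

  /** Traversal time: recorded time minus the time the mark was created. */
  long getTraversalMillis() {
    return recordedTimestamp - markCreatedTimestamp;
  }
}
```

Final fields do not get in the way of `Serializable`: default Java deserialization can restore them, so immutability and serialization can coexist here.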

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;

Review comment:
       Why not final?

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       This contains two task IDs: createdTaskId and lastTaskId. I would like to know why both fields are required.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {
+    return this.latencymarks;

Review comment:
       Remove `this.`

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;

Review comment:
       Where is it used?

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       Is it currentTask? 

##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.vertex.transform;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
+
+/**
+ * This transform does not emit watermarks.

Review comment:
       Is this comment correct? 

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;

Review comment:
       return streamMetrics

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;
+  private long timestamp;
+
+  /**
+   * Constructor with the latencymark and timestamp.
+   *
+   * @param latencymark the latencymark what task received.

Review comment:
       ?

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());

Review comment:
       Remove `this.`

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       Where is it used?

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       But the comment says that `taskId` is the task where the event is created, not lastTaskId. Is this comment incorrect?

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);

Review comment:
       Remove `this.`
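
Assuming the comment refers to the `this.` qualifier, one possible shape of the loop is sketched below. It goes slightly beyond the comment by also using `Map.computeIfAbsent` and `entrySet()`, which is a suggestion of this sketch rather than something requested in the review. `StreamMetricStore` is a hypothetical class, and `Long` stands in for `StreamMetric` to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/** Hypothetical holder; Long stands in for StreamMetric. */
final class StreamMetricStore {
  private final Map<String, List<Long>> streamMetrics = new HashMap<>();

  /** Appends the latest metric of each source vertex to that vertex's history. */
  void addAll(final Map<String, Long> latestMetrics) {
    for (final Map.Entry<String, Long> entry : latestMetrics.entrySet()) {
      // computeIfAbsent creates the list on first use and returns it in one lookup.
      streamMetrics.computeIfAbsent(entry.getKey(), key -> new LinkedList<>())
        .add(entry.getValue());
    }
  }

  Map<String, List<Long>> getStreamMetrics() {
    return streamMetrics;
  }
}
```

Compared with `putIfAbsent` followed by `get`, this avoids allocating a new list on every call and avoids the second map lookup.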

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       The field name `lastTaskId` is confusing to me. Isn't the last task the sink? What does "last" refer to here? Is it previousTask or lastTask?

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       Sending metrics from each task may lead to huge overhead if the number of tasks is large. Can't we use separate threads for sending metrics? Maybe we need another class for retrieving and sending task metrics.
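
To make the suggestion above concrete, here is a minimal sketch of one reporter shared by all tasks on an executor, instead of one scheduled thread pool per task. `SharedStreamMetricReporter`, its methods, and the `sender` callback are hypothetical names used only for illustration; they are not part of the Nemo code base, and the real metric message format is not modeled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Hypothetical executor-wide reporter (illustration only, not Nemo API). */
final class SharedStreamMetricReporter implements AutoCloseable {
  // A single scheduler thread serves every task registered on this executor.
  private final ScheduledExecutorService scheduler =
    Executors.newSingleThreadScheduledExecutor();
  // Per-task tuple counters, keyed by task id.
  private final Map<String, AtomicLong> tupleCounters = new ConcurrentHashMap<>();
  // Assumed callback that ships (taskId, tupleCount) to wherever metrics go.
  private final BiConsumer<String, Long> sender;

  SharedStreamMetricReporter(final BiConsumer<String, Long> sender) {
    this.sender = sender;
  }

  /** Registers a task and returns the counter it increments per tuple. */
  AtomicLong register(final String taskId) {
    return tupleCounters.computeIfAbsent(taskId, id -> new AtomicLong());
  }

  /** Stops reporting for a finished task. */
  void unregister(final String taskId) {
    tupleCounters.remove(taskId);
  }

  /** Starts periodic reporting for all registered tasks. */
  void start(final long periodMillis) {
    scheduler.scheduleAtFixedRate(
      this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Sends the count accumulated since the previous flush, then resets it. */
  void flush() {
    tupleCounters.forEach(
      (taskId, counter) -> sender.accept(taskId, counter.getAndSet(0L)));
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

With this shape, a task would only call `register` once and increment the returned counter on its hot path; the number of reporting threads stays constant regardless of how many tasks run on the executor.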
   







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-916642880


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690253204



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
##########
@@ -131,4 +132,33 @@ public void emitWatermark(final Watermark watermark) {
       }
     }
   }
+
+  @Override
+  public void emitLatencymark(final Latencymark latencymark) {

Review comment:
       Does it mean that we need to combine emitLatencymark and emitWatermark?







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r702384550



##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
##########
@@ -57,6 +58,8 @@
    */
   void onWatermark(Watermark watermark);
 

Review comment:
       done







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r702384100



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
##########
@@ -131,4 +132,33 @@ public void emitWatermark(final Watermark watermark) {
       }
     }
   }
+
+  @Override
+  public void emitLatencymark(final Latencymark latencymark) {

Review comment:
       I tried to merge it into emitWatermark. However, until the methods of the writer and the transform are divided, merging only the OutputCollector methods does not look good. emitLatencymark and emitWatermark look similar, but watermarks and latencymarks eventually have to be handled differently. So I suggest keeping both methods, emitLatencymark and emitWatermark.







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899318465


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   No Duplication information
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690063094



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
##########
@@ -304,6 +305,27 @@ public long getNumEncodedBytes() {
       }
       return numEncodedBytes;
     }
+
+    @Override
+    public long getCurrNumSerializedBytes() {

Review comment:
       [getNumSerializedBytes](https://github.com/apache/incubator-nemo/blob/74d4a86b5c99418922317be41f0655d2dc48be6a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java#L484) should be called only after the actual data has been taken out of the iterator, and it is used to check whether the task is over or not.
   
   I don't want to introduce a side effect by modifying getNumSerializedBytes.
   So I created a new method, [getCurrNumSerializedBytes](https://github.com/apache/incubator-nemo/blob/74d4a86b5c99418922317be41f0655d2dc48be6a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java#L494), to get the number of serialized bytes before the iteration is done.
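
The distinction between the two getters can be sketched roughly as follows. This is a simplified stand-alone model, not the code in DataUtil.java: `CountingStream`, `ByteProgress`, and the `done` flag are hypothetical names, and the assumption that the "final" getter rejects calls made before completion is made only for the sake of illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical byte-counting wrapper, standing in for a counting stream. */
final class CountingStream extends OutputStream {
  private final OutputStream out;
  private long count;

  CountingStream(final OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(final int b) throws IOException {
    out.write(b);
    count++;
  }

  long getCount() {
    return count;
  }
}

/** Hypothetical model of "final" versus "current" byte counts. */
final class ByteProgress {
  private long numSerializedBytes; // bytes of partitions that are finished
  private CountingStream current;  // in-flight partition, or null
  private boolean done;

  void startPartition() {
    current = new CountingStream(new ByteArrayOutputStream());
  }

  void write(final byte[] data) {
    try {
      current.write(data);
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  void finishPartition() {
    numSerializedBytes += current.getCount();
    current = null;
  }

  void markDone() {
    done = true;
  }

  /** Final count: only meaningful once everything has been consumed. */
  long getNumSerializedBytes() {
    if (!done) {
      throw new IllegalStateException("Iteration not completed");
    }
    return numSerializedBytes;
  }

  /** Running count: finished partitions plus the in-flight stream, if any. */
  long getCurrNumSerializedBytes() {
    if (current == null) {
      return numSerializedBytes;
    }
    return numSerializedBytes + current.getCount();
  }
}
```

Keeping the running count in a separate getter leaves the contract of the existing getter untouched, which is the side effect the comment above wants to avoid.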
   

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
##########
@@ -304,6 +305,27 @@ public long getNumEncodedBytes() {
       }
       return numEncodedBytes;
     }
+
+    @Override
+    public long getCurrNumSerializedBytes() {
+      if (serializedCountingStream == null) {
+        return numSerializedBytes;
+      }
+      return numSerializedBytes + serializedCountingStream.getCount();
+    }
+
+    @Override
+    public long getCurrNumEncodedBytes() {

Review comment:
       It has the same definition and rationale as getCurrNumSerializedBytes.







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705804055



##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.vertex.transform;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
+
+/**
+ * This transform does not emit watermarks.

Review comment:
       No, you are right. I'll correct this comment







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899411109


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-918805365


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-945704566


   Sorry for the late reply. LGTM. I will merge it. 





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705909154



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());

Review comment:
       done







[GitHub] [incubator-nemo] Lemarais commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899440875


   Yes, you are right. It measures only the record traversal time from source to sink. This is for debugging purposes; latency marks can be enabled or disabled through the --latencymark_period argument, and they are disabled by default.
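
As a rough illustration of the traversal-time measurement described above: a task that receives a latency mark subtracts the mark's creation timestamp from the time at which it records the mark. The classes below are a simplified, hypothetical mirror of the PR's Latencymark (`LatencymarkSketch` and `LatencySketch` are names invented for this sketch), and the result is only meaningful if the clocks of the creating and recording machines are reasonably synchronized.

```java
/** Simplified, hypothetical mirror of the Latencymark class in this PR. */
final class LatencymarkSketch {
  private final String createdTaskId; // source task that emitted the mark
  private final long timestamp;       // creation time in epoch milliseconds

  LatencymarkSketch(final String createdTaskId, final long timestamp) {
    this.createdTaskId = createdTaskId;
    this.timestamp = timestamp;
  }

  String getCreatedTaskId() {
    return createdTaskId;
  }

  long getTimestamp() {
    return timestamp;
  }
}

/** Hypothetical helper computing the traversal time of a latency mark. */
final class LatencySketch {
  private LatencySketch() {
  }

  /**
   * @param mark the latency mark received by a task
   * @param recordedAtMillis the time at which the receiving task recorded it
   * @return elapsed milliseconds between creation and recording
   */
  static long traversalMillis(final LatencymarkSketch mark, final long recordedAtMillis) {
    return recordedAtMillis - mark.getTimestamp();
  }
}
```

A receiving task could call `LatencySketch.traversalMillis(mark, System.currentTimeMillis())` when the mark arrives.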





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917823150


   Okay, I'll check the error.





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705822751



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -299,6 +300,12 @@ public final void emit(final WindowedValue<KV<K, OutputT>> output) {
       oc.emit(output);
     }
 
+    /** Emit latencymark. */
+    @Override

Review comment:
       AbstractDoFnTransform already has the onLatencymark method.
   This one is a method of the GBKOutputCollector class.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {
+    return this.latencymarks;

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;
+  private long timestamp;

Review comment:
       done

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;

Review comment:
       It is used to indicate the source task.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       This method is called only [here](https://github.com/apache/incubator-nemo/pull/314/files), and it runs on a separate thread. I'll add a comment explaining this context. 
   That said, creating an additional thread for every task is too inefficient. To avoid that, I would have to store the created TaskExecutor instances in Executor, which is already implemented in a [not-yet-merged PR](https://github.com/apache/incubator-nemo/pull/314/files). Is it okay to implement the same Map in this PR?
   

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;
+  private long timestamp;
+
+  /**
+   * Constructor with the latencymark and timestamp.
+   *
+   * @param latencymark the latencymark what task received.

Review comment:
       done

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       As far as I know, every getter is used when the metrics are stored in the JSON file. 
   If this getter does not exist, createdTaskId is not stored in the JSON file.

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task

Review comment:
       This is the previous task that I explained in my earlier comment.

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private Latencymark latencymark;

Review comment:
       done







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705809356



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       First of all, the pair of createdTaskId and timestamp identifies the starting point. For example:

         ----- B -----
        /             \        A to E are tasks
       A ----- C ----- E       dashes are edges
        \             /        A is the source, and E is the sink
         ----- D -----

   A Latencymark is created at task A with a timestamp and createdTaskId, and lastTaskId is needed to follow each path. 
   When the Latencymark reaches task E, there are three paths in total: A-B-E, A-C-E, and A-D-E. I want to know the latency of each path, so I make the mark hold the previous task id. Task E therefore receives three latencymarks, whose lastTaskId values are B, C, and D, respectively. Of course, lastTaskId has no meaning by itself, but I expected that we could determine the latency of each path by following lastTaskId. This part may be controversial, and I think it is fine to remove it if it is deemed unnecessary.
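
To make the idea concrete, here is a minimal sketch of how the marks that arrive at task E could be turned into one latency per incoming path. `ObservedMark` and `PathLatencySketch` are hypothetical names used only for illustration, not classes from this PR, and the timestamps are made-up values in milliseconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical view of one mark as recorded by the receiving task (not the PR's Latencymark). */
final class ObservedMark {
  final String createdTaskId; // source task that emitted the mark
  final String lastTaskId;    // upstream task the mark was delivered from
  final long createdAt;       // creation timestamp at the source (ms)
  final long receivedAt;      // timestamp recorded by the receiving task (ms)

  ObservedMark(final String createdTaskId, final String lastTaskId,
               final long createdAt, final long receivedAt) {
    this.createdTaskId = createdTaskId;
    this.lastTaskId = lastTaskId;
    this.createdAt = createdAt;
    this.receivedAt = receivedAt;
  }
}

final class PathLatencySketch {
  /** Source-to-receiver latency, keyed by the upstream task the mark arrived through. */
  static Map<String, Long> latencyByUpstream(final List<ObservedMark> marksAtTask) {
    final Map<String, Long> result = new LinkedHashMap<>();
    for (final ObservedMark mark : marksAtTask) {
      result.put(mark.lastTaskId, mark.receivedAt - mark.createdAt);
    }
    return result;
  }

  public static void main(final String[] args) {
    // Three copies of the same mark (created at A, t = 1000) reach E through B, C, and D.
    final List<ObservedMark> atE = new ArrayList<>();
    atE.add(new ObservedMark("A", "B", 1000L, 1030L));
    atE.add(new ObservedMark("A", "C", 1000L, 1045L));
    atE.add(new ObservedMark("A", "D", 1000L, 1020L));
    System.out.println(latencyByUpstream(atE)); // {B=30, C=45, D=20}
  }
}
```

For longer pipelines the same lookup would presumably be repeated at each upstream task to reconstruct the full path, which is the part that depends on lastTaskId.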







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705903886



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       This method is called only [here](https://github.com/apache/incubator-nemo/pull/317/files#diff-4a193406973f202873754b4e9fe163d4e0a9584b56fb7bef62671d7c69687167R156), and it runs on a separate thread. I'll add a comment explaining this context. 
   That said, creating an additional thread for every task is too inefficient. To avoid that, I would have to store the created TaskExecutor instances in Executor, which is already implemented in a [not-yet-merged PR](https://github.com/apache/incubator-nemo/pull/314/files). Is it okay to implement the same Map in this PR?
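
As a rough sketch of the alternative discussed here, a single scheduler owned by the executor could serve every task instead of one thread per TaskExecutor. `SharedMetricReporter`, `register`, and `reportAll` are hypothetical names chosen for illustration; this is not the Map from PR #314, only one possible shape for it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical executor-level reporter: one scheduler thread shared by all tasks. */
final class SharedMetricReporter {
  // taskId -> callback that saves that task's stream metric (assumed shape)
  private final Map<String, Runnable> reporters = new ConcurrentHashMap<>();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  void register(final String taskId, final Runnable saveStreamMetric) {
    reporters.put(taskId, saveStreamMetric);
  }

  void unregister(final String taskId) {
    reporters.remove(taskId);
  }

  /** Runs every registered callback once. */
  void reportAll() {
    for (final Runnable reporter : reporters.values()) {
      try {
        reporter.run();
      } catch (final RuntimeException e) {
        // Swallow so one failing task does not cancel the schedule:
        // scheduleAtFixedRate suppresses later runs if an execution throws.
      }
    }
  }

  void start(final long periodMillis) {
    scheduler.scheduleAtFixedRate(this::reportAll, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    scheduler.shutdownNow();
  }

  public static void main(final String[] args) throws InterruptedException {
    final SharedMetricReporter reporter = new SharedMetricReporter();
    final AtomicInteger task1Reports = new AtomicInteger();
    final AtomicInteger task2Reports = new AtomicInteger();
    reporter.register("task-1", task1Reports::incrementAndGet);
    reporter.register("task-2", task2Reports::incrementAndGet);

    reporter.start(100L); // a single thread serves both tasks
    Thread.sleep(350L);
    reporter.stop();
    System.out.println("task-1 reports: " + task1Reports.get()
        + ", task-2 reports: " + task2Reports.get());
  }
}
```

The trade-off is that a slow callback delays the reports of every other task on the same thread, so the period may need to be longer than with a per-task scheduler.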
   







[GitHub] [incubator-nemo] Lemarais commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899381853


   I referred to Flink's approach to record latency. The mark is created only at the source vertex. It carries the timestamp when it was created, the id of the task where it was created, and the id of the task it was delivered from. The executor records the timestamp at which each mark arrives. From these records, we can calculate the latency of each path.
   
   One caveat: the mark does not capture the time taken to process data, or the time data stay in a buffer when they are processed by an aggregation transform. Only the time the mark itself is stalled is included. 
   
   There was a discussion with wonwook that using watermarks would be more accurate. Still, it would be better to have both styles of latency: Flink-style latency and latency calculated from watermarks. 
   
   However, Flink did not support recording every watermark, and when I considered whether recording all the watermarks would be good, I concluded that it is not suitable logic to merge into the master branch. So I did not include the watermark-recording part. 
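
The propagation described above can be sketched roughly as follows. `MarkSketch`, `TaskSketch`, and `forwardedBy` are hypothetical names, and the re-stamping of the mark on forwarding is an assumption about how lastTaskId gets updated. Timestamps are passed in explicitly to keep the example deterministic; real code would presumably read the clock instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, immutable stand-in for the PR's Latencymark. */
final class MarkSketch {
  final String createdTaskId; // source task that created the mark
  final long createdAt;       // creation timestamp (ms)
  final String lastTaskId;    // task the mark was most recently sent from

  MarkSketch(final String createdTaskId, final long createdAt, final String lastTaskId) {
    this.createdTaskId = createdTaskId;
    this.createdAt = createdAt;
    this.lastTaskId = lastTaskId;
  }

  /** Copy of this mark stamped with the task that is about to send it downstream. */
  MarkSketch forwardedBy(final String taskId) {
    return new MarkSketch(createdTaskId, createdAt, taskId);
  }
}

/** Hypothetical task that records each arriving mark and forwards it. */
final class TaskSketch {
  final String id;
  final List<String> records = new ArrayList<>();

  TaskSketch(final String id) {
    this.id = id;
  }

  /** Records the elapsed time since creation, then re-stamps the mark for the next hop. */
  MarkSketch onMark(final MarkSketch mark, final long now) {
    records.add(mark.lastTaskId + "->" + id + " " + (now - mark.createdAt) + "ms");
    return mark.forwardedBy(id);
  }
}

final class PropagationDemo {
  public static void main(final String[] args) {
    final TaskSketch b = new TaskSketch("B");
    final TaskSketch e = new TaskSketch("E");

    // Source A creates the mark at t = 0 and stamps itself before sending.
    final MarkSketch fromA = new MarkSketch("A", 0L, "").forwardedBy("A");
    final MarkSketch fromB = b.onMark(fromA, 12L);
    e.onMark(fromB, 30L);

    System.out.println(b.records); // [A->B 12ms]
    System.out.println(e.records); // [B->E 30ms]
  }
}
```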





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899759058


   @taegeonum I've added some comments about metrics





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-897456660


   Kudos, SonarCloud Quality Gate passed! ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![No Duplication information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo-16px.png 'No Duplication information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=duplicated_lines_density&view=list) No Duplication information
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-897441032


   Kudos, SonarCloud Quality Gate passed! ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![No Duplication information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo-16px.png 'No Duplication information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=duplicated_lines_density&view=list) No Duplication information
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-897441032


   
   





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-918805365


   Kudos, SonarCloud Quality Gate passed! ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r709085987



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String previousTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.previousTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {

Review comment:
       Sorry, I cannot find the usage of this code. Only `getPreviousTaskId()` is called.

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;

Review comment:
       Sorry, I cannot find the usage of this field. After collecting latency marks in the master, where is this used? I found that your code uses only the previousTaskId.

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       Sorry, I cannot find the usage of this code. Only `getPreviousTaskId()` is called.







[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707882330



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       What happens in the following situation?
     -------- B ------- C -------
    /                    \       \
   A -------- D --------- E ----- F
   
   There are three paths: 
   1) A->B->C->F 
   2) A->B->C->E->F
   3) A->D->E->F
   
   How can we distinguish 2) and 3) ?
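
One way to tell paths 2) and 3) apart is to have the mark carry the full list of tasks it has passed through, rather than only the previous task id. The following is a minimal sketch under that assumption; `PathTrackingLatencyMark`, `forwardedBy`, and `getPath` are hypothetical names for illustration and are not part of this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical latency mark that remembers every task it has passed through. */
final class PathTrackingLatencyMark {
  private final long createdTimestamp;
  private final List<String> visitedTaskIds = new ArrayList<>();

  PathTrackingLatencyMark(final String sourceTaskId, final long createdTimestamp) {
    this.createdTimestamp = createdTimestamp;
    this.visitedTaskIds.add(sourceTaskId);
  }

  /**
   * Called by a task when it forwards the mark downstream.
   * Returns a copy so that fan-out branches (e.g. C -> F and C -> E) stay independent.
   */
  PathTrackingLatencyMark forwardedBy(final String taskId) {
    final PathTrackingLatencyMark copy =
      new PathTrackingLatencyMark(visitedTaskIds.get(0), createdTimestamp);
    copy.visitedTaskIds.addAll(visitedTaskIds.subList(1, visitedTaskIds.size()));
    copy.visitedTaskIds.add(taskId);
    return copy;
  }

  /** @return the ordered task ids from the source to the latest forwarding task. */
  List<String> getPath() {
    return Collections.unmodifiableList(visitedTaskIds);
  }

  /** @return elapsed milliseconds between creation and the given time. */
  long latencyAt(final long nowMillis) {
    return nowMillis - createdTimestamp;
  }
}
```

With this shape, a mark arriving at F via E reports either `[A, B, C, E, F]` or `[A, D, E, F]`, so the two paths are distinguishable. The trade-off is that the mark grows with the depth of the DAG.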







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899411109


   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917874270


   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705903886



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -123,9 +140,63 @@ public TaskExecutor(final Task task,
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::saveStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  // Send stream metric to the runtime master
+  private void saveStreamMetric() {

Review comment:
       This method is called only [here](https://github.com/apache/incubator-nemo/pull/314/files), and it runs on a separate thread. I'll add a comment explaining this context.
   That said, creating an additional thread for every task is too inefficient. To avoid it, I would have to store the created TaskExecutors in the Executor, which is already implemented in [a PR that is not merged yet](https://github.com/apache/incubator-nemo/pull/314/files). Is it okay to implement the same Map in this PR?
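
As an illustration of the alternative discussed here, the sketch below uses a single scheduler thread at the executor level and a map of registered tasks, instead of one scheduled thread pool per task. It is only a sketch: `SharedStreamMetricReporter` and its methods are hypothetical names, and the registered `Runnable` stands in for whatever per-task method saves the stream metric.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical executor-level reporter: one scheduler thread serves every registered task. */
final class SharedStreamMetricReporter {
  // taskId -> callback that saves that task's stream metric.
  private final Map<String, Runnable> reporters = new ConcurrentHashMap<>();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  SharedStreamMetricReporter(final long periodMillis) {
    // A non-positive period disables periodic recording, mirroring the check in the diff above.
    if (periodMillis > 0) {
      scheduler.scheduleAtFixedRate(this::reportAll, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
  }

  void register(final String taskId, final Runnable saveStreamMetric) {
    reporters.put(taskId, saveStreamMetric);
  }

  void unregister(final String taskId) {
    reporters.remove(taskId);
  }

  /** Runs every registered callback once. */
  void reportAll() {
    for (final Runnable reporter : reporters.values()) {
      try {
        reporter.run();
      } catch (final RuntimeException e) {
        // An uncaught exception would cancel the fixed-rate schedule, so it is
        // swallowed in this sketch; real code would log it.
      }
    }
  }

  void shutdown() {
    scheduler.shutdownNow();
  }
}
```

Each task would register when it starts and unregister when it finishes, so the thread count stays constant regardless of how many tasks run on the executor.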
   







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-913102745


   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; ![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL)
   
   [![No Coverage information](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo-16px.png 'No Coverage information')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317) No Coverage information  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899404442


   @Lemarais Could you please explain what Flink-style latency is? I don't think we need another event type to measure latency. Each event already has a timestamp, so we can calculate the latency of each event as (current_processing_time - event_timestamp).
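
The per-event calculation described above can be sketched as follows. `EventLatency` is a hypothetical helper, not code from this PR, and it assumes the event timestamp and the processing clock are both epoch milliseconds from comparable clocks; with skewed clocks across machines the result can be off, or even negative.

```java
/** Hypothetical helper: latency of an event as processing time minus the event's own timestamp. */
final class EventLatency {
  private EventLatency() {
  }

  /** @return current_processing_time - event_timestamp, in milliseconds. */
  static long latencyMillis(final long eventTimestampMillis, final long currentProcessingTimeMillis) {
    return currentProcessingTimeMillis - eventTimestampMillis;
  }

  /** Convenience overload that reads the local wall clock as the processing time. */
  static long latencyNow(final long eventTimestampMillis) {
    return latencyMillis(eventTimestampMillis, System.currentTimeMillis());
  }
}
```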





[GitHub] [incubator-nemo] Lemarais edited a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais edited a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899440875


   Yes, you are right: it measures only the record traversal time from source to sink. It is for debugging purposes. The latency mark can be enabled or disabled through the `--latencymark_period` argument, and it is disabled by default.





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707986041



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,35 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   *
+   * @return the streamMetrics
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {

Review comment:
       Where is it used? 

##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       Could you please let me know the code block? 







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r708014142



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       Storing the JSON file starts from [here](https://github.com/apache/incubator-nemo/blob/65440c1a9acc5fdc4b7705de61610499ed2f6b84/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java#L217), but I don't know the internal process.







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707911107



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       The traversal time from E to F will not differ significantly. So if we want to distinguish 2) and 3), we can check the record of the previous task; there we can find the difference between C to E and D to E. Rather than the metric being meaningful by itself, it takes effort to gather the transmission times between nodes.
   
   I think the departure time from the previous task will be important. But in the current structure it seems a little difficult to judge, because several Latencymarks with the same source task id and creation time can be sent.
   
   I will change it to record the time when the Latencymark departed from the previous task, or modify it to send only unique Latencymarks (with a unique pair of createTaskid and createdTimestamp).
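
The second option above (sending only unique Latencymarks) could look roughly like the sketch below. This is only an illustration under assumptions: `LatencymarkDeduplicator`, `MarkKey`, and `shouldForward` are hypothetical names that do not exist in Nemo, and the sketch takes the two identifying values directly instead of a real `Latencymark` object.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical filter, not part of Nemo: lets each (createdTaskId, timestamp) pair through once. */
final class LatencymarkDeduplicator {
  /** Identity of a mark: the task that created it and its creation time. */
  private static final class MarkKey {
    private final String createdTaskId;
    private final long createdTimestamp;

    MarkKey(final String createdTaskId, final long createdTimestamp) {
      this.createdTaskId = createdTaskId;
      this.createdTimestamp = createdTimestamp;
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof MarkKey)) {
        return false;
      }
      final MarkKey other = (MarkKey) o;
      return createdTimestamp == other.createdTimestamp
        && createdTaskId.equals(other.createdTaskId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(createdTaskId, createdTimestamp);
    }
  }

  // Keys of the marks this task has already forwarded.
  private final Set<MarkKey> seen = new HashSet<>();

  /** Returns true only the first time a given pair is observed. */
  boolean shouldForward(final String createdTaskId, final long createdTimestamp) {
    return seen.add(new MarkKey(createdTaskId, createdTimestamp));
  }

  public static void main(final String[] args) {
    final LatencymarkDeduplicator dedup = new LatencymarkDeduplicator();
    System.out.println(dedup.shouldForward("A", 100L)); // first copy
    System.out.println(dedup.shouldForward("A", 100L)); // duplicate arriving via another path
    System.out.println(dedup.shouldForward("A", 101L)); // a later mark from the same source
  }
}
```

The set in this sketch grows without bound, so a real version would need some eviction policy, for example dropping keys older than a chosen time window.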







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917796959


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707882330



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       What happens in the following situation?
     --------- B ------- C ---------
    /                     \         \
   A --------- D --------- E ------- F
   
   There are three paths: 
   1) A->B->C->F 
   2) A->B->C->E->F
   3) A->D->E->F
   
   How can we distinguish 2) and 3) if we only hold the createdTaskId and the previousTaskId? 
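
The ambiguity can be reproduced with a small stand-alone simulation. This is a sketch under assumptions, not Nemo code: `PathAmbiguityDemo` and `observedAtSink` are hypothetical names, and it assumes every task overwrites the previous-task id before forwarding the mark.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Nemo: shows which ids a mark carries when it reaches the sink. */
final class PathAmbiguityDemo {
  private PathAmbiguityDemo() {
  }

  /**
   * Walks a path of task ids and returns "createdTaskId/lastTaskId"
   * as seen by the final task on that path.
   */
  static String observedAtSink(final List<String> path) {
    final String createdTaskId = path.get(0);
    String lastTaskId = "";
    // Every task except the sink overwrites lastTaskId before forwarding.
    for (int i = 0; i < path.size() - 1; i++) {
      lastTaskId = path.get(i);
    }
    return createdTaskId + "/" + lastTaskId;
  }

  public static void main(final String[] args) {
    final List<String> path1 = Arrays.asList("A", "B", "C", "F");
    final List<String> path2 = Arrays.asList("A", "B", "C", "E", "F");
    final List<String> path3 = Arrays.asList("A", "D", "E", "F");
    System.out.println("1) " + observedAtSink(path1)); // A/C
    System.out.println("2) " + observedAtSink(path2)); // A/E
    System.out.println("3) " + observedAtSink(path3)); // A/E, same as 2)
  }
}
```

At F, paths 2) and 3) both arrive as created at A and last seen at E, so the pair alone cannot tell them apart.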







[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707882330



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       What happens in the following situation?
     --------- B ------- C ---------
    /                     \         \
   A --------- D --------- E ------- F
   
   There are three paths: 
   1) A->B->C->F 
   2) A->B->C->E->F
   3) A->D->E->F
   
   How can we distinguish 2) and 3) ?







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-916645545


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-916646055


   @taegeonum I've addressed all comments. 





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899748195


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: [MINOR] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899968164


   @Lemarais Also, could you please create a JIRA issue? It doesn't look like a minor issue.





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r702384565



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {
+    return createdtaskId;
+  }
+
+
+  /**
+   * @return the task id where it is delivered from. task id of upstream task
+   */
+  public String getLastTaskId() {
+    return lastTaskId;
+  }
+
+  public void setLastTaskId(final String currTaskId) {
+    lastTaskId = currTaskId;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Latencymark latencymark = (Latencymark) o;
+    return (timestamp == latencymark.timestamp)
+      && (createdtaskId.equals(latencymark.createdtaskId)
+      && (lastTaskId.equals(latencymark.lastTaskId)));
+  }
+
+
+  @Override
+  public String toString() {
+    return String.valueOf("Latencymark(" + createdtaskId + ", " + timestamp + ")");

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
##########
@@ -93,6 +94,13 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
##########
@@ -93,6 +94,13 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -166,6 +167,13 @@ public void onWatermark(final Watermark watermark) throws RuntimeException {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       done







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-927178636


   Kudos, SonarCloud Quality Gate passed!
   
   0 Bugs (rating A)
   0 Vulnerabilities (rating A)
   0 Security Hotspots (rating A)
   0 Code Smells (rating A)
   
   No Coverage information
   0.0% Duplication
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r702384596



##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -166,6 +167,13 @@ public void onWatermark(final Watermark watermark) throws RuntimeException {
     checkAndFinishBundle();
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
##########
@@ -160,6 +161,15 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();

Review comment:
       done

##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
##########
@@ -160,6 +161,15 @@ public void onWatermark(final Watermark watermark) {
     checkAndFinishBundle();
   }
 
+
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    checkAndInvokeBundle();
+    getOutputCollector().emitLatencymark(latencymark);
+    checkAndFinishBundle();

Review comment:
       done

##########
File path: compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
##########
@@ -61,6 +62,11 @@ public void onWatermark(final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {

Review comment:
       done

##########
File path: compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
##########
@@ -64,6 +65,11 @@ public void onWatermark(final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {

Review comment:
       done

##########
File path: conf/src/main/java/org/apache/nemo/conf/JobConf.java
##########
@@ -250,6 +250,25 @@
   public final class ExecutorJSONContents implements Name<String> {
   }
 
+  ///////////////////////// Metric Configurations
+  /**
+   * Period how often stream metrics are recorded. the unit of period is millisecond.
+   * -1 indicates that metrics are not recorded periodically.
+   */
+  @NamedParameter(doc = "Period how often stream-related metrics are recorded. the unit of period is millisecond.",

Review comment:
       done

##########
File path: conf/src/main/java/org/apache/nemo/conf/JobConf.java
##########
@@ -250,6 +250,25 @@
   public final class ExecutorJSONContents implements Name<String> {
   }
 
+  ///////////////////////// Metric Configurations
+  /**
+   * Period how often stream metrics are recorded. the unit of period is millisecond.
+   * -1 indicates that metrics are not recorded periodically.
+   */
+  @NamedParameter(doc = "Period how often stream-related metrics are recorded. the unit of period is millisecond.",
+    short_name = "stream_metric_period", default_value = "-1")
+  public final class StreamMetricPeriod implements Name<Integer> {
+  }
+
+  /**
+   * Period how often latencymarks are sent from source vertex. the unit of period is millisecond.
+   * -1 indicates that latencymarks are not sent.
+   */
+  @NamedParameter(doc = "Period how often latencymarks are sent from source vertex. the unit of period is millisecond.",

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for latency.
+ */
+public class LatencyMetric implements Serializable {

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for latency.

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Metric associated with stream. it is periodically recorded.
+ */
+public class StreamMetric implements Serializable {

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Metric associated with stream. it is periodically recorded.

Review comment:
       done

##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +110,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new ArrayList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {
+    return this.latencymarks;
+  }
+
+  private void addLatencymark(final LatencyMetric latencyMetric) {
+    this.latencymarks.putIfAbsent(latencyMetric.getLatencymark().getLastTaskId(), new ArrayList<>());

Review comment:
       done

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
##########
@@ -84,6 +85,9 @@ public Object decode() throws IOException {
         final WatermarkWithIndex watermarkWithIndex =
           (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
         return watermarkWithIndex;
+      } else if (isWatermark == 0x02) {

Review comment:
       done







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r702388791



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
##########
@@ -131,4 +132,33 @@ public void emitWatermark(final Watermark watermark) {
       }
     }
   }
+
+  @Override
+  public void emitLatencymark(final Latencymark latencymark) {

Review comment:
       I would also welcome thinking about a better structure together.







[GitHub] [incubator-nemo] taegeonum commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899325488


   @Lemarais Could you give me a high-level explanation of what `LatencyMark` is and how latency is measured with it?





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899447442


   Thanks for the clarification. Could you please add some comments about what
   we discussed in this PR?
   
   On Mon, Aug 16, 2021 at 8:38 PM, Lemarais ***@***.***> wrote:
   
   > Yes, you are right. It measures only the record traversal time from source to
   > sink. It is for debugging purposes; latency marks can be enabled or disabled
   > through the --latencymark_period argument, and they are disabled by default.
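
The measurement described in the quoted reply can be sketched roughly as follows. This is a minimal illustration, assuming a marker carries only the ID of the task that created it and its creation timestamp; the names `LatencySketch`, `Mark`, `latencyMillis`, and `isEnabled` are hypothetical and are not taken from the PR.

```java
final class LatencySketch {
  /** Hypothetical stand-in for a latency marker: where and when it was created. */
  static final class Mark {
    final String createdTaskId;
    final long createdTimestampMillis;

    Mark(final String createdTaskId, final long createdTimestampMillis) {
      this.createdTaskId = createdTaskId;
      this.createdTimestampMillis = createdTimestampMillis;
    }
  }

  /** Traversal time from the creating task to the receiving task, in milliseconds. */
  static long latencyMillis(final Mark mark, final long receivedTimestampMillis) {
    return receivedTimestampMillis - mark.createdTimestampMillis;
  }

  /** Assumption: a non-positive period (such as the default -1) means no markers are sent. */
  static boolean isEnabled(final long latencymarkPeriodMillis) {
    return latencymarkPeriodMillis > 0;
  }

  public static void main(final String[] args) {
    final Mark mark = new Mark("sourceTask-0", 1_000L);
    System.out.println("latency(ms) = " + latencyMillis(mark, 1_250L));
    System.out.println("enabled by default = " + isEnabled(-1L));
  }
}
```

Because the marker only carries a creation time, the value reflects traversal time between tasks and not the time spent inside operator logic.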
   





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r709088199



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,35 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   *
+   * @return the streamMetrics
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {

Review comment:
       It doesn't look like this is used for storing the JSON file.







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-916645545


   Kudos, SonarCloud Quality Gate passed!
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   
   [No Coverage information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317)  
   [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] wonook commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
wonook commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-942209976


   @taegeonum If all your comments have been addressed or answered, I think we can move on. What do you think?





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917886749


   Kudos, SonarCloud Quality Gate passed!
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   
   [No Coverage information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317)  
   [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899752556


   Kudos, SonarCloud Quality Gate passed!
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   
   [No Coverage information](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317)  
   [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-925584649


   > @Lemarais ping :)
   
   I'm sorry for the delay; this was held up for personal reasons. I will revise it and commit by 9/25.





[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707882330



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       What happens in the following situation?
   
       A ---- B ---- C ----------+
        \             \           \
         D ----------- E --------- F
   
   There are three paths:
   1) A->B->C->F
   2) A->B->C->E->F
   3) A->D->E->F
   
   How can we distinguish 2) from 3)?
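
The concern above can be illustrated with a small sketch. It assumes, for illustration only, that a marker records just the vertex where it was created and the last vertex it passed through before arriving. The class `PathSketch` and its methods are hypothetical, and the vertex names A to F stand in for task IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PathSketch {
  /** Edges of the example DAG, derived from the three listed paths. */
  static final Map<String, List<String>> EDGES = new LinkedHashMap<>();

  static {
    EDGES.put("A", List.of("B", "D"));
    EDGES.put("B", List.of("C"));
    EDGES.put("C", List.of("F", "E"));
    EDGES.put("D", List.of("E"));
    EDGES.put("E", List.of("F"));
    EDGES.put("F", List.of());
  }

  /** Enumerates every path between two vertices by depth-first search. */
  static List<List<String>> paths(final String from, final String to) {
    final List<List<String>> result = new ArrayList<>();
    walk(from, to, new ArrayList<>(), result);
    return result;
  }

  private static void walk(final String current, final String to,
                           final List<String> prefix, final List<List<String>> result) {
    prefix.add(current);
    if (current.equals(to)) {
      result.add(new ArrayList<>(prefix));
    } else {
      for (final String next : EDGES.get(current)) {
        walk(next, to, prefix, result);
      }
    }
    prefix.remove(prefix.size() - 1);
  }

  /** What the sink observes if only the creating vertex and the last hop are recorded. */
  static String sourceAndLastHop(final List<String> path) {
    return path.get(0) + "->" + path.get(path.size() - 2);
  }

  public static void main(final String[] args) {
    for (final List<String> path : paths("A", "F")) {
      System.out.println(String.join("->", path) + " observed as " + sourceAndLastHop(path));
    }
  }
}
```

Under that assumption, paths 2) and 3) produce the same observation at F, since both arrive from E. Telling them apart would need additional information in the marker, such as the full list of visited tasks.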







[GitHub] [incubator-nemo] Lemarais commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899417187


   The [Flink-style latency](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/) is described in the Flink documentation as follows:
   > Note that the latency markers are not accounting for the time user records spend in operators as they are bypassing them. In particular the markers are not accounting for the time records spend for example in window buffers. Only if operators are not able to accept new records, thus they are queuing up, the latency measured using the markers will reflect that.
   
   Can you explain what "event" means? At first I tried to use watermarks to calculate latency, but a watermark's timestamp does not always mean the time it was created. With bounded sources, different values are sometimes assigned intentionally. When watermarks reached an aggregation vertex, watermarks with Long max or Long min values were created, and such a watermark was transferred to the next task on its own. So there is a limit to calculating latency using only watermarks, which is why I added a new marker for it.
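
The limitation described above can be shown with a short sketch. It assumes that the "long max" and "long min" values refer to `Long.MAX_VALUE` and `Long.MIN_VALUE`, and that a naive latency would be computed as the current time minus the watermark timestamp; `WatermarkLatencySketch` and its methods are hypothetical names, not code from the PR.

```java
final class WatermarkLatencySketch {
  /** Naive latency: treats the watermark timestamp as if it were a creation time. */
  static long naiveLatencyMillis(final long watermarkTimestamp, final long nowMillis) {
    return nowMillis - watermarkTimestamp;
  }

  /** Marker-based latency: the marker stores the wall-clock time at which it was created. */
  static long markerLatencyMillis(final long createdTimestampMillis, final long nowMillis) {
    return nowMillis - createdTimestampMillis;
  }

  public static void main(final String[] args) {
    final long now = 1_250L;
    // A sentinel watermark carries no creation time, so the result is not a latency.
    System.out.println(naiveLatencyMillis(Long.MAX_VALUE, now) < 0);
    // Subtracting Long.MIN_VALUE overflows and also yields a negative number.
    System.out.println(naiveLatencyMillis(Long.MIN_VALUE, now) < 0);
    // A dedicated marker created at t = 1000 gives a meaningful value.
    System.out.println(markerLatencyMillis(1_000L, now));
  }
}
```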





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690071971



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -34,6 +36,8 @@
   private String containerId = "";
   private int scheduleAttempt = -1;
   private List<StateTransitionEvent<TaskState.State>> stateTransitionEvents = new ArrayList<>();
+  private final Map<String, List<StreamMetric>> streamMetrics = new HashMap<>();

Review comment:
       The key is the source vertex ID of the data fetcher; it indicates where the data comes from.
   
   I tried to use the task ID instead, but there is no way to get the previous task's ID from within a task.
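
   As a rough sketch of what this key means, assuming each periodic report maps a source vertex ID to one measurement. The `StreamMetric` stand-in below (with a single `numOfProcessedTuples` field) and the class and method names are hypothetical simplifications, not the PR's actual classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamMetricSketch {
  /** Hypothetical stand-in for the PR's StreamMetric; only one field is modelled. */
  static final class StreamMetric {
    final long numOfProcessedTuples;

    StreamMetric(final long numOfProcessedTuples) {
      this.numOfProcessedTuples = numOfProcessedTuples;
    }
  }

  // Key: ID of the source vertex that the data fetcher reads from.
  private final Map<String, List<StreamMetric>> streamMetrics = new HashMap<>();

  /** Appends one periodic report, keeping a separate history per source vertex. */
  void addReport(final Map<String, StreamMetric> report) {
    for (final Map.Entry<String, StreamMetric> entry : report.entrySet()) {
      streamMetrics.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
    }
  }

  /** Returns the recorded history for one source vertex (empty if none was reported). */
  List<StreamMetric> historyOf(final String sourceVertexId) {
    return streamMetrics.getOrDefault(sourceVertexId, new ArrayList<>());
  }
}
```

   Keying by source vertex ID keeps the histories of different upstream vertices apart even though the receiving task cannot name the upstream task itself.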







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r705809356



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.

Review comment:
       First of all, the pair of createdTaskId and timestamp indicates the starting point. For example:

      -----------B-----------
     /                       \     A to E indicate tasks
    A -----------C----------- E    dashes indicate edges
     \                       /     A is a source, and E is a sink
      -----------D-----------

   A Latencymark is created at task A with a timestamp and createdTaskId, and lastTaskId is updated as the mark follows each path.
   There are three paths to task E in total: A-B-E, A-C-E, and A-D-E, and I want to know the latency for each path, so I made the Latencymark hold the previous task ID.
   Task E therefore receives three Latencymarks, whose lastTaskId values are B, C, and D, respectively. Of course, lastTaskId by itself has no meaning, but I expected that we could find the latency of each path by following lastTaskId. This part may be controversial, and I think it is fine to remove it if it is deemed unnecessary.
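
   The idea can be sketched as follows, under simplifying assumptions: `Mark` is a hypothetical immutable stand-in for `Latencymark`, each intermediate task records itself as the previous hop before forwarding, and the timestamps are plain milliseconds chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerPathLatencySketch {
  /** Hypothetical stand-in for Latencymark: origin, creation time, and the previous hop. */
  static final class Mark {
    final String createdTaskId;
    final long timestamp;
    final String lastTaskId;

    Mark(final String createdTaskId, final long timestamp, final String lastTaskId) {
      this.createdTaskId = createdTaskId;
      this.timestamp = timestamp;
      this.lastTaskId = lastTaskId;
    }

    /** Copy forwarded by an intermediate task, which records itself as the previous hop. */
    Mark forwardedBy(final String taskId) {
      return new Mark(createdTaskId, timestamp, taskId);
    }
  }

  // Latencies observed at the receiving task, keyed by the task the mark arrived from.
  private final Map<String, List<Long>> latenciesByLastTask = new TreeMap<>();

  /** Records the elapsed time since the mark was created, grouped by previous hop. */
  void onMark(final Mark mark, final long receivedAtMillis) {
    latenciesByLastTask.computeIfAbsent(mark.lastTaskId, k -> new ArrayList<>())
        .add(receivedAtMillis - mark.timestamp);
  }

  Map<String, List<Long>> latencies() {
    return latenciesByLastTask;
  }

  public static void main(final String[] args) {
    final Mark fromA = new Mark("A", 100L, "");
    final PerPathLatencySketch taskE = new PerPathLatencySketch();
    taskE.onMark(fromA.forwardedBy("B"), 130L);
    taskE.onMark(fromA.forwardedBy("C"), 150L);
    taskE.onMark(fromA.forwardedBy("D"), 120L);
    System.out.println(taskE.latencies());
  }
}
```

   With one intermediate task per path, as in the diagram, grouping by the previous hop is enough to tell the three paths apart.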







[GitHub] [incubator-nemo] taegeonum commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-917818603


   @Lemarais Could you please take a look at why the test failed and resolve the error?





[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899752556


   Kudos, SonarCloud Quality Gate passed!
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   - No Coverage information
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r708015895



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +116,35 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   *
+   * @return the streamMetrics
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {

Review comment:
       This getter is also needed for writing the JSON file. If it doesn't exist, the metrics are not recorded in the JSON file.







[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r707900374



##########
File path: common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdtaskId;
+  private String lastTaskId;
+  private final long timestamp;
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdtaskId = taskId;
+    this.timestamp = timestamp;
+    this.lastTaskId = "";
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedtaskId() {

Review comment:
       This method is not called explicitly. However, when metrics are stored in JSON format, getters seem to be scanned and used automatically: if I remove this method, the createdTaskId field is not included in the JSON file.
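
   This matches how bean-style JSON serializers commonly work (Jackson's default configuration, for example): properties are discovered through public getters, not private fields. Below is a minimal sketch of that kind of discovery using only JDK reflection. It is not the serializer the project uses, and `GetterScanSketch`, `Mark`, and `toProperties` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

final class GetterScanSketch {
  /** Hypothetical stand-in: one field has a getter, the other does not. */
  static final class Mark {
    private final String createdTaskId;
    private final String lastTaskId;

    Mark(final String createdTaskId, final String lastTaskId) {
      this.createdTaskId = createdTaskId;
      this.lastTaskId = lastTaskId;
    }

    public String getCreatedTaskId() {
      return createdTaskId;
    }
    // No getter for lastTaskId, so a getter-based scan never sees it.
  }

  /** Collects property name/value pairs from public, zero-argument getXxx() methods. */
  static Map<String, Object> toProperties(final Object bean) {
    final Map<String, Object> properties = new TreeMap<>();
    for (final Method method : bean.getClass().getDeclaredMethods()) {
      final String name = method.getName();
      final int modifiers = method.getModifiers();
      if (Modifier.isPublic(modifiers) && !Modifier.isStatic(modifiers)
          && method.getParameterCount() == 0
          && method.getReturnType() != void.class
          && name.startsWith("get") && name.length() > 3) {
        try {
          method.setAccessible(true); // the declaring class itself may not be public
          final String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
          properties.put(property, method.invoke(bean));
        } catch (final ReflectiveOperationException e) {
          throw new IllegalStateException("Cannot read property via " + name, e);
        }
      }
    }
    return properties;
  }
}
```

   Under this scheme, removing `getCreatedTaskId()` drops `createdTaskId` from the output, which is consistent with the behaviour described above.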







[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-913101243


   Kudos, SonarCloud Quality Gate passed!
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   - No Coverage information
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-913102745


   Kudos, SonarCloud Quality Gate passed!
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   - No Coverage information
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899437123


   @Lemarais Thanks for the reference. According to the reference, it looks like Flink measures only the record traversal time from src to sink within the system (not including queueing delay in sources, network delay, and buffered time in windows). 
   
   Is this for debugging purposes? The Flink documentation says that enabling this can significantly impact the performance of the cluster, so it highly recommends using it only for debugging.





[GitHub] [incubator-nemo] taegeonum commented on pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-925577692


   @Lemarais ping :) 





[GitHub] [incubator-nemo] Lemarais commented on a change in pull request #317: [NEMO-483] Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
Lemarais commented on a change in pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#discussion_r690067001



##########
File path: runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
##########
@@ -106,6 +110,33 @@ private void setTaskDuration(final long taskDuration) {
     this.taskDuration = taskDuration;
   }
 
+  /**
+   * Method related to stream metric.
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return this.streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      this.streamMetrics.putIfAbsent(sourceVertexId, new ArrayList<>());
+      this.streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  /**
+   * Method related to latency.
+   */
+  public final Map<String, List<LatencyMetric>> getLatencymarks() {
+    return this.latencymarks;
+  }
+
+  private void addLatencymark(final LatencyMetric latencyMetric) {
+    this.latencymarks.putIfAbsent(latencyMetric.getLatencymark().getLastTaskId(), new ArrayList<>());

Review comment:
       I had a habit of using `ArrayList`. I'll change it to `LinkedList`.







[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #317: Record Metrics associated with stream processing

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #317:
URL: https://github.com/apache/incubator-nemo/pull/317#issuecomment-899748195


   Kudos, SonarCloud Quality Gate passed!
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=BUG) (rating A)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=VULNERABILITY) (rating A)
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=SECURITY_HOTSPOT) (rating A)
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=317&resolved=false&types=CODE_SMELL) (rating A)
   - No Coverage information
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=317&metric=new_duplicated_lines_density&view=list)
   
   

