Posted to dev@apex.apache.org by davidyan74 <gi...@git.apache.org> on 2015/12/30 02:43:44 UTC

[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

GitHub user davidyan74 opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/194

    APEXCORE-201 changed the way latency is calculated and fixed the problem when latency is stalled when an operator falls behind too many windows

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davidyan74/incubator-apex-core APEXCORE-201

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #194
    
----
commit 68c500090efd3395ac4205f84b67756599178500
Author: David Yan <da...@datatorrent.com>
Date:   2015-12-30T01:35:04Z

    APEXCORE-201 changed the way latency is calculated and fixed the problem when latency is stalled when an operator falls behind too many windows

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50594505
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1357,12 +1265,14 @@ private void processOperatorDeployStatus(final PTOperator oper, OperatorHeartbea
                     deactivatedOpers.add(oper);
                   }
                   sca.undeployOpers.add(oper.getId());
    +              slowestUpstreamOp.remove(oper);
    --- End diff --
    
    Can we have a test for this?



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50607181
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1695,6 +1607,42 @@ public void run()
                 }
                 endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
     
    +            if (!oper.getInputs().isEmpty()) {
    +              long latency = Long.MAX_VALUE;
    --- End diff --
    
    @davidyan74 : Do we need to care about the following scenario?
    Operator C has two upstream operators, A and B. The current latency of A > B, so C records A as its slowest upstream. Then B's stats arrive and now the latency of B > A, but C still points to A as its slowest upstream...



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-173432460
  
    @tweise please also review



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-173411954
  
    @davidyan74 : Do you not want to calculate this at the same window boundaries? Different leaf operators could be at different windowIds, and similarly the different upstream operators of a single leaf operator could be at different windowIds...



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-169098606
  
    There are no new unit test cases. Are they not required?



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-168044423
  
    @vrozov please review. This is for the stale latency problem you observed when there is a slow operator in the DAG.



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50610441
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new operator after partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS) {
    -          LOG.warn("Latency calculation for this operator may not be correct because upstream end window timestamp is greater than this operator's end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks are in sync in your cluster. You can also try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
    -                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited, leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = new CriticalPathInfo();
    +      findCriticalPathHelper(leafOperator, cpi);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    -    }
    -    for (PTOperator.PTInput input : maxOperator.getInputs()) {
    -      if (null != input.source.source) {
    -        operators.add(input.source.source);
    -      }
    +  private void findCriticalPathHelper(PTOperator operator, CriticalPathInfo cpi)
    +  {
    +    cpi.latency += operator.stats.getLatencyMA();
    --- End diff --
    
    Can this be optimized so that upstream operators that have already been visited are not visited again?
    E.g. if I have the following DAG:
    <pre>
              |-->C 
    A--> B -->|-->D
    </pre>
    While finding the critical path, A and B are traversed for both leaf nodes, C and D. Can we store the intermediate values for A and B when they are computed from either C or D, and reuse them for the other leaf?
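
The caching idea in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's code: `Op` is a hypothetical stand-in for `PTOperator`, the cumulative latency of an operator is assumed to be its own latency plus the largest cumulative latency among its upstream operators, and the DAG is assumed to be acyclic (no iteration loops).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CriticalPathMemoSketch {
  // Hypothetical stand-in for PTOperator: a name, a latency, and its upstream operators.
  static final class Op {
    final String name;
    final long latency;
    final List<Op> upstream = new ArrayList<>();

    Op(String name, long latency, Op... inputs) {
      this.name = name;
      this.latency = latency;
      this.upstream.addAll(Arrays.asList(inputs));
    }
  }

  // Counts how often a cumulative latency is actually computed (not served from the cache).
  static int computations = 0;

  // Cumulative latency from the sources down to op, cached per operator.
  // Assumes the graph is acyclic; a loop would recurse forever.
  static long cumulativeLatency(Op op, Map<Op, Long> memo) {
    Long cached = memo.get(op);
    if (cached != null) {
      return cached;
    }
    computations++;
    long maxUpstream = 0;
    for (Op up : op.upstream) {
      maxUpstream = Math.max(maxUpstream, cumulativeLatency(up, memo));
    }
    long total = op.latency + maxUpstream;
    memo.put(op, total);
    return total;
  }

  // Largest cumulative latency over all leaf operators, sharing one cache across leaves.
  static long criticalPathLatency(List<Op> leaves) {
    Map<Op, Long> memo = new HashMap<>();
    long result = 0;
    for (Op leaf : leaves) {
      result = Math.max(result, cumulativeLatency(leaf, memo));
    }
    return result;
  }

  public static void main(String[] args) {
    // The DAG from the comment: A --> B --> C and B --> D.
    Op a = new Op("A", 5);
    Op b = new Op("B", 10, a);
    Op c = new Op("C", 3, b);
    Op d = new Op("D", 7, b);
    System.out.println(criticalPathLatency(Arrays.asList(c, d))); // prints 22
    System.out.println(computations); // prints 4: A and B are computed once, not twice
  }
}
```

With the shared cache, A and B are each computed once even though both leaves reach them. Without it, the same DAG would take six computations.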



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-173416610
  
    @gauravgopi123 Calculating latency at the same window boundaries was the original implementation. But that means the latency of some operators may not be up to date because of one slow operator in the DAG. If an operator is 1 minute slower than the rest of the DAG, the reported latency of all other operators is one minute old.



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-176856909
  
    Please open new PR against master.



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50626894
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1695,6 +1607,42 @@ public void run()
                 }
                 endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
     
    +            if (!oper.getInputs().isEmpty()) {
    +              long latency = Long.MAX_VALUE;
    --- End diff --
    
    @gauravgopi123 The slowest upstream operator should be updated upon the next stats update, correct?



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#issuecomment-167915523
  
    @tweise please review



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50581112
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1402,6 +1313,7 @@ private void processOperatorDeployStatus(final PTOperator oper, OperatorHeartbea
             if (ds != null) {
               // operator was removed and needs to be undeployed from container
               sca.undeployOpers.add(oper.getId());
    +          slowestUpstreamOp.remove(oper.getId());
    --- End diff --
    
    Should we add a test case to cover this?



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50626827
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -913,106 +899,28 @@ private void saveMetaInfo() throws IOException
         return logicalMetrics.get(operatorName);
       }
     
    -  private void calculateLatency(PTOperator oper, Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> endWindowStatsVisited, Set<PTOperator> leafOperators)
    +  private CriticalPathInfo findCriticalPath()
       {
    -    endWindowStatsVisited.add(oper);
    -    OperatorStatus operatorStatus = oper.stats;
    -
    -    EndWindowStats endWindowStats = endWindowStatsMap.get(oper.getId());
    -    if (endWindowStats == null) {
    -      LOG.info("End window stats is null for operator {}, probably a new operator after partitioning", oper);
    -      return;
    -    }
    -
    -    // find the maximum end window emit time from all input ports
    -    long upstreamMaxEmitTimestamp = -1;
    -    PTOperator upstreamMaxEmitTimestampOperator = null;
    -    for (PTOperator.PTInput input : oper.getInputs()) {
    -      if (null != input.source.source) {
    -        PTOperator upstreamOp = input.source.source;
    -        EndWindowStats upstreamEndWindowStats = endWindowStatsMap.get(upstreamOp.getId());
    -        if (upstreamEndWindowStats == null) {
    -          LOG.info("End window stats is null for operator {}", oper);
    -          return;
    -        }
    -        long adjustedEndWindowEmitTimestamp = upstreamEndWindowStats.emitTimestamp;
    -        MovingAverageLong rpcLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
    -        if (rpcLatency != null) {
    -          adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -        }
    -        if (adjustedEndWindowEmitTimestamp > upstreamMaxEmitTimestamp) {
    -          upstreamMaxEmitTimestamp = adjustedEndWindowEmitTimestamp;
    -          upstreamMaxEmitTimestampOperator = upstreamOp;
    -        }
    -      }
    -    }
    -
    -    if (upstreamMaxEmitTimestamp > 0) {
    -      long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
    -      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
    -      if (rpcLatency != null) {
    -        adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
    -      }
    -      if (upstreamMaxEmitTimestamp <= adjustedEndWindowEmitTimestamp) {
    -        LOG.debug("Adding {} to latency MA for {}", adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp, oper);
    -        operatorStatus.latencyMA.add(adjustedEndWindowEmitTimestamp - upstreamMaxEmitTimestamp);
    -      } else {
    -        operatorStatus.latencyMA.add(0);
    -        if (lastLatencyWarningTime < System.currentTimeMillis() - LATENCY_WARNING_THRESHOLD_MILLIS) {
    -          LOG.warn("Latency calculation for this operator may not be correct because upstream end window timestamp is greater than this operator's end window timestamp: {} ({}) > {} ({}). Please verify that the system clocks are in sync in your cluster. You can also try tweaking the RPC_LATENCY_COMPENSATION_SAMPLES application attribute (currently set to {}).",
    -                  upstreamMaxEmitTimestamp, upstreamMaxEmitTimestampOperator, adjustedEndWindowEmitTimestamp, oper, this.vars.rpcLatencyCompensationSamples);
    -          lastLatencyWarningTime = System.currentTimeMillis();
    -        }
    -      }
    -    }
    -
    -    if (oper.getOutputs().isEmpty()) {
    -      // it is a leaf operator
    -      leafOperators.add(oper);
    -    }
    -    else {
    -      for (PTOperator.PTOutput output : oper.getOutputs()) {
    -        for (PTOperator.PTInput input : output.sinks) {
    -          if (input.target != null) {
    -            PTOperator downStreamOp = input.target;
    -            if (!endWindowStatsVisited.contains(downStreamOp)) {
    -              calculateLatency(downStreamOp, endWindowStatsMap, endWindowStatsVisited, leafOperators);
    -            }
    -          }
    -        }
    +    CriticalPathInfo result = null;
    +    List<PTOperator> leafOperators = plan.getLeafOperators();
    +    for (PTOperator leafOperator : leafOperators) {
    +      CriticalPathInfo cpi = new CriticalPathInfo();
    +      findCriticalPathHelper(leafOperator, cpi);
    +      if (result == null || result.latency < cpi.latency) {
    +        result = cpi;
           }
         }
    +    return result;
       }
    -  /*
    -   * returns cumulative latency
    -   */
    -  private long findCriticalPath(Map<Integer, EndWindowStats> endWindowStatsMap, Set<PTOperator> operators, LinkedList<Integer> criticalPath)
    -  {
    -    long maxEndWindowTimestamp = 0;
    -    PTOperator maxOperator = null;
    -    for (PTOperator operator : operators) {
    -      EndWindowStats endWindowStats = endWindowStatsMap.get(operator.getId());
    -      if (maxEndWindowTimestamp < endWindowStats.emitTimestamp) {
    -        maxEndWindowTimestamp = endWindowStats.emitTimestamp;
    -        maxOperator = operator;
    -      }
    -    }
    -    if (maxOperator == null) {
    -      return 0;
    -    }
    -    criticalPath.addFirst(maxOperator.getId());
    -    OperatorStatus operatorStatus = maxOperator.stats;
     
    -    operators.clear();
    -    if (maxOperator.getInputs() == null || maxOperator.getInputs().isEmpty()) {
    -      return operatorStatus.latencyMA.getAvg();
    -    }
    -    for (PTOperator.PTInput input : maxOperator.getInputs()) {
    -      if (null != input.source.source) {
    -        operators.add(input.source.source);
    -      }
    +  private void findCriticalPathHelper(PTOperator operator, CriticalPathInfo cpi)
    +  {
    +    cpi.latency += operator.stats.getLatencyMA();
    --- End diff --
    
    @gauravgopi123 Yes, you are correct.  We actually need to change this code anyway because the iteration feature has been merged.  Will do that when I come back.



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50580402
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1402,6 +1313,7 @@ private void processOperatorDeployStatus(final PTOperator oper, OperatorHeartbea
             if (ds != null) {
               // operator was removed and needs to be undeployed from container
               sca.undeployOpers.add(oper.getId());
    +          slowestUpstreamOp.remove(oper.getId());
    --- End diff --
    
    @davidyan74 : Should this be slowestUpstreamOp.remove(oper)?
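
The reason this kind of mistake compiles silently can be shown with a small sketch. It assumes, purely for illustration, that `slowestUpstreamOp` is a map keyed by the operator object; the thread does not show its declaration. `Op` is a hypothetical stand-in for `PTOperator`. Because `Map.remove` takes an `Object`, passing the integer id is accepted by the compiler but matches no key.

```java
import java.util.HashMap;
import java.util.Map;

class RemoveKeyTypeSketch {
  // Hypothetical stand-in for PTOperator; only the id matters here.
  static final class Op {
    final int id;

    Op(int id) {
      this.id = id;
    }

    int getId() {
      return id;
    }
  }

  // Calls remove with the given key and returns the map size afterwards.
  static int sizeAfterRemove(Map<Op, Op> slowestUpstream, Object key) {
    slowestUpstream.remove(key);
    return slowestUpstream.size();
  }

  public static void main(String[] args) {
    Op upstream = new Op(1);
    Op downstream = new Op(2);
    Map<Op, Op> slowestUpstream = new HashMap<>();
    slowestUpstream.put(downstream, upstream);

    // The id is autoboxed to an Integer, which never equals an Op key: nothing is removed.
    System.out.println(sizeAfterRemove(slowestUpstream, downstream.getId())); // prints 1
    // Passing the operator itself removes the entry.
    System.out.println(sizeAfterRemove(slowestUpstream, downstream)); // prints 0
  }
}
```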



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50730068
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1695,6 +1607,42 @@ public void run()
                 }
                 endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
     
    +            if (!oper.getInputs().isEmpty()) {
    +              long latency = Long.MAX_VALUE;
    --- End diff --
    
    @davidyan74 : Consider the following scenario.
    For window W1, stats are received in the order A, C, B, and Latency(A) < Latency(B). But since B's stats arrived later than C's, C records A as its slowest upstream.
    
    For window W2, stats are received in the order B, A, C, and the new Latency(A) > the new Latency(B), so C again records A as its slowest upstream.
    
    B is never recorded as the slowest upstream for C in this scenario.
    
    Again, it depends on how accurate you want the displayed latency values to be.
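
One way to make the choice independent of arrival order is to re-evaluate a downstream operator's slowest upstream whenever any of its upstream operators reports stats, not only when the downstream operator itself reports. The sketch below is a hedged illustration, not the PR's implementation: the class, the string operator names, and the use of the latest reported latency as the "slowest" measure are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SlowestUpstreamSketch {
  // Hypothetical topology: operator name -> names of its upstream operators.
  private final Map<String, List<String>> upstreamOf;
  // Reverse lookup derived from upstreamOf: operator name -> names of its downstream operators.
  private final Map<String, List<String>> downstreamOf = new HashMap<>();
  // Most recently reported latency per operator.
  private final Map<String, Long> latestLatency = new HashMap<>();
  // Current slowest upstream per operator.
  private final Map<String, String> slowestUpstream = new HashMap<>();

  SlowestUpstreamSketch(Map<String, List<String>> upstreamOf) {
    this.upstreamOf = upstreamOf;
    for (Map.Entry<String, List<String>> e : upstreamOf.entrySet()) {
      for (String up : e.getValue()) {
        downstreamOf.computeIfAbsent(up, k -> new ArrayList<>()).add(e.getKey());
      }
    }
  }

  // Records new stats for op, then refreshes op and every operator directly downstream of it.
  void onStats(String op, long latency) {
    latestLatency.put(op, latency);
    recompute(op);
    for (String down : downstreamOf.getOrDefault(op, Collections.<String>emptyList())) {
      recompute(down);
    }
  }

  // Picks the upstream with the largest latest latency among those that have reported.
  private void recompute(String op) {
    String slowest = null;
    long max = Long.MIN_VALUE;
    for (String up : upstreamOf.getOrDefault(op, Collections.<String>emptyList())) {
      Long latency = latestLatency.get(up);
      if (latency != null && latency > max) {
        max = latency;
        slowest = up;
      }
    }
    if (slowest != null) {
      slowestUpstream.put(op, slowest);
    }
  }

  String slowestUpstream(String op) {
    return slowestUpstream.get(op);
  }

  public static void main(String[] args) {
    Map<String, List<String>> topology = new HashMap<>();
    topology.put("C", Arrays.asList("A", "B"));
    SlowestUpstreamSketch sketch = new SlowestUpstreamSketch(topology);

    // Window W1: stats arrive in the order A, C, B with Latency(A) < Latency(B).
    sketch.onStats("A", 10);
    sketch.onStats("C", 1);
    sketch.onStats("B", 20);
    System.out.println(sketch.slowestUpstream("C")); // prints B

    // Window W2: stats arrive in the order B, A, C with Latency(A) > Latency(B).
    sketch.onStats("B", 5);
    sketch.onStats("A", 30);
    sketch.onStats("C", 1);
    System.out.println(sketch.slowestUpstream("C")); // prints A
  }
}
```

Between two updates, this sketch may briefly compare one upstream's new value against another's value from the previous window, so it trades some transient inaccuracy for order independence.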



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/194#discussion_r50581793
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1402,6 +1313,7 @@ private void processOperatorDeployStatus(final PTOperator oper, OperatorHeartbea
             if (ds != null) {
               // operator was removed and needs to be undeployed from container
               sca.undeployOpers.add(oper.getId());
    +          slowestUpstreamOp.remove(oper.getId());
    --- End diff --
    
    @gauravgopi123 Thanks for catching that. Fixed!



[GitHub] incubator-apex-core pull request: APEXCORE-201 changed the way lat...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/194

