You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/11/13 15:07:00 UTC

[jira] [Commented] (APEXCORE-781) Autometric values of an operator is showing wrongly in App master

    [ https://issues.apache.org/jira/browse/APEXCORE-781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249694#comment-16249694 ] 

ASF GitHub Bot commented on APEXCORE-781:
-----------------------------------------

vrozov closed pull request #578: APEXCORE-781 Update the autometrics only if the number of partitions matches with the size of metric collection
URL: https://github.com/apache/apex-core/pull/578
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 07641d23e2..1566d2dd66 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -258,6 +258,7 @@
   private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
   //logical operator name to latest logical metrics.
   private final Map<String, Map<String, Object>> latestLogicalMetrics = Maps.newHashMap();
+  private final Map<String, Pair<Long, Map<String, Object>>> latestLogicalCompletedMetrics = Maps.newHashMap();
 
   //logical operator name to latest counters. exists for backward compatibility.
   private final Map<String, Object> latestLogicalCounters = Maps.newHashMap();
@@ -934,6 +935,7 @@ private void aggregateMetrics(long windowId, Map<Integer, EndWindowStats> endWin
           metricPool.add(physicalMetrics);
         }
       }
+
       if (metricPool.isEmpty()) {
         //nothing to aggregate
         continue;
@@ -960,6 +962,9 @@ public boolean add(Pair<Long, Map<String, Object>> longMapPair)
         }
         LOG.debug("Adding to logical metrics for {}", operatorMeta.getName());
         windowMetrics.add(new Pair<>(windowId, lm));
+        if (metricPool.size() == physicalOperators.size()) {
+          latestLogicalCompletedMetrics.put(operatorMeta.getName(), new Pair<>(windowId, lm));
+        }
         Map<String, Object> oldValue = latestLogicalMetrics.put(operatorMeta.getName(), lm);
         if (oldValue == null) {
           try {
@@ -2495,11 +2500,16 @@ public OperatorInfo getOperatorInfo(int operatorId)
 
   public LogicalOperatorInfo getLogicalOperatorInfo(String operatorName)
   {
+    return getLogicalOperatorInfo(operatorName, true);
+  }
+
+  public LogicalOperatorInfo getLogicalOperatorInfo(String operatorName, boolean isPartialAggregates)
+  {
     OperatorMeta operatorMeta = getLogicalPlan().getOperatorMeta(operatorName);
     if (operatorMeta == null) {
       return null;
     }
-    return fillLogicalOperatorInfo(operatorMeta);
+    return fillLogicalOperatorInfo(operatorMeta, isPartialAggregates);
   }
 
   public ModuleMeta getModuleMeta(String moduleName)
@@ -2526,7 +2536,7 @@ private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag)
     List<LogicalOperatorInfo> infoList = new ArrayList<>();
     Collection<OperatorMeta> allOperators = getLogicalPlan().getAllOperators();
     for (OperatorMeta operatorMeta : allOperators) {
-      infoList.add(fillLogicalOperatorInfo(operatorMeta));
+      infoList.add(getLogicalOperatorInfo(operatorMeta.getName()));
     }
     return infoList;
   }
@@ -2608,7 +2618,7 @@ private OperatorInfo fillPhysicalOperatorInfo(PTOperator operator)
     return oi;
   }
 
-  private LogicalOperatorInfo fillLogicalOperatorInfo(OperatorMeta operator)
+  private LogicalOperatorInfo fillLogicalOperatorInfo(OperatorMeta operator, boolean isPartialAggregates)
   {
     LogicalOperatorInfo loi = new LogicalOperatorInfo();
     loi.name = operator.getName();
@@ -2673,7 +2683,12 @@ private LogicalOperatorInfo fillLogicalOperatorInfo(OperatorMeta operator)
     if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != null) {
       loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue();
       loi.counters = latestLogicalCounters.get(operator.getName());
-      loi.autoMetrics = latestLogicalMetrics.get(operator.getName());
+      if (isPartialAggregates) {
+        loi.autoMetrics = latestLogicalMetrics.get(operator.getName());
+      } else {
+        loi.autoMetrics = latestLogicalCompletedMetrics.get(operator.getName());
+      }
+
     }
 
     return loi;
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 995127c1b6..5e0eea3a84 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -574,6 +574,21 @@ public JSONObject getLogicalOperators() throws Exception
   }
 
   @GET
+  @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/metrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  public JSONObject getLogicalOperator(@PathParam("operatorName") String operatorName, @QueryParam("isPartialAggregates") Boolean isPartialAggregates) throws Exception
+  {
+    init();
+    OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    if (logicalOperator == null) {
+      throw new NotFoundException();
+    }
+
+    LogicalOperatorInfo logicalOperatorInfo = dagManager.getLogicalOperatorInfo(operatorName, isPartialAggregates);
+    return new JSONObject(objectMapper.writeValueAsString(logicalOperatorInfo));
+  }
+
+  @GET
   @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}")
   @Produces(MediaType.APPLICATION_JSON)
   public JSONObject getLogicalOperator(@PathParam("operatorName") String operatorName) throws Exception


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Autometric values of an operator is showing wrongly in App master 
> ------------------------------------------------------------------
>
>                 Key: APEXCORE-781
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-781
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>
> Observation: 
>    - This happens only when the operator has more than one partition. 
>    - In StreamingContainerManager, the size of the collection of metric values sometimes does not match the number of physical partitions of an operator. As a result, the auto-metric values are reported incorrectly. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)