Posted to dev@apex.apache.org by ishark <gi...@git.apache.org> on 2016/02/29 23:52:09 UTC

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

GitHub user ishark opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/250

    APEXCORE-10 #resolve Changes for supporting anti-affinity in operators

    Added list of affinity rules as attribute in DAGContext
    Added validation for affinity, anti-affinity and locality rules in dag validate phase
    Added handling of Container and Node affinity rules. Rack affinity is not supported at this point
    Also added changes for supporting node-specific requests in Cloudera
    Added Unit tests for affinity rules and dag validation for the same
    Added Json String codec for setting affinity rules through properties.xml
    Added set affinity and anti-affinity APIs to DAG

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ishark/incubator-apex-core APEXCORE-10

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #250
    
----
commit d62a21e73b069fc66af0102f8b8884758abfb252
Author: ishark <is...@datatorrent.com>
Date:   2016-02-29T22:16:51Z

    APEXCORE-10 #resolve Changes for supporting anti-affinity amongst operators
    Added list of affinity rules as attribute in DAGContext
    Added validation for affinity, anti-affinity and locality rules in dag validate phase
    Added handling of Container and Node affinity rules. Rack affinity is not supported at this point
    Also added changes for supporting node-specific requests in Cloudera
    Added Unit tests for affinity rules and dag validation for the same
    Added Json String codec for setting affinity rules through properties.xml
    Added set affinity and anti-affinity APIs to DAG

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-201640472
  
    @ishark There are merge conflicts; can you please rebase?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507498
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandler.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +/**
    + * Handles creating container requests and reissuing them on timeout
    + *
    + */
    +public class ContainerRequestHandler
    +{
    +  protected static final int NUMBER_MISSED_HEARTBEATS = 30;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    if (!requestedResources.isEmpty()) {
    +      // resourceRequestor.clearNodeMapping();
    +      for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
    +        if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
    --- End diff --
    
    A line comment to that effect may be helpful (enhancement :-).



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506420
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", operators="
    +        + operators + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Pair of operator names to specify affinity rule
    +   * The order of operators is not considered in this class
    +   * i.e. OperatorPair("O1", "O2") is equal to OperatorPair("O2", "O1")
    +   */
    +  public static class OperatorPair implements Serializable
    --- End diff --
    
    Why not use a generic pair (or extend it)?
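
One point that bears on this question: the Javadoc in the diff says the pair is order-insensitive, whereas a generic pair such as commons-lang3 `Pair` compares its elements in order. Extending a generic pair would therefore still need custom `equals` and `hashCode`. The following is a minimal sketch of such an order-insensitive pair; `UnorderedNamePair` is a hypothetical name and this is not the `OperatorPair` implementation from the PR.

```java
import java.util.Objects;

/** Hypothetical order-insensitive pair of operator names (not the PR's OperatorPair). */
final class UnorderedNamePair
{
  private final String first;
  private final String second;

  UnorderedNamePair(String first, String second)
  {
    this.first = Objects.requireNonNull(first);
    this.second = Objects.requireNonNull(second);
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof UnorderedNamePair)) {
      return false;
    }
    UnorderedNamePair other = (UnorderedNamePair)o;
    // Equal if the names match in the same order or in swapped order.
    return (first.equals(other.first) && second.equals(other.second))
        || (first.equals(other.second) && second.equals(other.first));
  }

  @Override
  public int hashCode()
  {
    // Addition is commutative, so swapping the elements yields the same hash.
    return first.hashCode() + second.hashCode();
  }
}
```

With this definition, `new UnorderedNamePair("O1", "O2")` and `new UnorderedNamePair("O2", "O1")` are equal and fall into the same hash bucket, so either can be used to look up the other in a `HashSet` or `HashMap`.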



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507595
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandlerCloudera.java ---
    @@ -0,0 +1,138 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting specifically for cloudera
    + */
    +public class ContainerRequestHandlerCloudera extends ContainerRequestHandler
    +{
    +  HashMap<ContainerRequest, ContainerStartRequest> hostSpecificRequests = new HashMap<>();
    +  HashMap<ContainerRequest, ContainerStartRequest> otherContainerRequests = new HashMap<>();
    +  HashMap<String, List<ContainerRequest>> hostSpecificRequestsMap = new HashMap<>();
    +  List<String> blacklistedNodesForHostSpecificRequests = null;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    // Issue all host specific requests first
    +    if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) {
    +      LOG.info("Issue Host specific requests first");
    +      // Blacklist all the nodes and issue request for host specific
    +      Entry<String, List<ContainerRequest>> set = hostSpecificRequestsMap.entrySet().iterator().next();
    +      List<ContainerRequest> requests = set.getValue();
    +      List<String> blacklistNodes = resourceRequestor.getNodesExceptHost(requests.get(0).getNodes());
    +      amRmClient.updateBlacklist(blacklistNodes, requests.get(0).getNodes());
    +      blacklistedNodesForHostSpecificRequests = blacklistNodes;
    +      LOG.info("Sending {} request(s) after blacklisting nodes {} and removed host from request {}", requests.size(), blacklistNodes, requests.get(0).getNodes());
    --- End diff --
    
    Maybe it is better to log "blacklisting all but ..." (naming only the requested hosts), since the full list of nodes can be very large.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-207146683
  
    Have addressed review comments and updated diff.
    Tests ran fine locally.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58927359
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -344,9 +346,67 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
             addLogicalOperator(n);
           }
         }
    +    
    +    inlinePrefs.prefs.clear();
    +    localityPrefs.prefs.clear();
    +    
    +    // Add inlinePrefs and localityPreds for affinity rules
    --- End diff --
    
    How do we handle this with dynamic plan changes? 



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-205941259
  
    Have addressed most of the review comments except convenience APIs in DAG. Did quick sanity testing on Cloudera after changes.
    @tweise Could you please review?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58925665
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -344,9 +346,67 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
             addLogicalOperator(n);
           }
         }
    +    
    +    inlinePrefs.prefs.clear();
    +    localityPrefs.prefs.clear();
    +    
    +    // Add inlinePrefs and localityPreds for affinity rules
    --- End diff --
    
    typo



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-191536088
  
    @tweise Please review



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58977816
  
    --- Diff: api/src/main/java/com/datatorrent/api/StringCodec.java ---
    @@ -377,4 +384,43 @@ public String toString(Class<? extends T> clazz)
         private static final long serialVersionUID = 201312082053L;
       }
     
    +  public class JsonStringCodec<T> implements StringCodec<T>, Serializable
    +  {
    +    private static final long serialVersionUID = 2513932518264776006L;
    +    Class<?> clazz;
    +
    +    public JsonStringCodec(Class<T> clazz)
    +    {
    +      this.clazz = clazz;
    +    }
    +
    +    @Override
    +    public T fromString(String string)
    +    {
    +      try {
    +        ObjectMapper mapper = new ObjectMapper();
    +        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    +        ObjectReader reader = mapper.reader(clazz);
    +        return reader.readValue(string);
    +      } catch (IOException e) {
    +        Throwables.propagate(e);
    --- End diff --
    
    It would be nice to also fix the other occurrences in this file and remove the usage of DTThrowable.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58926771
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -357,25 +417,51 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
               if (!container.operators.isEmpty()) {
                 LOG.warn("Operator {} shares container without locality contraint due to insufficient resources.", oper);
               }
    +          // TODO: Check if PT Operators conflict in anti-affinity, Pick the first container that does not conflict
    --- End diff --
    
    Why would this happen here and not during logical plan validation?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507404
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandler.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +/**
    + * Handles creating container requests and reissuing them on timeout
    + *
    + */
    +public class ContainerRequestHandler
    +{
    +  protected static final int NUMBER_MISSED_HEARTBEATS = 30;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    if (!requestedResources.isEmpty()) {
    +      // resourceRequestor.clearNodeMapping();
    +      for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
    +        if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
    --- End diff --
    
    This flow is unchanged; I only moved it into a separate class. If a container request is not honored by YARN within this time frame, we reissue the request. The class was added so that it can be overridden for Cloudera to manage node-specific requests. I will update the JIRA with these details.
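
The timeout check described here can be sketched in isolation. The example below is a simplified stand-in, not the code from the PR: YARN's `ContainerRequest` and the `MutablePair` bookkeeping are replaced by a plain map from a request id to the loop counter value at which the request was last issued, and the class and method names are hypothetical. Resetting the stored counter on reissue is also an assumption, since the quoted diff ends before that step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the reissue-on-timeout check; names are hypothetical. */
final class ReissueSketch
{
  static final int NUMBER_MISSED_HEARTBEATS = 30;

  /**
   * @param requestedAt request id mapped to the loop counter value when it was last issued
   * @param loopCounter current heartbeat loop counter
   * @return ids of the requests considered timed out and reissued
   */
  static List<String> reissueTimedOut(Map<String, Integer> requestedAt, int loopCounter)
  {
    List<String> reissued = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : requestedAt.entrySet()) {
      // Same comparison as in the diff: strictly more than 30 loop iterations have passed.
      if (loopCounter - entry.getValue() > NUMBER_MISSED_HEARTBEATS) {
        // Assumption: remember when the request was reissued so it gets a fresh timeout.
        entry.setValue(loopCounter);
        reissued.add(entry.getKey());
      }
    }
    return reissued;
  }
}
```

A subclass that needs different behavior for node-specific requests can then override only the reissue step while keeping the same timeout rule.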



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506744
  
    --- Diff: api/pom.xml ---
    @@ -70,14 +70,6 @@
               <groupId>commons-beanutils</groupId>
             </exclusion>
             <exclusion>
    -          <artifactId>jackson-core-asl</artifactId>
    --- End diff --
    
    This change is needed to support setting affinity rules from properties.xml: I added a JsonStringCodec for reading the affinity rule JSON.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508355
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -108,6 +113,23 @@ public void updateNodeReports(List<NodeReport> nodeReports)
         }
       }
     
    +  public List<String> getNodesExceptHost(List<String> hostNames)
    --- End diff --
    
    Does hostNames always contain one element? The contains operation is expensive...
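
The body of `getNodesExceptHost` is not shown in the quoted diff, so the following is only a sketch of one way to address the cost concern, assuming the method filters the known node names against `hostNames`. Copying `hostNames` into a `HashSet` makes each membership test constant time on average, instead of a linear scan of the list per node. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper; not the ResourceRequestHandler code from the PR. */
final class NodeFilterSketch
{
  /** Returns all nodes, in iteration order, that are not listed in hostNames. */
  static List<String> nodesExcept(Collection<String> allNodes, List<String> hostNames)
  {
    // One-time copy so that contains() does not scan the list for every node.
    Set<String> excluded = new HashSet<>(hostNames);
    List<String> result = new ArrayList<>();
    for (String node : allNodes) {
      if (!excluded.contains(node)) {
        result.add(node);
      }
    }
    return result;
  }
}
```

If `hostNames` always holds a single element, the extra set is unnecessary and a direct `equals` comparison against that element would do.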



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507058
  
    --- Diff: api/src/main/java/com/datatorrent/api/StringCodec.java ---
    @@ -377,4 +383,43 @@ public String toString(Class<? extends T> clazz)
         private static final long serialVersionUID = 201312082053L;
       }
     
    +  public class JsonStringCodec<T> implements StringCodec<T>, Serializable
    +  {
    +    private static final long serialVersionUID = 2513932518264776006L;
    +    Class<?> clazz;
    +
    +    public JsonStringCodec(Class<T> clazz)
    +    {
    +      this.clazz = clazz;
    +    }
    +
    +    @Override
    +    public T fromString(String string)
    +    {
    +      try {
    +        ObjectMapper mapper = new ObjectMapper();
    +        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    +        ObjectReader reader = mapper.reader(clazz);
    +        return reader.readValue(string);
    +      } catch (IOException e) {
    +        DTThrowable.wrapIfChecked(e);
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public String toString(T pojo)
    +    {
    +      try {
    +        ObjectMapper mapper = new ObjectMapper();
    +        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    +        ObjectWriter writer = mapper.writer();
    +        return writer.writeValueAsString(pojo);
    +      } catch (IOException e) {
    +        DTThrowable.wrapIfChecked(e);
    --- End diff --
    
    This is not specific to this PR but in general, should we consider using Throwables.propagate() instead?
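
To illustrate the pattern under discussion: with Guava, the documented idiom is `throw Throwables.propagate(e);`, where the explicit `throw` tells the compiler that the catch block never completes normally, so no trailing `return null` is needed. The sketch below shows the same control flow using only the JDK. It does not call Guava or DTThrowable, and `parse` is a hypothetical stand-in for the Jackson call in the diff.

```java
import java.io.IOException;

/** JDK-only sketch of rethrowing a checked exception as unchecked; names are hypothetical. */
final class PropagateSketch
{
  /** Stand-in for a call that declares IOException, such as reading JSON. */
  static int parse(String s) throws IOException
  {
    try {
      return Integer.parseInt(s.trim());
    } catch (NumberFormatException e) {
      throw new IOException("not a number: " + s, e);
    }
  }

  static int fromString(String s)
  {
    try {
      return parse(s);
    } catch (IOException e) {
      // The explicit throw ends this path, so no "return null" fallback is required.
      throw new RuntimeException(e);
    }
  }
}
```

Callers see an unchecked exception whose cause is the original `IOException`, and the method can never silently return a placeholder value after a failure.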



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506922
  
    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -250,6 +250,24 @@
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
     
       /**
    +   * Set affinity between operators
    +   * @param locality
    +   * @param relaxLocality
    +   * @param first operator
    +   * @param one or more operators
    +   */
    +  public abstract void setAffinity(Locality locality, boolean relaxLocality, String firstOperator, String... operators);
    --- End diff --
    
    Yes, affinity and anti-affinity can be set from attributes directly. These were added as convenience methods, as per the discussion on the dev mailing list. Here are sample usages of these methods:
    https://github.com/ishark/Apex-Samples/blob/master/affinity-example/src/main/java/com/example/affinity/ApplicationDagAPIs.java#L33-L34
    
    Compared to when setting via attributes:
    https://github.com/ishark/Apex-Samples/blob/master/affinity-example/src/main/java/com/example/affinity/ApplicationDagContext.java#L42-L54



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508595
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -170,19 +205,69 @@ public String getHost(ContainerStartRequest csr, boolean first)
                 containers.add(nodeLocalOper.getContainer());
               }
             }
    -        for (Map.Entry<String, NodeReport> nodeEntry : nodeReportMap.entrySet()) {
    -          int memAvailable = nodeEntry.getValue().getCapability().getMemory() - nodeEntry.getValue().getUsed().getMemory();
    -          int vCoresAvailable = nodeEntry.getValue().getCapability().getVirtualCores() - nodeEntry.getValue().getUsed().getVirtualCores();
    -          if (memAvailable >= aggrMemory && vCoresAvailable >= vCores) {
    -            host = nodeEntry.getKey();
    -            grpObj.setHost(host);
    -            nodeLocalMapping.put(nodeLocalSet, host);
    -            return host;
    -          }
    +        host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +
    +        if (host == null && !antiPreferredHosts.isEmpty() && !antiHosts.isEmpty()) {
    +          // Drop the preferred constraint and try allocation
    +          antiPreferredHosts.clear();
    +          host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +        }
    +        if (host != null) {
    +          antiAffinityMapping.put(c, host);
    +        } else {
    +          host = INVALID_HOST;
             }
           }
         }
    +    LOG.info("Found host {}", host);
         return host;
       }
     
    +  public void populateAntiHostList(PTContainer c, List<String> antiHosts)
    --- End diff --
    
    Add javadoc



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58955811
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -357,25 +417,51 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
               if (!container.operators.isEmpty()) {
                 LOG.warn("Operator {} shares container without locality contraint due to insufficient resources.", oper);
               }
    +          // TODO: Check if PT Operators conflict in anti-affinity, Pick the first container that does not conflict
    --- End diff --
    
    Oh, yes then the check won't be needed. Will remove it.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508604
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -170,19 +205,69 @@ public String getHost(ContainerStartRequest csr, boolean first)
                 containers.add(nodeLocalOper.getContainer());
               }
             }
    -        for (Map.Entry<String, NodeReport> nodeEntry : nodeReportMap.entrySet()) {
    -          int memAvailable = nodeEntry.getValue().getCapability().getMemory() - nodeEntry.getValue().getUsed().getMemory();
    -          int vCoresAvailable = nodeEntry.getValue().getCapability().getVirtualCores() - nodeEntry.getValue().getUsed().getVirtualCores();
    -          if (memAvailable >= aggrMemory && vCoresAvailable >= vCores) {
    -            host = nodeEntry.getKey();
    -            grpObj.setHost(host);
    -            nodeLocalMapping.put(nodeLocalSet, host);
    -            return host;
    -          }
    +        host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +
    +        if (host == null && !antiPreferredHosts.isEmpty() && !antiHosts.isEmpty()) {
    +          // Drop the preferred constraint and try allocation
    +          antiPreferredHosts.clear();
    +          host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +        }
    +        if (host != null) {
    +          antiAffinityMapping.put(c, host);
    +        } else {
    +          host = INVALID_HOST;
             }
           }
         }
    +    LOG.info("Found host {}", host);
         return host;
       }
     
    +  public void populateAntiHostList(PTContainer c, List<String> antiHosts)
    +  {
    +    for (PTContainer container : c.getStrictAntiPrefs()) {
    +      if (antiAffinityMapping.containsKey(container)) {
    +        antiHosts.add(antiAffinityMapping.get(container));
    +      } else {
    +        // Check if there is an anti-affinity with host locality
    +        String antiHost = getAntiHostsList(container);
    +        if (antiHost != null) {
    +          antiHosts.add(antiHost);
    +        }
    +      }
    +    }
    +  }
    +
    +  public String getAntiHostsList(PTContainer container)
    +  {
    +    for (PTOperator oper : container.getOperators()) {
    +      HostOperatorSet grpObj = oper.getNodeLocalOperators();
    +      String host = nodeLocalMapping.get(grpObj.getOperatorSet());
    +      if (host != null) {
    +        return host;
    +      }
    +      if (grpObj.getHost() != null) {
    +        host = grpObj.getHost();
    +        return host;
    +      }
    +    }
    +    return null;
    +  }
    +
    +  public String assignHost(String host, List<String> antiHosts, List<String> antiPreferredHosts, HostOperatorSet grpObj, Set<PTOperator> nodeLocalSet, int aggrMemory, int vCores)
    --- End diff --
    
    javadoc



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508918
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---
    @@ -668,7 +671,10 @@ private void execute() throws YarnException, IOException
         int numReleasedContainers = 0;
         int nextRequestPriority = 0;
         ResourceRequestHandler resourceRequestor = new ResourceRequestHandler();
    +    // Use override for container requestor in case of cloudera distribution,to handle host specific requests
    +    ContainerRequestHandler containerRequestor = System.getenv().containsKey("CDH_HADOOP_BIN") ? new ContainerRequestHandlerCloudera() : new ContainerRequestHandler();
    --- End diff --
    
    Would it make sense to combine ResourceRequestHandler and ContainerRequestHandler?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507443
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandlerCloudera.java ---
    @@ -0,0 +1,138 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting specifically for cloudera
    --- End diff --
    
    Will add it in comments, mentioning the Jira here for reference: https://issues.apache.org/jira/browse/YARN-1412
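    
    A minimal sketch of the blacklisting idea named in the class comment above: to steer a request toward particular hosts, compute the list of every other known node and hand that list to the resource manager as a blacklist. BlacklistSketch and nodesExceptHosts are hypothetical names used only for illustration; the PR calls a getNodesExceptHost helper on ResourceRequestHandler, and this is not that code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not the PR's code: computes which nodes to blacklist
// so that only the wanted hosts remain eligible for allocation.
final class BlacklistSketch
{
  static List<String> nodesExceptHosts(Collection<String> allNodes, Collection<String> wantedHosts)
  {
    List<String> blacklist = new ArrayList<>();
    for (String node : allNodes) {
      // Every node that is not explicitly wanted goes on the blacklist.
      if (!wantedHosts.contains(node)) {
        blacklist.add(node);
      }
    }
    return blacklist;
  }

  public static void main(String[] args)
  {
    List<String> cluster = Arrays.asList("node1", "node2", "node3");
    // Only node2 should stay eligible, so node1 and node3 are blacklisted.
    System.out.println(nodesExceptHosts(cluster, Arrays.asList("node2")));
  }
}
```

    The blacklist would need to be cleared again once the host-specific requests are satisfied, otherwise later requests would stay confined to the same hosts.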



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58977758
  
    --- Diff: api/src/main/java/com/datatorrent/api/StringCodec.java ---
    @@ -377,4 +384,43 @@ public String toString(Class<? extends T> clazz)
         private static final long serialVersionUID = 201312082053L;
       }
     
    +  public class JsonStringCodec<T> implements StringCodec<T>, Serializable
    +  {
    +    private static final long serialVersionUID = 2513932518264776006L;
    +    Class<?> clazz;
    +
    +    public JsonStringCodec(Class<T> clazz)
    +    {
    +      this.clazz = clazz;
    +    }
    +
    +    @Override
    +    public T fromString(String string)
    +    {
    +      try {
    +        ObjectMapper mapper = new ObjectMapper();
    +        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    +        ObjectReader reader = mapper.reader(clazz);
    +        return reader.readValue(string);
    +      } catch (IOException e) {
    +        Throwables.propagate(e);
    --- End diff --
    
    Exception propagation. No need for subsequent return null
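    
    One way to read this suggestion, sketched under assumptions: Guava's Throwables.propagate is declared to return RuntimeException, so the call can be written as a throw statement, and the compiler then no longer asks for a return after the catch block. The propagate method below is a local stand-in so the example compiles without Guava; PropagateSketch and firstChar are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;

// Stand-in for Guava's Throwables.propagate(Throwable), which is declared to
// return RuntimeException so that callers can write "throw propagate(e)".
final class PropagateSketch
{
  static RuntimeException propagate(Throwable t)
  {
    if (t instanceof RuntimeException) {
      throw (RuntimeException)t;
    }
    if (t instanceof Error) {
      throw (Error)t;
    }
    throw new RuntimeException(t);
  }

  // Hypothetical parser: returns the first character, or -1 for an empty string.
  static int firstChar(String s)
  {
    try {
      return new StringReader(s).read();
    } catch (IOException e) {
      // "throw" tells the compiler this path never completes normally,
      // so no trailing "return" is needed after the catch block.
      throw propagate(e);
    }
  }
}
```

    Without the throw keyword the compiler treats the catch block as completing normally, which is what forces a dummy return statement.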



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508019
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandlerCloudera.java ---
    @@ -0,0 +1,138 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting specifically for cloudera
    + */
    +public class ContainerRequestHandlerCloudera extends ContainerRequestHandler
    +{
    +  HashMap<ContainerRequest, ContainerStartRequest> hostSpecificRequests = new HashMap<>();
    +  HashMap<ContainerRequest, ContainerStartRequest> otherContainerRequests = new HashMap<>();
    +  HashMap<String, List<ContainerRequest>> hostSpecificRequestsMap = new HashMap<>();
    +  List<String> blacklistedNodesForHostSpecificRequests = null;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    // Issue all host specific requests first
    +    if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) {
    +      LOG.info("Issue Host specific requests first");
    +      // Blacklist all the nodes and issue request for host specific
    +      Entry<String, List<ContainerRequest>> set = hostSpecificRequestsMap.entrySet().iterator().next();
    +      List<ContainerRequest> requests = set.getValue();
    +      List<String> blacklistNodes = resourceRequestor.getNodesExceptHost(requests.get(0).getNodes());
    +      amRmClient.updateBlacklist(blacklistNodes, requests.get(0).getNodes());
    +      blacklistedNodesForHostSpecificRequests = blacklistNodes;
    +      LOG.info("Sending {} request(s) after blacklisting nodes {} and removed host from request {}", requests.size(), blacklistNodes, requests.get(0).getNodes());
    +
    +      for (ContainerRequest cr : requests) {
    +        ContainerStartRequest csr = hostSpecificRequests.get(cr);
    +        ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
    +        MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
    +        requestedResources.put(csr, pair);
    +        containerRequests.add(newCr);
    +        hostSpecificRequests.remove(cr);
    +      }
    +      hostSpecificRequestsMap.remove(set.getKey());
    +    } else if (!requestedResources.isEmpty()) {
    +      // Check if any requests timed out, create new requests in that case
    +      recreateContainerRequest(requestedResources, loopCounter, resourceRequestor, removedContainerRequests);
    +    } else {
    +      if (blacklistedNodesForHostSpecificRequests != null) {
    +        // Remove the nodes blacklisted nodes for host specific requests
    --- End diff --
    
    failed to parse comment :-)



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58955589
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---
    @@ -750,6 +752,11 @@ private void execute() throws YarnException, IOException
             expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
           }
     
    +      if (System.currentTimeMillis() > nodeReportUpdateTime) {
    --- End diff --
    
    It has to be in the loop, but there is no need to call System.currentTimeMillis() multiple times. The millis are used in several places.
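    
    A rough sketch of that shape, assuming a loop that reads the clock once per iteration and passes the value to every time-based check. PeriodicRefreshSketch, iterate and refreshCount are hypothetical names; this is not the application master's actual loop.

```java
// Hypothetical loop skeleton, not the application master's actual code.
final class PeriodicRefreshSketch
{
  private final long intervalMillis;
  private long nextRefreshTime;
  int refreshCount;

  PeriodicRefreshSketch(long intervalMillis)
  {
    this.intervalMillis = intervalMillis;
  }

  // One pass of the main loop; "now" is read once by the caller and reused here.
  void iterate(long now)
  {
    if (now > nextRefreshTime) {
      refreshCount++; // placeholder for refreshing the node reports
      nextRefreshTime = now + intervalMillis;
    }
    // Other time-based checks in the same iteration would reuse "now" as well.
  }

  public static void main(String[] args)
  {
    PeriodicRefreshSketch loop = new PeriodicRefreshSketch(1000);
    for (int i = 0; i < 3; i++) {
      long now = System.currentTimeMillis(); // single clock read per iteration
      loop.iterate(now);
    }
    System.out.println("refreshes: " + loop.refreshCount);
  }
}
```

    Passing the timestamp in as a parameter also makes the refresh logic testable with fixed values.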



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57633606
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---
    @@ -2435,7 +2435,7 @@ private void setAttributes(List<? extends Conf> confs, Attribute.AttributeMap at
               else {
                 if (processedAttributes.add(attribute)) {
                   String val = e.getValue();
    -              if (val.trim().charAt(0) == '{') {
    +              if (val.trim().charAt(0) == '{' && !(attribute.codec instanceof JsonStringCodec)) {
    --- End diff --
    
    Parsing of nested objects was failing with the default JSON parsing. It needed the class name to parse correctly, so I added JsonStringCodec.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508511
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -153,11 +177,22 @@ public String getHost(ContainerStartRequest csr, boolean first)
     
         // the host requested didn't have the resources so looking for other hosts
         host = null;
    +    List<String> antiHosts = new ArrayList<>();
    +    List<String> antiPreferredHosts = new ArrayList<>();
    +    if (!c.getStrictAntiPrefs().isEmpty()) {
    +      // Check if containers are allocated already for the anti-affinity
    +      // containers
    --- End diff --
    
    extra line break



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58954902
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -386,6 +472,38 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
         this.undeployOpers.clear();
       }
     
    +  public void setAntiAffinityForContainers(LogicalPlan dag, Collection<AffinityRule> affinityRules, Map<PTOperator, PTContainer> operatorContainerMap)
    --- End diff --
    
    Both are possible ways of doing it. This one is more consistent with how node locality is handled, i.e. a PTOperator contains the list of operators that should be co-allocated. The advantage of keeping the anti-affinity list of containers in each container is that it makes it easy to assign hosts to container requests based on that list.
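    
    To illustrate why a per-container exclusion list makes host assignment simple, here is a simplified sketch. It assumes hosts are described only by free memory, and that strict anti-affinity hosts are never used while preferred ones are dropped on a second attempt. AntiAffinityHostSketch, pick and assign are hypothetical names, and the PR's assignHost takes different parameters and applies its own retry condition.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Simplified illustration, not the PR's assignHost implementation.
final class AntiAffinityHostSketch
{
  // Returns the first host with enough free memory that is in neither exclusion set, or null.
  static String pick(Map<String, Integer> freeMemoryByHost, int requiredMemory, Set<String> strictAntiHosts, Set<String> preferredAntiHosts)
  {
    for (Map.Entry<String, Integer> e : freeMemoryByHost.entrySet()) {
      String host = e.getKey();
      if (e.getValue() >= requiredMemory && !strictAntiHosts.contains(host) && !preferredAntiHosts.contains(host)) {
        return host;
      }
    }
    return null;
  }

  // Tries with both constraints first, then retries with only the strict ones.
  static String assign(Map<String, Integer> freeMemoryByHost, int requiredMemory, Set<String> strictAntiHosts, Set<String> preferredAntiHosts)
  {
    String host = pick(freeMemoryByHost, requiredMemory, strictAntiHosts, preferredAntiHosts);
    if (host == null && !preferredAntiHosts.isEmpty()) {
      // Drop the preferred constraint and try the allocation again.
      host = pick(freeMemoryByHost, requiredMemory, strictAntiHosts, Collections.<String>emptySet());
    }
    return host;
  }
}
```

    With the exclusion sets attached to the container being placed, the selection is a single filtered pass over the candidate hosts.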


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58961724
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -344,9 +346,67 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
             addLogicalOperator(n);
           }
         }
    +    
    +    inlinePrefs.prefs.clear();
    +    localityPrefs.prefs.clear();
    +    
    +    // Add inlinePrefs and localityPreds for affinity rules
    --- End diff --
    
    Great point. I did not handle it earlier.
    I removed the clearing of inlinePrefs and localityPrefs. These preferences are applied again on dynamic plan changes, so affinity will be set again on dynamic changes. As for anti-affinity, I added population of antiAffinityPreferences for containers on re-allocation.
    
    Added a test case for the same.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58620829
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,153 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Type of affinity rule setting affects how operators are scheduled for
    +   * deployment by the platform.
    +   */
    +  public static enum Type
    +  {
    +    /**
    +     * AFFINITY indicates that operators in the rule should be deployed within
    +     * locality specified in the rule
    +     */
    +    AFFINITY,
    +    /**
    +     * ANTI_AFFINITY indicates that operators in the rule should NOT deployed
    +     * within same locality as specified in rule
    +     */
    +    ANTI_AFFINITY
    +  }
    +
    +  private List<String> operatorsList;
    +  private String operatorRegex;
    +  private Locality locality;
    +  private Type type;
    +  private boolean relaxLocality;
    +
    +  public AffinityRule()
    +  {
    +  }
    +
    +  public AffinityRule(Type type, Locality locality, boolean relaxLocality)
    +  {
    +    this.type = type;
    +    this.locality = locality;
    +    this.setRelaxLocality(relaxLocality);
    +  }
    +
    +  public AffinityRule(Type type, Locality locality, boolean relaxLocality, String... operators)
    --- End diff --
    
    Does it require at least 2 operators? If so, maybe make it String, String...
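    
    A small sketch of the idea behind this suggestion. A signature of the form String, String... guarantees at least one name at compile time; the variation below adds a second fixed parameter so that a call with fewer than two names does not compile. MinTwoOperatorsSketch and operators are hypothetical names, not part of AffinityRule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of enforcing a minimum argument count with varargs.
final class MinTwoOperatorsSketch
{
  // Two fixed parameters before the varargs: callers must supply at least two names.
  static List<String> operators(String first, String second, String... others)
  {
    List<String> all = new ArrayList<>();
    all.add(first);
    all.add(second);
    all.addAll(Arrays.asList(others));
    return all;
  }

  public static void main(String[] args)
  {
    System.out.println(operators("O1", "O2"));
    System.out.println(operators("O1", "O2", "O3"));
    // operators("O1") would be rejected by the compiler.
  }
}
```

    The trade-off is that callers holding an existing array or list of names have to split off the leading elements themselves.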



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58952486
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---
    @@ -750,6 +752,11 @@ private void execute() throws YarnException, IOException
             expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
           }
     
    +      if (System.currentTimeMillis() > nodeReportUpdateTime) {
    --- End diff --
    
    I have put it inside the loop so that node reports get updated periodically. This keeps the latest available resources in the node reports, so that on container kill/restart we can allocate nodes based on the latest available resources.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508062
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandlerCloudera.java ---
    @@ -0,0 +1,138 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting specifically for cloudera
    + */
    +public class ContainerRequestHandlerCloudera extends ContainerRequestHandler
    --- End diff --
    
    Perhaps instead name it so it indicates what it does (blacklist based) vs. why we have it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57509015
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---
    @@ -2435,7 +2435,7 @@ private void setAttributes(List<? extends Conf> confs, Attribute.AttributeMap at
               else {
                 if (processedAttributes.add(attribute)) {
                   String val = e.getValue();
    -              if (val.trim().charAt(0) == '{') {
    +              if (val.trim().charAt(0) == '{' && !(attribute.codec instanceof JsonStringCodec)) {
    --- End diff --
    
    Why this if we already have a way to parse JSON?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58622624
  
    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -250,6 +250,24 @@
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
     
       /**
    +   * Set affinity between operators
    +   * @param locality
    +   * @param relaxLocality
    +   * @param first operator
    +   * @param one or more operators
    +   */
    +  public abstract void setAffinity(Locality locality, boolean relaxLocality, String firstOperator, String... operators);
    --- End diff --
    
    I still think these should be removed. Any other opinions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-207551258
  
    I have added reassignment of anti-affinity sets for containers in assignContainers after repartitioning is done. I will create a follow-up Jira to validate dynamic update scenarios.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58918937
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java ---
    @@ -0,0 +1,142 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting (specifically for cloudera)
    + * 
    + * Host specific container requests are not allocated on Cloudera as captured in
    + * Jira Yarn-1412 (https://issues.apache.org/jira/browse/YARN-1412) 
    + * To handle such requests, we blacklist all the other nodes before issuing node request.
    + */
    +public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
    +{
    +  HashMap<ContainerRequest, ContainerStartRequest> hostSpecificRequests = new HashMap<>();
    +  HashMap<ContainerRequest, ContainerStartRequest> otherContainerRequests = new HashMap<>();
    +  HashMap<String, List<ContainerRequest>> hostSpecificRequestsMap = new HashMap<>();
    +  List<String> blacklistedNodesForHostSpecificRequests = null;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    // Issue all host specific requests first
    +    if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) {
    +      LOG.info("Issue Host specific requests first");
    +      // Blacklist all the nodes and issue request for host specific
    +      Entry<String, List<ContainerRequest>> set = hostSpecificRequestsMap.entrySet().iterator().next();
    +      List<ContainerRequest> requests = set.getValue();
    +      List<String> blacklistNodes = resourceRequestor.getNodesExceptHost(requests.get(0).getNodes());
    +      amRmClient.updateBlacklist(blacklistNodes, requests.get(0).getNodes());
    +      blacklistedNodesForHostSpecificRequests = blacklistNodes;
    +      LOG.info("Sending {} request(s) after blacklisting nodes {} and removed host from request {}", requests.size(), blacklistNodes, requests.get(0).getNodes());
    --- End diff --
    
    I still have a concern with the logging of potentially very large collections of nodes (consider cluster size).
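
One way to address this concern is to log a bounded summary instead of the full node list. The following is a minimal sketch, not code from the pull request: the class `NodeListSummary`, the method `summarize`, and the limit of 3 are hypothetical names and values chosen for illustration, and `limit` is assumed to be non-negative.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the pull request.
class NodeListSummary
{
  // Returns the full list if it is small, otherwise the first 'limit'
  // entries followed by a count of the entries that were left out.
  static String summarize(List<String> nodes, int limit)
  {
    if (nodes.size() <= limit) {
      return nodes.toString();
    }
    String head = String.join(", ", nodes.subList(0, limit));
    return "[" + head + (limit > 0 ? ", " : "") + "... (" + (nodes.size() - limit) + " more)]";
  }

  public static void main(String[] args)
  {
    List<String> blacklist = Arrays.asList("node1", "node2", "node3", "node4", "node5");
    // prints "[node1, node2, node3, ... (2 more)]"
    System.out.println(summarize(blacklist, 3));
  }
}
```

With SLF4J, the summary would be passed as an ordinary argument to the parameterized log call, so the size of the log line stays bounded regardless of cluster size.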


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-205987658
  
    Looking at it.


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-191536790
  
    Testing Done: 
    
    1. Unit tests for Dag Validation scenarios
    2. Unit tests for host allocation based on affinity rules
    3. Tested following scenarios on Cloudera and Hortonworks:
       - Anti-affinity between partitions of operators
       - Anti-affinity between any two operators (stream connected and not connected) 
       - Anti-affinity between parallel partitioned operators
       - Node and Container Affinity between operators not connected by stream
       - Node affinity between 2 Stream connected operators when the stream is not Node Local
       - Container affinity between 2 Stream connected operators when the stream is not Node Local
       - Affinity and Anti-affinity rules with Locality_host set for operator
       - Out of resources condition in case of strict anti-affinity. The application should keep waiting until resources are allocated as per the rules
       - Preferred anti-affinity rules are dropped if resources are not available



---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507255
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandler.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +/**
    + * Handles creating container requests and reissuing them on timeout
    + *
    + */
    +public class ContainerRequestHandler
    +{
    +  protected static final int NUMBER_MISSED_HEARTBEATS = 30;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    if (!requestedResources.isEmpty()) {
    +      // resourceRequestor.clearNodeMapping();
    +      for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
    +        if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
    --- End diff --
    
    What happens otherwise?


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57633300
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -108,6 +113,23 @@ public void updateNodeReports(List<NodeReport> nodeReports)
         }
       }
     
    +  public List<String> getNodesExceptHost(List<String> hostNames)
    --- End diff --
    
    We assign a single host for each request. However, a YARN ContainerRequest can have a list of nodes. Added the hostnames to a set for the contains check.
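
The set-based contains check described here could look roughly like the sketch below. The body of the real `getNodesExceptHost` is not shown in the quoted diff, so this is an assumption: the class name `NodeFilter` and the extra `allNodes` parameter are hypothetical (the real method takes only `hostNames` and presumably reads the known nodes from state kept by the handler).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in, not the class from the pull request.
class NodeFilter
{
  // Returns every known node that is not one of the requested hosts.
  static List<String> getNodesExceptHost(List<String> allNodes, List<String> hostNames)
  {
    // Copy the host names into a set so each contains check is O(1)
    // instead of a scan over the list.
    Set<String> requested = new HashSet<>(hostNames);
    List<String> others = new ArrayList<>();
    for (String node : allNodes) {
      if (!requested.contains(node)) {
        others.add(node);
      }
    }
    return others;
  }

  public static void main(String[] args)
  {
    List<String> cluster = Arrays.asList("n1", "n2", "n3");
    // prints "[n1, n3]"
    System.out.println(getNodesExceptHost(cluster, Arrays.asList("n2")));
  }
}
```

The returned list is what would then be handed to the blacklist update, leaving only the requested host eligible.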


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-207178373
  
    @ishark please resolve the merge conflict


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58623950
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,153 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Type of affinity rule setting affects how operators are scheduled for
    +   * deployment by the platform.
    +   */
    +  public static enum Type
    +  {
    +    /**
    +     * AFFINITY indicates that operators in the rule should be deployed within
    +     * locality specified in the rule
    +     */
    +    AFFINITY,
    +    /**
    +     * ANTI_AFFINITY indicates that operators in the rule should NOT deployed
    +     * within same locality as specified in rule
    +     */
    +    ANTI_AFFINITY
    +  }
    +
    +  private List<String> operatorsList;
    +  private String operatorRegex;
    +  private Locality locality;
    +  private Type type;
    +  private boolean relaxLocality;
    +
    +  public AffinityRule()
    +  {
    +  }
    +
    +  public AffinityRule(Type type, Locality locality, boolean relaxLocality)
    +  {
    +    this.type = type;
    +    this.locality = locality;
    +    this.setRelaxLocality(relaxLocality);
    +  }
    +
    +  public AffinityRule(Type type, Locality locality, boolean relaxLocality, String... operators)
    --- End diff --
    
    Changed it to String, String...
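
Assuming this means a signature with one required `String` followed by varargs (as in the `setAffinity(..., String firstOperator, String... operators)` declaration quoted elsewhere in this thread), the effect is that a call with no operator names no longer compiles. A minimal sketch with a hypothetical class `OperatorNamesSketch`, not the actual `AffinityRule`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the (String, String...) parameter shape.
class OperatorNamesSketch
{
  private final List<String> operators = new ArrayList<>();

  // The first name is mandatory; any further names are optional varargs.
  OperatorNamesSketch(String firstOperator, String... otherOperators)
  {
    operators.add(firstOperator);
    operators.addAll(Arrays.asList(otherOperators));
  }

  List<String> getOperators()
  {
    return Collections.unmodifiableList(operators);
  }

  public static void main(String[] args)
  {
    // prints "[O1, O2, O3]"
    System.out.println(new OperatorNamesSketch("O1", "O2", "O3").getOperators());
    // new OperatorNamesSketch() would be rejected by the compiler.
  }
}
```

Note that this shape only guarantees one name at compile time; a rule that needs at least two operators would still have to be checked during validation.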


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57509050
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java ---
    @@ -0,0 +1,177 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.apache.hadoop.yarn.api.records.NodeReport;
    +import org.apache.hadoop.yarn.api.records.NodeState;
    +import org.apache.hadoop.yarn.server.utils.BuilderUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +import com.datatorrent.api.AffinityRule;
    +import com.datatorrent.api.AffinityRule.OperatorPair;
    +import com.datatorrent.api.AffinityRule.Type;
    +import com.datatorrent.api.AffinityRulesSet;
    +import com.datatorrent.api.Context.DAGContext;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DAG.Locality;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +import com.datatorrent.stram.engine.GenericTestOperator;
    +import com.datatorrent.stram.engine.TestGeneratorInputOperator;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +import com.datatorrent.stram.plan.physical.PTContainer;
    +import com.datatorrent.stram.plan.physical.PTOperator;
    +import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
    +
    +public class AffinityRulesTest
    +{
    +  private static final Logger LOG = LoggerFactory.getLogger(AffinityRulesTest.class);
    +
    +  @Test
    +  public void testOperatorPartitionsAntiAffinity()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +    TestGeneratorInputOperator o1 = dag.addOperator("O1", new TestGeneratorInputOperator());
    +    GenericTestOperator o2 = dag.addOperator("O2", new GenericTestOperator());
    +    GenericTestOperator o3 = dag.addOperator("O3", new GenericTestOperator());
    +    dag.addStream("stream1", o1.outport, o2.inport1);
    +    dag.addStream("stream2", o2.outport1, o3.inport1);
    +
    +    dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5));
    +
    +    AffinityRulesSet ruleSet = new AffinityRulesSet();
    +    // Valid case:
    +    List<AffinityRule> rules = new ArrayList<>();
    +    ruleSet.setAffinityRules(rules);
    +    AffinityRule rule1 = new AffinityRule(Type.ANTI_AFFINITY, new OperatorPair("O2", "O2"), Locality.NODE_LOCAL, false);
    +    rules.add(rule1);
    +    dag.setAttribute(DAGContext.AFFINITY_RULES_SET, ruleSet);
    +    dag.validate();
    +    dag.getAttributes().put(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, new File("target", HostLocalTest.class.getName()).getAbsolutePath());
    --- End diff --
    
    see StramTestSupport.TestMeta


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/250


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506947
  
    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -250,6 +250,24 @@
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
     
       /**
    +   * Set affinity between operators
    +   * @param locality
    +   * @param relaxLocality
    +   * @param first operator
    +   * @param one or more operators
    +   */
    +  public abstract void setAffinity(Locality locality, boolean relaxLocality, String firstOperator, String... operators);
    --- End diff --
    
    I see. IMO we should not bloat the core API with convenience or helper methods.


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-201702989
  
    Haven't looked at the plan changes yet, rest looks good so far.


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506464
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", operators="
    +        + operators + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Pair of operator names to specify affinity rule
    +   * The order of operators is not considered in this class
    +   * i.e. OperatorPair("O1", "O2") is equal to OperatorPair("O2", "O1")
    +   */
    +  public static class OperatorPair implements Serializable
    --- End diff --
    
    Why is it a pair and not a collection? It looks like the regex can match more operators.
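
For reference, the order-insensitive equality that the quoted Javadoc describes for `OperatorPair` could be implemented along the lines below. The body of `OperatorPair` is not shown in the diff, so this is only a sketch under that assumption; `UnorderedPair` is a hypothetical name.

```java
import java.util.Objects;

// Hypothetical sketch of a pair whose equality ignores element order.
final class UnorderedPair
{
  final String first;
  final String second;

  UnorderedPair(String first, String second)
  {
    this.first = Objects.requireNonNull(first);
    this.second = Objects.requireNonNull(second);
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof UnorderedPair)) {
      return false;
    }
    UnorderedPair p = (UnorderedPair)o;
    // Equal if the names match in either order.
    return (first.equals(p.first) && second.equals(p.second))
        || (first.equals(p.second) && second.equals(p.first));
  }

  @Override
  public int hashCode()
  {
    // Addition is commutative, so swapping the names gives the same hash.
    return first.hashCode() + second.hashCode();
  }
}
```

A set of names would give the same symmetry for more than two operators, which is the direction the question above points in.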


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506608
  
    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -250,6 +250,24 @@
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
     
       /**
    +   * Set affinity between operators
    +   * @param locality
    +   * @param relaxLocality
    +   * @param first operator
    +   * @param one or more operators
    +   */
    +  public abstract void setAffinity(Locality locality, boolean relaxLocality, String firstOperator, String... operators);
    --- End diff --
    
    Why these additions? Why not specify the attribute directly?


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507488
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandler.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +/**
    + * Handles creating container requests and reissuing them on timeout
    + *
    + */
    +public class ContainerRequestHandler
    +{
    +  protected static final int NUMBER_MISSED_HEARTBEATS = 30;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    if (!requestedResources.isEmpty()) {
    +      // resourceRequestor.clearNodeMapping();
    --- End diff --
    
    remove comment


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58919720
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---
    @@ -750,6 +752,11 @@ private void execute() throws YarnException, IOException
             expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
           }
     
    +      if (System.currentTimeMillis() > nodeReportUpdateTime) {
    --- End diff --
    
    I still see System.currentTimeMillis repeated within the loop. Please make a final variable out of it.
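
The requested change amounts to reading the clock once per loop iteration and reusing the value for every time-based check. The sketch below illustrates the pattern only; `LoopIterationSketch`, `runOnce`, and the action names are hypothetical, and the clock is passed in as a `LongSupplier` purely so the example can be exercised without real time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for one iteration of a monitoring loop.
class LoopIterationSketch
{
  static List<String> runOnce(LongSupplier clock, long tokenExpiryTime, long nodeReportUpdateTime)
  {
    // Read the clock once and reuse the value for every check below.
    final long currentTimeMillis = clock.getAsLong();
    List<String> actions = new ArrayList<>();
    if (currentTimeMillis > tokenExpiryTime) {
      actions.add("refreshTokens");
    }
    if (currentTimeMillis > nodeReportUpdateTime) {
      actions.add("updateNodeReports");
    }
    return actions;
  }

  public static void main(String[] args)
  {
    // In the real loop the supplier would simply be System::currentTimeMillis.
    System.out.println(runOnce(System::currentTimeMillis, Long.MAX_VALUE, 0L));
  }
}
```

Besides saving repeated calls, a single reading keeps all checks within one iteration consistent with each other.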


---

[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58952684
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java ---
    @@ -0,0 +1,142 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting (specifically for cloudera)
    + * 
    + * Host specific container requests are not allocated on Cloudera as captured in
    + * Jira Yarn-1412 (https://issues.apache.org/jira/browse/YARN-1412) 
    + * To handle such requests, we blacklist all the other nodes before issueing node request.
    + */
    +public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler
    +{
    +  HashMap<ContainerRequest, ContainerStartRequest> hostSpecificRequests = new HashMap<>();
    +  HashMap<ContainerRequest, ContainerStartRequest> otherContainerRequests = new HashMap<>();
    +  HashMap<String, List<ContainerRequest>> hostSpecificRequestsMap = new HashMap<>();
    +  List<String> blacklistedNodesForHostSpecificRequests = null;
    +
    +  public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
    +  {
    +    // Issue all host specific requests first
    +    if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) {
    +      LOG.info("Issue Host specific requests first");
    +      // Blacklist all the nodes and issue request for host specific
    +      Entry<String, List<ContainerRequest>> set = hostSpecificRequestsMap.entrySet().iterator().next();
    +      List<ContainerRequest> requests = set.getValue();
    +      List<String> blacklistNodes = resourceRequestor.getNodesExceptHost(requests.get(0).getNodes());
    +      amRmClient.updateBlacklist(blacklistNodes, requests.get(0).getNodes());
    +      blacklistedNodesForHostSpecificRequests = blacklistNodes;
    +      LOG.info("Sending {} request(s) after blacklisting nodes {} and removed host from request {}", requests.size(), blacklistNodes, requests.get(0).getNodes());
    --- End diff --
    
    Fixed it.
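
    For readers following the blacklisting approach described in the class javadoc above, here is a rough sketch of how the list of nodes to blacklist could be derived. The helper below is hypothetical and only mirrors what a method like getNodesExceptHost is assumed to return; in the diff, that list is passed to updateBlacklist as the additions and the requested hosts as the removals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical helper; not the implementation from the PR.
final class BlacklistSketch
{
  /** Returns every known node that is not one of the requested hosts. */
  static List<String> nodesExceptHosts(Collection<String> allNodes, Collection<String> requestedHosts)
  {
    List<String> blacklist = new ArrayList<>(allNodes);
    blacklist.removeAll(requestedHosts);
    return blacklist;
  }

  public static void main(String[] args)
  {
    List<String> allNodes = Arrays.asList("node1", "node2", "node3");
    // Request a container on node2 only: all other nodes get blacklisted.
    System.out.println(nodesExceptHosts(allNodes, Arrays.asList("node2")));
  }
}
```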



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508599
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---
    @@ -170,19 +205,69 @@ public String getHost(ContainerStartRequest csr, boolean first)
                 containers.add(nodeLocalOper.getContainer());
               }
             }
    -        for (Map.Entry<String, NodeReport> nodeEntry : nodeReportMap.entrySet()) {
    -          int memAvailable = nodeEntry.getValue().getCapability().getMemory() - nodeEntry.getValue().getUsed().getMemory();
    -          int vCoresAvailable = nodeEntry.getValue().getCapability().getVirtualCores() - nodeEntry.getValue().getUsed().getVirtualCores();
    -          if (memAvailable >= aggrMemory && vCoresAvailable >= vCores) {
    -            host = nodeEntry.getKey();
    -            grpObj.setHost(host);
    -            nodeLocalMapping.put(nodeLocalSet, host);
    -            return host;
    -          }
    +        host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +
    +        if (host == null && !antiPreferredHosts.isEmpty() && !antiHosts.isEmpty()) {
    +          // Drop the preferred constraint and try allocation
    +          antiPreferredHosts.clear();
    +          host = assignHost(host, antiHosts, antiPreferredHosts, grpObj, nodeLocalSet, aggrMemory, vCores);
    +        }
    +        if (host != null) {
    +          antiAffinityMapping.put(c, host);
    +        } else {
    +          host = INVALID_HOST;
             }
           }
         }
    +    LOG.info("Found host {}", host);
         return host;
       }
     
    +  public void populateAntiHostList(PTContainer c, List<String> antiHosts)
    +  {
    +    for (PTContainer container : c.getStrictAntiPrefs()) {
    +      if (antiAffinityMapping.containsKey(container)) {
    +        antiHosts.add(antiAffinityMapping.get(container));
    +      } else {
    +        // Check if there is an anti-affinity with host locality
    +        String antiHost = getAntiHostsList(container);
    +        if (antiHost != null) {
    +          antiHosts.add(antiHost);
    +        }
    +      }
    +    }
    +  }
    +
    +  public String getAntiHostsList(PTContainer container)
    --- End diff --
    
    javadoc



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58955458
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -357,25 +417,51 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
               if (!container.operators.isEmpty()) {
                 LOG.warn("Operator {} shares container without locality contraint due to insufficient resources.", oper);
               }
    +          // TODO: Check if PT Operators conflict in anti-affinity, Pick the first container that does not conflict
    --- End diff --
    
    maxContainer is deprecated, so the check won't be needed then?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#issuecomment-207202664
  
    Are the affinity sets updated when containers are added or removed due to dynamic partitioning? If they are not, please create a follow-up JIRA so we can address it later.



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58927033
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -386,6 +472,38 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
         this.undeployOpers.clear();
       }
     
    +  public void setAntiAffinityForContainers(LogicalPlan dag, Collection<AffinityRule> affinityRules, Map<PTOperator, PTContainer> operatorContainerMap)
    --- End diff --
    
    Would it make sense to maintain the relationship in PhysicalPlan instead of adding to each container?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57508956
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---
    @@ -713,6 +717,11 @@ private void execute() throws YarnException, IOException
             expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
           }
     
    +      if (System.currentTimeMillis() - nodeReportUpdateTime >= UPDATE_NODE_REPORTS_INTERVAL) {
    --- End diff --
    
    We should avoid retrieving currentTimeMillis more than once per iteration (see also the token refresh code above).



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506976
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", operators="
    +        + operators + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Pair of operator names to specify affinity rule
    +   * The order of operators is not considered in this class
    +   * i.e. OperatorPair("O1", "O2") is equal to OperatorPair("O2", "O1")
    +   */
    +  public static class OperatorPair implements Serializable
    --- End diff --
    
    Collection support is for convenience. Instead of declaring anti-affinity between operator pairs, such as A & B and B & C, with a collection we can directly set anti-affinity among A, B and C.
    Internally, everything is converted into operator pairs, as this is easier to validate against stream locality and other affinity rules.
    
    The OperatorPair class was created because the Pair class in apex-common is not available in the api module.
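
    To illustrate the conversion, here is a minimal sketch. It assumes a collection expands to every unordered pair of its members, which this thread does not spell out; the class and method names are hypothetical, and only the order-independent equality of OperatorPair comes from the javadoc quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch; not the code from the PR.
final class OperatorPairSketch
{
  /** Unordered pair of operator names: (a, b) equals (b, a). */
  static final class OperatorPair
  {
    final String first;
    final String second;

    OperatorPair(String first, String second)
    {
      this.first = Objects.requireNonNull(first);
      this.second = Objects.requireNonNull(second);
    }

    @Override
    public boolean equals(Object o)
    {
      if (!(o instanceof OperatorPair)) {
        return false;
      }
      OperatorPair p = (OperatorPair)o;
      return (first.equals(p.first) && second.equals(p.second))
          || (first.equals(p.second) && second.equals(p.first));
    }

    @Override
    public int hashCode()
    {
      // Addition is commutative, so swapped pairs hash identically.
      return first.hashCode() + second.hashCode();
    }

    @Override
    public String toString()
    {
      return "(" + first + ", " + second + ")";
    }
  }

  /** Expands a list of operator names into all unordered pairs (assumed semantics). */
  static List<OperatorPair> toPairs(List<String> operators)
  {
    List<OperatorPair> pairs = new ArrayList<>();
    for (int i = 0; i < operators.size(); i++) {
      for (int j = i + 1; j < operators.size(); j++) {
        pairs.add(new OperatorPair(operators.get(i), operators.get(j)));
      }
    }
    return pairs;
  }

  public static void main(String[] args)
  {
    System.out.println(toPairs(Arrays.asList("A", "B", "C")));
  }
}
```

    Because equals and hashCode ignore order, the resulting pairs can be looked up in a set regardless of how a rule listed its operators.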



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507288
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/ContainerRequestHandlerCloudera.java ---
    @@ -0,0 +1,138 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
    +
    +/**
    + * Handles creating container requests and issuing node-specific container
    + * requests by blacklisting specifically for cloudera
    --- End diff --
    
    Maybe a bit more explanation and JIRA references?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57506337
  
    --- Diff: api/pom.xml ---
    @@ -70,14 +70,6 @@
               <groupId>commons-beanutils</groupId>
             </exclusion>
             <exclusion>
    -          <artifactId>jackson-core-asl</artifactId>
    --- End diff --
    
    Why this change?



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r57507141
  
    --- Diff: api/src/main/java/com/datatorrent/api/AffinityRule.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.api;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +import com.datatorrent.api.DAG.Locality;
    +
    +/**
    + * Affinity rule specifies constraints for physical deployment of operator
    + * containers. There are two types of rules that can be specified: Affinity and
    + * Anti-affinity. Each rule contains list of operators or pair of 2 operators or
    + * a regex that should match at least 2 operators. Based on the type of rule,
    + * affinity or anti-affinity, the operators will be deployed together or away
    + * from each other. The locality indicates the level at which the rule should be
    + * applied. E.g. CONTAINER_LOCAL affinity would indicate operators Should be
    + * allocated within same container NODE_LOCAL anti-affinity indicates that the
    + * operators should not be allocated on the same node. The rule can be either
    + * strict or relaxed.
    + *
    + */
    +public class AffinityRule implements Serializable
    +{
    +  @Override
    +  public String toString()
    +  {
    +    return "AffinityRule {operatorsList=" + operatorsList + ", operatorRegex=" + operatorRegex + ", operators="
    +        + operators + ", locality=" + locality + ", type=" + type + ", relaxLocality=" + relaxLocality + "}";
    +  }
    +
    +  private static final long serialVersionUID = 107131504929875386L;
    +
    +  /**
    +   * Pair of operator names to specify affinity rule
    +   * The order of operators is not considered in this class
    +   * i.e. OperatorPair("O1", "O2") is equal to OperatorPair("O2", "O1")
    +   */
    +  public static class OperatorPair implements Serializable
    --- End diff --
    
    It's fine to use pair internally, but maybe it is not needed in the API? varargs looks more appropriate.
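
    One possible shape for such a varargs entry point, as a sketch only. All names here are hypothetical and do not reflect the API that was eventually merged; the requirement of at least two operators is an assumption carried over from the regex rule in the class javadoc.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a varargs-based rule API.
final class VarargsAffinitySketch
{
  enum Type { AFFINITY, ANTI_AFFINITY }

  static final class Rule
  {
    final Type type;
    final List<String> operators;

    Rule(Type type, String... operators)
    {
      // Assumed constraint: a rule needs at least two operators to be meaningful.
      if (operators.length < 2) {
        throw new IllegalArgumentException("A rule needs at least two operators");
      }
      this.type = type;
      // Copy the array so later changes by the caller do not leak into the rule.
      this.operators = Collections.unmodifiableList(Arrays.asList(operators.clone()));
    }
  }

  /** Callers list operator names directly; no pair type appears in the signature. */
  static Rule antiAffinity(String... operators)
  {
    return new Rule(Type.ANTI_AFFINITY, operators);
  }

  public static void main(String[] args)
  {
    Rule rule = antiAffinity("A", "B", "C");
    System.out.println(rule.type + " " + rule.operators);
  }
}
```

    The pair representation can then stay an internal detail of validation.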



[GitHub] incubator-apex-core pull request: APEXCORE-10 #resolve Changes for...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/250#discussion_r58954585
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -357,25 +417,51 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
               if (!container.operators.isEmpty()) {
                 LOG.warn("Operator {} shares container without locality contraint due to insufficient resources.", oper);
               }
    +          // TODO: Check if PT Operators conflict in anti-affinity, Pick the first container that does not conflict
    --- End diff --
    
    This check cannot be done in logical plan validation, since two operators could be assigned to the same container even when there is no locality constraint. This can occur if the maxContainer value is set lower than the number of containers the operators need.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---