Posted to dev@apex.apache.org by tweise <gi...@git.apache.org> on 2015/12/18 22:34:41 UTC

[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

GitHub user tweise opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/189

    APEXCORE-3 Module Support REBASED

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/incubator-apex-core devel-3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #189
    
----
commit e32adb9fa7b1b76016b815d00be7df947ed4f46f
Author: Vlad Rozov <v....@datatorrent.com>
Date:   2015-09-24T03:30:51Z

    APEX-103 - Add module and dag interface in API

commit c804774b86ae4c09dbd7ae4abb84d9fd64c510b6
Author: bhupeshchawda <bh...@gmail.com>
Date:   2015-10-06T07:04:24Z

    APEXCORE-194 Added support for proxy ports
    Added test cases.

commit 35b3c4e9544d89e57359844ccb9188aff0496461
Author: Tushar R. Gosavi <tu...@apache.org>
Date:   2015-10-06T08:18:53Z

    APEXCORE-105 Introduce module meta
    Inject properties through xml file on modules.

commit 3caeb0e38d3b66b58485dbb09b4666e6136ba11b
Author: chinmaykolhatkar <ch...@datatorrent.com>
Date:   2015-10-07T09:36:36Z

    APEXCORE-104 Added flattening of module into parent DAG

commit dff6017847628e13743c071da1bbe0bbbf4089b2
Author: shubham <sh...@github.com>
Date:   2015-11-17T06:40:08Z

    APEXCORE-144, APEXCORE-145 Rest api changes to view module information

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166366057
  
    I am rewriting the commits on top of the latest devel-3 with the above changes and will push once done.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/189



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48163483
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    --- End diff --
    
    Why loop over all streams after having visited all operators already?



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48165286
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    +      OutputPortMeta sourceMeta = streamMeta.getSource();
    +      List<InputPort<?>> ports = new LinkedList<>();
    +      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
    +        ports.add(inputPortMeta.getPortObject());
    +      }
    +      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
    +
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
    +      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
    +      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName());
    --- End diff --
    
    Shouldn't the module name be same for all operators and streams? Why construct it in the loops?



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48168838
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    +      OutputPortMeta sourceMeta = streamMeta.getSource();
    +      List<InputPort<?>> ports = new LinkedList<>();
    +      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
    +        ports.add(inputPortMeta.getPortObject());
    +      }
    +      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
    +
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
    +      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
    +      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName());
    --- End diff --
    
    The module name for operators will differ in the case of nested modules. For example, if
    operator O1 comes from module M1, and operator O2 comes from module M2, which is inside module M1, then O1's module name is "M1" and O2's module name is "M1$M2".
    
    While module M1 is being added to the parent DAG, O1's module name is null and O2's module name is "M2". When O1 is added to the parent DAG, its module name becomes "M1", and O2's module name becomes "M1$M2".
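
The naming rule described above can be sketched as a small standalone program. This is only an illustration, not the code from the pull request: the class `ModuleNameSketch` and the helper `qualify` are hypothetical names, and the separator value "$" is an assumption inferred from the "M1$M2" example, since the diff does not show the value of MODULE_NAMESPACE_SEPARATOR.

```java
public class ModuleNameSketch {
  // Assumed value: inferred from the "M1$M2" example in the comment above,
  // not taken from the actual LogicalPlan source.
  public static final String MODULE_NAMESPACE_SEPARATOR = "$";

  // Hypothetical helper mirroring the ternary in addDAGToCurrentDAG:
  // an element with no module name takes the enclosing module's name,
  // otherwise the enclosing module's name is prepended.
  public static String qualify(String enclosingModule, String currentModuleName) {
    return currentModuleName == null
        ? enclosingModule
        : enclosingModule + MODULE_NAMESPACE_SEPARATOR + currentModuleName;
  }

  public static void main(String[] args) {
    // Step 1: M2 is flattened into M1. O2 had no module name yet.
    String o2 = qualify("M2", null);

    // Step 2: M1 is flattened into the parent DAG.
    // O1 sits directly in M1 and has no module name yet.
    String o1 = qualify("M1", null);
    // O2 already carries "M2" from step 1, so it gets prefixed.
    o2 = qualify("M1", o2);

    System.out.println(o1); // M1
    System.out.println(o2); // M1$M2
  }
}
```

Because each level of flattening prepends only its own module name, the prefix has to be computed per element inside the loop rather than once per module.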



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166231568
  
    I am working on removing these warnings and will send an updated patch.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48163580
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    +      OutputPortMeta sourceMeta = streamMeta.getSource();
    +      List<InputPort<?>> ports = new LinkedList<>();
    +      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
    +        ports.add(inputPortMeta.getPortObject());
    +      }
    +      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
    +
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
    +      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
    +      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName());
    --- End diff --
    
    Why the "_" separator?



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166531364
  
    @tweise I have pushed the changes to your branch. Waiting for Travis to finish.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48167960
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -405,6 +413,7 @@ public void sendMetrics(Collection<String> metricNames)
         private String persistOperatorName;
         public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
         public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;
    +    private String moduleName;  // Name of the module which has this stream. null if top level stream.
    --- End diff --
    
    Removed it 



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166447656
  
    I have made the changes in my https://github.com/tushargosavi/incubator-apex-core/tree/pull189 branch. I will have to force-push to your branch because the checkstyle changes are worked into the original commits. Is that fine?



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-165904650
  
    @tushargosavi et al. There are 25 extra checkstyle violations after adding the commits from the feature branch. This branch is rebased already. Please start from here and fix the violations that were added through new code.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48163175
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -405,6 +413,7 @@ public void sendMetrics(Collection<String> metricNames)
         private String persistOperatorName;
         public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
         public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;
    +    private String moduleName;  // Name of the module which has this stream. null if top level stream.
    --- End diff --
    
    Should not be required.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166534898
  
    Merged, please resolve the associated JIRAs.



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48167974
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    +      OutputPortMeta sourceMeta = streamMeta.getSource();
    +      List<InputPort<?>> ports = new LinkedList<>();
    +      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
    +        ports.add(inputPortMeta.getPortObject());
    +      }
    +      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
    +
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
    +      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
    +      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName());
    --- End diff --
    
    Removed it



[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tushargosavi <gi...@git.apache.org>.
Github user tushargosavi commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166283164
  
    @tweise the following commit fixes the checkstyle violations.
    
    https://github.com/tushargosavi/incubator-apex-core/commit/7e211ce69a433fdc9bef72b3b0cba0a4fb7111b7
    
    I have these changes in the APEXCORE-3 branch, rebased on top of the current apache/devel-3.
    https://github.com/tushargosavi/incubator-apex-core/tree/APEXCORE-3
    
    How should I include this patch? Should I open a pull request against your devel-3 branch?




[GitHub] incubator-apex-core pull request: APEXCORE-3 Module Support REBASE...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#issuecomment-166354605
  
    CI build passes now. Please look over the 3 comments above. If there are any more changes, I would like to see those worked into the original commits. I will add you to this repo so you can push the changes.

