Posted to dev@apex.apache.org by davidyan74 <gi...@git.apache.org> on 2015/12/10 00:54:20 UTC

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

GitHub user davidyan74 opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/185

    APEXCORE-60 Iteration support in Apex Core

    Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davidyan74/incubator-apex-core APEXCORE-60

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #185
    
----
commit 3f42e2bc6c080a15d04339ef0fd6f23c40e7f5be
Author: David Yan <da...@datatorrent.com>
Date:   2015-12-09T23:52:26Z

    APEXCORE-60 Iteration support in Apex Core

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47731190
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    I chose to use an attribute because it will be available in the child container (namely in GenericNode.java). If there is a way to get to the InputPortMeta in that code, I'd prefer to put it there, but it seems that InputPortMeta is only available in stram.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48290556
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -277,6 +277,8 @@ public int getCount(boolean reset)
         gn.connectInputPort("ip1", reservoir1);
         gn.connectInputPort("ip2", reservoir2);
         gn.connectOutputPort("op", output);
    +    gn.firstWindowMillis = 0L;
    --- End diff --
    
    Fixed



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48212355
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1495,6 +1509,11 @@ private void validateThreadLocal(OperatorMeta om) {
           return;
         }
     
    +    if (om.getOperator() instanceof Operator.DelayOperator) {
    +      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
    +      throw new ValidationException(msg);
    --- End diff --
    
    This is because if the whole loop were THREAD_LOCAL, it would end up as an infinite recursive call within the thread. However, it's technically possible to have THREAD_LOCAL for part of the cycle. I have reopened this JIRA: https://issues.apache.org/jira/browse/APEXCORE-262. We will tackle this in the future.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48282574
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1495,6 +1509,11 @@ private void validateThreadLocal(OperatorMeta om) {
           return;
         }
     
    +    if (om.getOperator() instanceof Operator.DelayOperator) {
    +      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
    +      throw new ValidationException(msg);
    --- End diff --
    
    @davidyan74 : THREAD_LOCAL should be valid up to the DelayOperator. So even if there is a loop, THREAD_LOCAL would be valid only up to the DelayOperator, since the output of the DelayOperator has to go into the next window.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725228
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -327,8 +327,11 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
     
           boolean upstreamDeployed = true;
     
    -      for (StreamMeta s : n.getInputStreams().values()) {
    -        if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
    +      for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) {
    +        InputPortMeta port = entry.getKey();
    +        StreamMeta s = entry.getValue();
    +        boolean delay = port.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
    --- End diff --
    
    I don't see `port` being used anywhere else. Can we do `entry.getKey().getValue()`?
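The suggestion can be tried in isolation. The following is a minimal sketch, assuming hypothetical `PortInfo`/`StreamInfo` classes that stand in for stram's `InputPortMeta` and `StreamMeta` (they are not the real classes). The delay flag is read inline from `entry.getKey()`. The sketch also assumes, as the diff suggests, that an input fed by a DelayOperator does not block deployment, because inside a loop that upstream operator cannot have been deployed first.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for stram's InputPortMeta (not the real class).
final class PortInfo
{
  final String name;
  final boolean connectedToDelayOperator;

  PortInfo(String name, boolean connectedToDelayOperator)
  {
    this.name = name;
    this.connectedToDelayOperator = connectedToDelayOperator;
  }
}

// Hypothetical stand-in for stram's StreamMeta; only the source operator's name is kept.
final class StreamInfo
{
  final String sourceOperator;

  StreamInfo(String sourceOperator)
  {
    this.sourceOperator = sourceOperator;
  }
}

class UpstreamCheck
{
  /** True when every upstream operator that must precede this one is already deployed. */
  static boolean upstreamDeployed(Map<PortInfo, StreamInfo> inputStreams, Set<String> deployed)
  {
    for (Map.Entry<PortInfo, StreamInfo> entry : inputStreams.entrySet()) {
      // Flag read inline from the key, so no separate `port` local is needed.
      if (entry.getKey().connectedToDelayOperator) {
        continue; // loop-closing input: its source is not required to be deployed first (assumption)
      }
      StreamInfo s = entry.getValue();
      if (s.sourceOperator != null && !deployed.contains(s.sourceOperator)) {
        return false;
      }
    }
    return true;
  }
}
```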



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48774037
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -394,6 +463,15 @@ else if (tracker.ports[trackerIndex] == activePort) {
                   case END_STREAM:
                     activePort.remove();
                     buffers.remove();
    +                if (firstWindowId == -1) {
    +                  // this is for recovery from a checkpoint for DelayOperator
    +                  if (delay) {
    +                    // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery),
    +                    // fabricate the first window
    +                    fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
    +                  }
    +                  firstWindowId = t.getWindowId();
    --- End diff --
    
    Same question again: should firstWindowId be assigned only when delay is true and firstWindowId == -1?



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#issuecomment-169847906
  
    @gauravgopi123 @tweise Added recovery tests. Please let me know whether this is ready to be merged and if so, I'll squash some of the commits.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47723719
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    Currently the attributes for both input and output ports are all under the same umbrella, which sometimes makes it confusing which attribute applies to which type of port. Should we separate them?
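One hypothetical way to separate them is to tag each attribute key with the kind of port it applies to, so that querying an input-only attribute on an output port fails at compile time. The names `InputKind`, `OutputKind`, `PortAttr`, and `TypedPortAttributes` are illustrative only. They are not part of the Apex API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker types for the two kinds of ports.
interface InputKind {}
interface OutputKind {}

/** Attribute key bound to one port kind K, with value type T and a default. */
final class PortAttr<K, T>
{
  final String name;
  final T defaultValue;

  PortAttr(String name, T defaultValue)
  {
    this.name = name;
    this.defaultValue = defaultValue;
  }
}

/** Attributes of a port of kind K; only PortAttr<K, ?> keys are accepted. */
class TypedPortAttributes<K>
{
  // Declared as input-only, so it cannot be used with TypedPortAttributes<OutputKind>.
  static final PortAttr<InputKind, Boolean> IS_CONNECTED_TO_DELAY_OPERATOR =
      new PortAttr<>("IS_CONNECTED_TO_DELAY_OPERATOR", false);

  private final Map<PortAttr<K, ?>, Object> values = new HashMap<>();

  <T> void put(PortAttr<K, T> attr, T value)
  {
    values.put(attr, value);
  }

  @SuppressWarnings("unchecked")
  <T> T getValue(PortAttr<K, T> attr)
  {
    // The cast is safe because put() only stores values of the key's type T.
    return values.containsKey(attr) ? (T)values.get(attr) : attr.defaultValue;
  }
}
```

With this design, `new TypedPortAttributes<OutputKind>().getValue(TypedPortAttributes.IS_CONNECTED_TO_DELAY_OPERATOR)` does not compile, which is the kind of separation being asked about.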



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49259156
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    @tweise I think I'm not doing this correctly, hence the out-of-sequence tuple in the unit test. My debugging indicates that the recovery checkpoints are not in sync for the operators that try to recover. Can you please review this and see what I'm doing wrong?
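One way to reason about the out-of-sync checkpoints: inside a loop, every operator is downstream of every other operator. If each operator's recovery checkpoint is the minimum over itself and everything reachable downstream, then all operators in the cycle end up with the same value. The sketch below illustrates that property on a plain adjacency map of operator names with `long` checkpoints. It is an illustration under those assumptions, not the fix adopted in this PR, and it ignores the "recovery window id cannot move backwards" rule from the original code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RecoveryCheckpoints
{
  /**
   * For each operator, returns the minimum checkpoint over the operator itself and every
   * operator reachable downstream. The per-start visited set keeps the traversal finite
   * even though the delay operator's output edge closes a cycle.
   */
  static Map<String, Long> compute(Map<String, List<String>> downstream, Map<String, Long> checkpoint)
  {
    Map<String, Long> recovery = new HashMap<>();
    for (String start : checkpoint.keySet()) {
      Set<String> visited = new HashSet<>();
      Deque<String> stack = new ArrayDeque<>();
      stack.push(start);
      long min = Long.MAX_VALUE;
      while (!stack.isEmpty()) {
        String op = stack.pop();
        if (!visited.add(op)) {
          continue; // already counted; this is what stops the walk around the loop
        }
        min = Math.min(min, checkpoint.get(op));
        for (String next : downstream.getOrDefault(op, List.of())) {
          stack.push(next);
        }
      }
      recovery.put(start, min);
    }
    return recovery;
  }
}
```

For example, with edges A->B, B->C, C->D, C->E and a delay edge D->B, the operators B, C, and D all receive the same recovery checkpoint. This is because each of them can reach the others.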



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48757578
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,25 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator of which the outgoing streaming window id is incremented by *one* by the
    +   * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must*
    +   * immediately connect to an upstream operator in the data flow path. Note that at least one output port of
    +   * DelayOperator should be connected in order for the DelayOperator to serve its purpose.
    +   *
    +   * This is meant for iterative algorithms in the topology. A larger window increment can be simulated by an
    +   * implementation of this interface.
    +   */
    +  interface DelayOperator extends Operator
    +  {
    +    /**
    +     * This method gets called at the first window of the execution.
    +     * The implementation is expected to emit tuples for initialization and/or
    +     * recovery.
    +     */
    +    void firstWindow();
    --- End diff --
    
    Should this function have "Window ID" as an argument?
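For comparison, a variant that receives the window ID could look like the sketch below. The interface `WindowAwareDelayOperator`, the method `firstWindow(long windowId)`, and the `ReplayingDelay` class are hypothetical. The interface in this diff declares `firstWindow()` with no arguments. The sketch only shows why the ID could help: a recovering implementation could look up exactly the tuples it persisted for that window.

```java
import java.util.List;
import java.util.Map;

// Hypothetical variant of Operator.DelayOperator; the diff's firstWindow() takes no argument.
interface WindowAwareDelayOperator
{
  /** Called once, for the first window of execution (start or recovery), with that window's ID. */
  List<String> firstWindow(long windowId);
}

/** Replays tuples that were (hypothetically) persisted per window before a failure. */
class ReplayingDelay implements WindowAwareDelayOperator
{
  private final Map<Long, List<String>> persisted;

  ReplayingDelay(Map<Long, List<String>> persisted)
  {
    this.persisted = persisted;
  }

  @Override
  public List<String> firstWindow(long windowId)
  {
    // With the ID, recovery can select exactly the tuples owed to this window.
    // On a fresh start nothing is persisted, so nothing is emitted.
    return persisted.getOrDefault(windowId, List.of());
  }
}
```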



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48221148
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -277,6 +277,8 @@ public int getCount(boolean reset)
         gn.connectInputPort("ip1", reservoir1);
         gn.connectInputPort("ip2", reservoir2);
         gn.connectOutputPort("op", output);
    +    gn.firstWindowMillis = 0L;
    --- End diff --
    
    Line 387 initializes firstWindowMillis as 0, while here it is initialized as 0L. Can we use the same format?



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725745
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    +            LOG.warn("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
    --- End diff --
    
    I would say yes.
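The rule that `findInvalidDelays` checks is that every output of a DelayOperator must go to an operator already on the current depth-first path, so that the edge closes a loop. That rule can be sketched on a plain graph. The operator names, the adjacency map, and the `delays` set are hypothetical inputs, and this is not the LogicalPlan code. Like the diff, it tracks only the current path on a stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DelayValidator
{
  /** Returns "delay->successor" for every delay output that does not close a loop. */
  static Set<String> findInvalidDelays(String root, Map<String, List<String>> downstream, Set<String> delays)
  {
    Set<String> invalid = new LinkedHashSet<>();
    visit(root, downstream, delays, new ArrayDeque<>(), invalid);
    return invalid;
  }

  private static void visit(String om, Map<String, List<String>> downstream, Set<String> delays,
      Deque<String> stack, Set<String> invalid)
  {
    stack.push(om);
    boolean isDelayOperator = delays.contains(om);
    for (String successor : downstream.getOrDefault(om, List.of())) {
      if (isDelayOperator) {
        // A delay operator must immediately feed an operator on the current path.
        if (!stack.contains(successor)) {
          invalid.add(om + "->" + successor);
        }
      } else if (!stack.contains(successor)) {
        // Cycles without a delay operator are a separate validation and are not descended into.
        visit(successor, downstream, delays, stack, invalid);
      }
    }
    stack.pop();
  }
}
```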



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48784504
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    +                      // (recovery), fabricate the first window
    +                      fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
    +                    }
    +                    firstWindowId = t.getWindowId();
    --- End diff --
    
    Even though firstWindowId is not currently used when !delay, leaving firstWindowId == -1 in the !delay case is not ideal: it could be used in the future and cause bugs through confusion, and we are talking about a negligible assignment instruction.
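The control flow under discussion reduces to a small sketch. On the first RESET_WINDOW (start) or END_STREAM (recovery), a delay operator fabricates the first window. `firstWindowId` is then recorded regardless of `delay`, so it never stays at -1 for regular operators. `FirstWindowTracker` and its `Runnable` hook are hypothetical stand-ins for GenericNode and `fabricateFirstWindow`.

```java
class FirstWindowTracker
{
  private final boolean delay;
  private final Runnable fabricateFirstWindow; // hypothetical stand-in for fabricateFirstWindow(...)
  long firstWindowId = -1;

  FirstWindowTracker(boolean delay, Runnable fabricateFirstWindow)
  {
    this.delay = delay;
    this.fabricateFirstWindow = fabricateFirstWindow;
  }

  /** Handles a RESET_WINDOW or END_STREAM control tuple carrying the given window ID. */
  void onResetOrEndStream(long windowId)
  {
    if (firstWindowId == -1) {
      if (delay) {
        fabricateFirstWindow.run(); // only delay operators fabricate the first window
      }
      // Assigned for every operator, so -1 always means "no window seen yet".
      firstWindowId = windowId;
    }
  }
}
```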



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47780330
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    @davidyan74 : There are some internal attributes defined in LogicalPlan. I think we should do the same here: define this as an internal attribute and use that. The only catch is that you will then have to check first whether portContext.getValue() returns null.
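The trade-off can be illustrated with a minimal, hypothetical attribute map. `Attr` and `PortAttributes` are not Apex's `Attribute` or `AttributeMap` classes. A public attribute declared with a default of `false` always yields a value. An internal attribute without a default returns `null` when unset, so readers must guard against that before unboxing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical attribute key carrying an optional default value.
final class Attr<T>
{
  final String name;
  final T defaultValue;

  Attr(String name, T defaultValue)
  {
    this.name = name;
    this.defaultValue = defaultValue;
  }
}

class PortAttributes
{
  static final Attr<Boolean> PUBLIC_IS_CONNECTED_TO_DELAY = new Attr<>("IS_CONNECTED_TO_DELAY_OPERATOR", false);
  static final Attr<Boolean> INTERNAL_IS_CONNECTED_TO_DELAY = new Attr<>("internal.isConnectedToDelay", null);

  private final Map<Attr<?>, Object> values = new HashMap<>();

  <T> void put(Attr<T> attr, T value)
  {
    values.put(attr, value);
  }

  @SuppressWarnings("unchecked")
  <T> T getValue(Attr<T> attr)
  {
    // Falls back to the declared default, which is null for the internal attribute.
    return values.containsKey(attr) ? (T)values.get(attr) : attr.defaultValue;
  }

  /** Null-safe read: unboxing a null Boolean directly would throw NullPointerException. */
  boolean isConnectedToDelay()
  {
    return Boolean.TRUE.equals(getValue(INTERNAL_IS_CONNECTED_TO_DELAY));
  }
}
```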



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47722554
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator
    + */
    +public class SimpleDelayOperator<T> implements Operator.DelayOperator
    --- End diff --
    
    What is the purpose of this class? I don't see any delay being implemented here...
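Per the DelayOperator Javadoc in this PR, the delay is applied by the engine, not by the operator: the outgoing window ID of a DelayOperator is incremented by one. The toy simulation below shows that even a pure pass-through operator moves data one window forward. `EngineShiftDemo`, `Tuple`, and `emitFromDelay` are hypothetical names. The real engine does this inside GenericNode, not in a method like this.

```java
import java.util.ArrayList;
import java.util.List;

class EngineShiftDemo
{
  // Hypothetical tuple tagged with the streaming window it belongs to.
  static final class Tuple
  {
    final long windowId;
    final String payload;

    Tuple(long windowId, String payload)
    {
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  /**
   * Simulates the engine side: whatever a delay operator emits while processing window w
   * is delivered downstream as part of window w + 1. The operator logic itself is a pass-through.
   */
  static List<Tuple> emitFromDelay(List<Tuple> input)
  {
    List<Tuple> output = new ArrayList<>();
    for (Tuple t : input) {
      String processed = t.payload; // pass-through, as in SimpleDelayOperator
      output.add(new Tuple(t.windowId + 1, processed)); // +1 applied by the (simulated) engine
    }
    return output;
  }
}
```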



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725142
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,24 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator that increments the outgoing streaming window
    +   * id by one and allows loops in the "DAG". It is meant for iterative algorithms
    --- End diff --
    
    Yes, but not at the platform level. An implementation of DelayOperator can implement an offset greater than one.
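The following sketches an implementation-level offset greater than one. It assumes the engine already contributes one window, so an operator that wants a total delay of `n` windows holds each window's tuples for `n - 1` extra windows. `MultiWindowDelay` and its `beginWindow`/`process`/`endWindow` methods are hypothetical and only loosely mirror the operator lifecycle. A real implementation would also need the held tuples to survive checkpointing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class MultiWindowDelay<T>
{
  private final int extraWindows; // n - 1, because the engine contributes the first window
  private final Deque<List<T>> held = new ArrayDeque<>();
  private List<T> current;

  MultiWindowDelay(int totalDelayWindows)
  {
    if (totalDelayWindows < 1) {
      throw new IllegalArgumentException("delay must be at least one window");
    }
    this.extraWindows = totalDelayWindows - 1;
  }

  void beginWindow()
  {
    current = new ArrayList<>();
  }

  void process(T tuple)
  {
    current.add(tuple);
  }

  /** Returns the tuples emitted in this window, before the engine's own +1 shift. */
  List<T> endWindow()
  {
    held.addLast(current);
    // Release the oldest batch once it has been held for `extraWindows` windows.
    if (held.size() > extraWindows) {
      return held.removeFirst();
    }
    return List.of();
  }
}
```

With `n = 3`, a tuple received in window 0 is emitted in window 2, and the engine's shift then delivers it in window 3.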



[GitHub] incubator-apex-core pull request: DO NOT MERGE: APEXCORE-60 Iterat...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#issuecomment-174162667
  
    #212 fixes the original build failure. Now I see the second recovery test fail after the first one passes, and that can be reproduced locally. I think we can close this and take up the remaining unit test issue separately.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47727806
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -327,8 +327,11 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
     
           boolean upstreamDeployed = true;
     
    -      for (StreamMeta s : n.getInputStreams().values()) {
    -        if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
    +      for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) {
    +        InputPortMeta port = entry.getKey();
    +        StreamMeta s = entry.getValue();
    +        boolean delay = port.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
    --- End diff --
    
    done



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47724731
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    --- End diff --
    
    Shouldn't the rule be that the APPLICATION_WINDOW_COUNT of the delay operator can't be greater than that of the downstream operator?
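The two candidate rules can be compared as small hypothetical helpers over plain integers. The diff flags any DelayOperator whose APPLICATION_WINDOW_COUNT is not 1. The reviewer's alternative, in one possible reading, only requires that the count not exceed that of each downstream operator. Neither helper is LogicalPlan code.

```java
import java.util.List;

class ApplicationWindowRules
{
  /** Rule in the diff: a delay operator must use an application window of exactly one streaming window. */
  static boolean validAsInPr(int delayCount)
  {
    return delayCount == 1;
  }

  /** Reviewer's proposal (one possible reading): no larger than any downstream operator's count. */
  static boolean validAsProposed(int delayCount, List<Integer> downstreamCounts)
  {
    for (int c : downstreamCounts) {
      if (delayCount > c) {
        return false;
      }
    }
    return true;
  }
}
```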



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47822097
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1503,6 +1517,16 @@ private void validateThreadLocal(OperatorMeta om) {
             throw new ValidationException(msg);
           }
     
    +      if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
    +        String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
    +        throw new ValidationException(msg);
    +      }
    +      if (om.getOperator() instanceof Operator.DelayOperator) {
    --- End diff --
    
    Should this be outside the `for` loop?
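The check on `om` does not depend on the loop variable, so it can run once, before iterating over the input streams. The sketch below uses a hypothetical `Op` class instead of `OperatorMeta`, a list of source operators instead of THREAD_LOCAL `StreamMeta`s, and `IllegalStateException` in place of `ValidationException`. Like the diff, it rejects THREAD_LOCAL streams that touch a DelayOperator on either end, while leaving THREAD_LOCAL allowed elsewhere in the loop.

```java
import java.util.List;

class ThreadLocalValidator
{
  // Hypothetical stand-in for OperatorMeta.
  static final class Op
  {
    final String name;
    final boolean isDelay;

    Op(String name, boolean isDelay)
    {
      this.name = name;
      this.isDelay = isDelay;
    }
  }

  /**
   * Validates an operator whose input streams are all THREAD_LOCAL (assumption).
   * @param sources the upstream operator of each such stream
   */
  static void validateThreadLocal(Op om, List<Op> sources)
  {
    // Loop-invariant check hoisted out of the loop: it depends only on om.
    if (!sources.isEmpty() && om.isDelay) {
      throw new IllegalStateException("Locality THREAD_LOCAL invalid for delay operator " + om.name);
    }
    for (Op source : sources) {
      if (source.isDelay) {
        throw new IllegalStateException("Locality THREAD_LOCAL invalid for delay operator " + source.name);
      }
    }
  }
}
```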



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47783618
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclasses of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically do a +1 on the output window ID since it is a DelayOperator.
    + */
    +public class SimpleDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
    --- End diff --
    
    Does it make sense to name it SimpleNoOpDelayOperator?
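The javadoc quoted above describes a pass-through operator whose behavior subclasses change by overriding `processTuple(T tuple)`, and whose `firstWindow()` does nothing. A minimal sketch of that shape, assuming plain-Java stand-ins: `DelayOp` replaces `Operator.DelayOperator`, a `Consumer` replaces the output port, and `DoublingDelay` is a hypothetical example subclass. The real operator would extend `BaseOperator` and use `DefaultInputPort`/`DefaultOutputPort`.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for Operator.DelayOperator from the diff.
interface DelayOp {
  void firstWindow();
}

// Pass-through delay operator: one input, one output, tuples forwarded as-is.
class PassThroughDelay<T> implements DelayOp {
  // Stands in for the output port's emit().
  private final Consumer<T> output;

  PassThroughDelay(Consumer<T> output) {
    this.output = output;
  }

  // Entry point for tuples arriving on the single input port.
  public final void input(T tuple) {
    processTuple(tuple);
  }

  // Default behavior: forward unchanged. Subclasses override this hook.
  protected void processTuple(T tuple) {
    emit(tuple);
  }

  protected final void emit(T tuple) {
    output.accept(tuple);
  }

  // Emits nothing, so tuples that were in flight when a failure occurred
  // are not replayed after recovery.
  @Override
  public void firstWindow() {
  }
}

// Hypothetical example subclass that changes the pass-through behavior.
class DoublingDelay extends PassThroughDelay<Integer> {
  DoublingDelay(Consumer<Integer> output) {
    super(output);
  }

  @Override
  protected void processTuple(Integer tuple) {
    emit(tuple * 2);
  }
}
```

The no-op `firstWindow()` is what makes "as-is" usage lossy on recovery, which is the trade-off the javadoc calls out.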


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48768048
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,25 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator whose outgoing streaming window id is incremented by *one* by the
    +   * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must*
    +   * immediately connect to an upstream operator in the data flow path. Note that at least one output port of
    +   * DelayOperator should be connected in order for the DelayOperator to serve its purpose.
    +   *
    +   * This is meant for iterative algorithms in the topology. A larger window increment can be simulated by an
    +   * implementation of this interface.
    +   */
    +  interface DelayOperator extends Operator
    +  {
    +    /**
    +     * This method gets called at the first window of the execution.
    +     * The implementation is expected to emit tuples for initialization and/or
    +     * recovery.
    +     */
    +    void firstWindow();
    --- End diff --
    
    We concluded that the window id was not needed, since the implementation can easily obtain it by storing the window id as part of its checkpoint state.
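A minimal sketch of that idea, assuming that non-transient operator fields are part of the checkpoint and that `firstWindow()` runs on the recovered instance before its next `beginWindow()` overwrites the field. `WindowTrackingDelay` and `restoreFromCheckpoint()` are hypothetical; the latter only simulates checkpoint and restore by copying the checkpointed field.

```java
// Hypothetical sketch: the operator remembers the last window id in a
// checkpointed field, so firstWindow() can tell a fresh start from a recovery
// without the engine passing the window id in.
class WindowTrackingDelay {
  // Checkpointed state: the last window id this operator has seen.
  private long lastWindowId = -1;
  // Would be transient in a real operator; records what firstWindow() decided.
  transient String firstWindowNote;

  public void beginWindow(long windowId) {
    lastWindowId = windowId;
  }

  public void firstWindow() {
    if (lastWindowId == -1) {
      firstWindowNote = "fresh start";
    } else {
      firstWindowNote = "recovered after window " + lastWindowId;
    }
  }

  // Simulates checkpoint + restore: only the checkpointed field is carried over.
  WindowTrackingDelay restoreFromCheckpoint() {
    WindowTrackingDelay copy = new WindowTrackingDelay();
    copy.lastWindowId = this.lastWindowId;
    return copy;
  }
}
```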


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48211484
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1495,6 +1509,11 @@ private void validateThreadLocal(OperatorMeta om) {
           return;
         }
     
    +    if (om.getOperator() instanceof Operator.DelayOperator) {
    +      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
    +      throw new ValidationException(msg);
    --- End diff --
    
    @davidyan74: Why can't a delay operator have THREAD_LOCAL locality?


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47808323
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclasses of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically do a +1 on the output window ID since it is a DelayOperator.
    + */
    +public class SimpleDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
    --- End diff --
    
    I think renaming it to SimpleNoOpDelayOperator may be a little strange since I expect it will be a base class for most DelayOperator implementations.  How about BaseDelayOperator?


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725499
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    --- End diff --
    
    My understanding is that the APPLICATION_WINDOW_COUNT of the delay operator should be independent of the downstream operators. Can you shed some light on this? You have implemented sliding windows and probably understand windowing better than I do.


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48784528
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -394,6 +463,15 @@ else if (tracker.ports[trackerIndex] == activePort) {
                   case END_STREAM:
                     activePort.remove();
                     buffers.remove();
    +                if (firstWindowId == -1) {
    +                  // this is for recovery from a checkpoint for DelayOperator
    +                  if (delay) {
    +                    // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery),
    +                    // fabricate the first window
    +                    fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
    +                  }
    +                  firstWindowId = t.getWindowId();
    --- End diff --
    
    See reply above
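The hunk above fabricates a first window for a DelayOperator on the first RESET_WINDOW (start) or END_STREAM (recovery). A minimal model of that control flow, assuming the fabricated window is a BEGIN_WINDOW/END_WINDOW pair around `firstWindow()`. `FirstWindowFabricator` and its string "tuples" are hypothetical. The `windowAhead` value is simply passed in here, because how the engine computes it is not shown in the hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the GenericNode branch quoted above; not the engine code.
class FirstWindowFabricator {
  interface DelayOperator {
    void firstWindow();
  }

  // Control and data tuples "sent" downstream, in order.
  final List<String> downstream = new ArrayList<>();
  private final boolean delay;
  private final DelayOperator operator;
  private long firstWindowId = -1;

  FirstWindowFabricator(boolean delay, DelayOperator operator) {
    this.delay = delay;
    this.operator = operator;
  }

  // Called for a RESET_WINDOW (fresh start) or END_STREAM (recovery) tuple.
  void onResetOrEndStream(long windowId, long windowAhead) {
    if (firstWindowId == -1) {
      if (delay) {
        fabricateFirstWindow(windowAhead);
      }
      firstWindowId = windowId; // later tuples no longer trigger fabrication
    }
  }

  // Wraps firstWindow() in a BEGIN_WINDOW/END_WINDOW pair so anything it
  // emits lands in a well-formed window downstream.
  private void fabricateFirstWindow(long windowId) {
    downstream.add("BEGIN_WINDOW " + windowId);
    operator.firstWindow();
    downstream.add("END_WINDOW " + windowId);
  }
}
```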


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47727192
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    Separating them means we need to have separate InputPortContext and OutputPortContext.  I think that would be a different ticket altogether.


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725219
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator
    + */
    +public class SimpleDelayOperator<T> implements Operator.DelayOperator
    --- End diff --
    
    This is a pass-through DelayOperator.  The actual delay is implemented in the engine.
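A simplified model of the split described here: the operator code only forwards tuples, while the engine re-stamps each outgoing window boundary with `windowId + 1`. The tuples that a loop feeds back while processing window N therefore reach the upstream operator as part of window N + 1. `DelayStamping` and its `Event` type are hypothetical and do not mirror the engine's tuple classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the engine-side delay: data passes through unchanged,
// and every outgoing window boundary gets windowId + 1.
class DelayStamping {
  static final class Event {
    final String kind;    // "BEGIN_WINDOW", "DATA" or "END_WINDOW"
    final long windowId;  // meaningful for window boundaries only
    final String payload; // meaningful for DATA only

    Event(String kind, long windowId, String payload) {
      this.kind = kind;
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  static List<Event> delay(List<Event> input) {
    List<Event> out = new ArrayList<>();
    for (Event e : input) {
      if ("DATA".equals(e.kind)) {
        out.add(e); // data passes through untouched
      } else {
        out.add(new Event(e.kind, e.windowId + 1, e.payload));
      }
    }
    return out;
  }
}
```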


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48773375
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    --- End diff --
    
    @davidyan74: Do we need to send the reset tuple downstream? I don't think we need it. If we do, then don't we also need to do controlTupleCount++?


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49281400
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    @davidyan74 I have been working on making the recovery checkpoints the same in the case of iteration or idempotent operators (APEXCORE-279). Please wait for that pull request.
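The removed DFS above bounds an operator's recovery window by its downstream operators' recovery windows, and a loop closed by a delay operator would make a naive DFS revisit operators forever. The following is an illustrative sketch only, not the engine's algorithm and not the APEXCORE-279 change. Each operator's recovery window is taken as the minimum checkpoint over itself and everything reachable downstream, and a visited set keeps the walk finite. Operators on the same loop reach the same set of operators, so they end up with the same recovery window. `RecoveryCheckpoints` and the string-keyed graph are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RecoveryCheckpoints {
  // downstream: operator -> operators it feeds.
  // checkpoint: operator -> its latest checkpoint window id (every operator must be present).
  static Map<String, Long> compute(Map<String, List<String>> downstream, Map<String, Long> checkpoint) {
    Map<String, Long> recovery = new HashMap<>();
    for (String start : checkpoint.keySet()) {
      Set<String> visited = new HashSet<>();
      Deque<String> stack = new ArrayDeque<>();
      stack.push(start);
      long min = Long.MAX_VALUE;
      while (!stack.isEmpty()) {
        String op = stack.pop();
        if (!visited.add(op)) {
          continue; // already counted; this stops the walk from circling a loop forever
        }
        min = Math.min(min, checkpoint.get(op));
        for (String next : downstream.getOrDefault(op, Collections.<String>emptyList())) {
          stack.push(next);
        }
      }
      recovery.put(start, min);
    }
    return recovery;
  }
}
```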


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48784203
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    --- End diff --
    
    We do need to send the RESET_WINDOW tuple downstream, because it is the reset tuple that sets the upper 32 bits (baseSeconds) in the downstream buffer server subscriber.
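A small sketch of why the reset tuple matters. It assumes a 64-bit window id with baseSeconds in the upper 32 bits and a per-base window counter in the lower 32 bits, and it assumes the subscriber sees only the lower bits on each BEGIN_WINDOW. A subscriber that never received the reset tuple cannot rebuild the full id. `WindowIds` and `WindowSubscriber` are hypothetical names, not engine classes.

```java
// Hypothetical sketch of the window id layout described above.
final class WindowIds {
  private WindowIds() {
  }

  static long compose(int baseSeconds, int windowCount) {
    return ((long) baseSeconds << 32) | (windowCount & 0xFFFFFFFFL);
  }

  static int baseSeconds(long windowId) {
    return (int) (windowId >>> 32);
  }

  static int windowCount(long windowId) {
    return (int) windowId; // keeps the lower 32 bits
  }
}

// Rebuilds full window ids from the last RESET_WINDOW's baseSeconds.
class WindowSubscriber {
  private int baseSeconds;
  private boolean reset;

  void onResetWindow(int baseSeconds) {
    this.baseSeconds = baseSeconds;
    this.reset = true;
  }

  long onBeginWindow(int lowerBits) {
    if (!reset) {
      throw new IllegalStateException("BEGIN_WINDOW before RESET_WINDOW: upper 32 bits unknown");
    }
    return WindowIds.compose(baseSeconds, lowerBits);
  }
}
```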


---

[GitHub] incubator-apex-core pull request: DO NOT MERGE: APEXCORE-60 Iterat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/185


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47724883
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    +            LOG.warn("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
    --- End diff --
    
    Should the log level be ERROR, given that validation throws a ValidationException if there is an invalid delay operator?
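A minimal sketch of the pattern this suggestion leads to. Every finding is logged at error level and collected, and validation then fails with one exception listing all of them. It assumes `java.util.logging` (SEVERE) in place of the SLF4J logger in the diff and `IllegalArgumentException` in place of `ValidationException`. Only the APPLICATION_WINDOW_COUNT rule from the hunk is modelled, and `DelayValidation`/`Op` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;

class DelayValidation {
  private static final Logger LOG = Logger.getLogger(DelayValidation.class.getName());

  static final class Op {
    final String name;
    final boolean isDelay;
    final int applicationWindowCount;

    Op(String name, boolean isDelay, int applicationWindowCount) {
      this.name = name;
      this.isDelay = isDelay;
      this.applicationWindowCount = applicationWindowCount;
    }
  }

  // Mirrors the APPLICATION_WINDOW_COUNT rule: a delay operator must use 1.
  static List<List<String>> findInvalidDelays(List<Op> ops) {
    List<List<String>> invalid = new ArrayList<>();
    for (Op op : ops) {
      if (op.isDelay && op.applicationWindowCount != 1) {
        // Logged at error level because the plan will be rejected.
        LOG.severe("detected DelayOperator " + op.name + " having APPLICATION_WINDOW_COUNT not equal to 1");
        invalid.add(Collections.singletonList(op.name));
      }
    }
    return invalid;
  }

  static void validate(List<Op> ops) {
    List<List<String>> invalid = findInvalidDelays(ops);
    if (!invalid.isEmpty()) {
      throw new IllegalArgumentException("Invalid delays in graph: " + invalid);
    }
  }
}
```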


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47820937
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1607,12 +1636,43 @@ else if (stack.contains(successor)) {
           }
           // strongly connected (cycle) if more than one node in stack
           if (connectedIds.size() > 1) {
    -        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
    +        LOG.error("detected cycle from node {}: {}", om.name, connectedIds);
             cycles.add(connectedIds);
           }
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.error("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    --- End diff --
    
    This only works when there is at least one input operator, but in some scenarios an input operator may not be needed.
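A sketch of the rule in `findInvalidDelays` and of the limitation raised here. The DFS starts at root (input) operators, and every successor of a delay operator must already be on the current DFS path, so the delay must close a loop back upstream. A loop with no input operator has no root, so nothing in it is ever visited. It assumes, as the separate cycle check implies, that every cycle contains a delay operator, since the traversal does not prune revisits. `DelayPathCheck` and its string-keyed graph are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DelayPathCheck {
  final Map<String, List<String>> successors = new HashMap<>();
  final Set<String> delays = new HashSet<>();
  final Set<String> visited = new HashSet<>(); // recorded only, never used to prune
  private final Deque<String> stack = new ArrayDeque<>();

  void edge(String from, String to) {
    successors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
  }

  // Roots are operators with outgoing edges but no incoming edges.
  List<String> roots() {
    Set<String> targets = new HashSet<>();
    for (List<String> next : successors.values()) {
      targets.addAll(next);
    }
    List<String> roots = new ArrayList<>();
    for (String op : successors.keySet()) {
      if (!targets.contains(op)) {
        roots.add(op);
      }
    }
    return roots;
  }

  List<String> check(List<String> roots) {
    List<String> invalid = new ArrayList<>();
    for (String root : roots) {
      dfs(root, invalid);
    }
    return invalid;
  }

  private void dfs(String op, List<String> invalid) {
    stack.push(op);
    visited.add(op);
    boolean isDelay = delays.contains(op);
    for (String next : successors.getOrDefault(op, new ArrayList<>())) {
      if (isDelay) {
        // A delay must feed an operator already on the current path.
        if (!stack.contains(next)) {
          invalid.add(op + "->" + next);
        }
      } else {
        dfs(next, invalid);
      }
    }
    stack.pop();
  }
}
```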


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#issuecomment-169851126
  
    Don't merge it yet. I'm looking into this unit test failure.


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47822836
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1503,6 +1517,16 @@ private void validateThreadLocal(OperatorMeta om) {
             throw new ValidationException(msg);
           }
     
    +      if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
    +        String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
    +        throw new ValidationException(msg);
    +      }
    +      if (om.getOperator() instanceof Operator.DelayOperator) {
    --- End diff --
    
    thanks, fixed.


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47822987
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1607,12 +1636,43 @@ else if (stack.contains(successor)) {
           }
           // strongly connected (cycle) if more than one node in stack
           if (connectedIds.size() > 1) {
    -        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
    +        LOG.error("detected cycle from node {}: {}", om.name, connectedIds);
             cycles.add(connectedIds);
           }
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.error("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    --- End diff --
    
    Yes, changing this would require regular operators to have a window generator.  We will target this in the next iteration.


---

[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48099738
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java ---
    @@ -0,0 +1,57 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * DefaultDelayOperator. This is the version of BaseDelayOperator that provides no data loss during recovery. It
    + * incurs a run-time cost per tuple, and all tuples of the checkpoint window will be part of the checkpoint state.
    + * Therefore if your application can tolerate data loss at recovery, BaseDelayOperator should be used instead.
    + */
    +public class DefaultDelayOperator<T> extends BaseDelayOperator<T>
    --- End diff --
    
    WindowDataManager should have much better performance than this since it spreads the writing of the tuples throughout the entire window.
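
To make the trade-off concrete, here is a minimal, hypothetical model (plain Java, not the Apex API; class and method names are illustrative assumptions) of a delay operator that keeps every tuple of the current window in its state. A checkpoint taken at the end of that window therefore carries all of its tuples, which can be re-emitted in the first window after recovery. The checkpointed state grows with the number of tuples per window, whereas a WindowDataManager-style approach writes tuples out incrementally during the window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a zero-loss delay operator; not the Apex API.
class BufferingDelay<T> {
  private final Consumer<T> output;
  // Tuples received in the current window; this list is the checkpointed state.
  private List<T> windowTuples = new ArrayList<>();

  BufferingDelay(Consumer<T> output) {
    this.output = output;
  }

  void beginWindow() {
    windowTuples = new ArrayList<>();
  }

  // Pass the tuple through (the engine would shift its window ID by one)
  // and remember it in case this window becomes the recovery checkpoint.
  void process(T tuple) {
    windowTuples.add(tuple);
    output.accept(tuple);
  }

  // State captured by a checkpoint at the end of the window: every tuple of that window.
  List<T> checkpointState() {
    return new ArrayList<>(windowTuples);
  }

  // After recovery, re-emit the checkpointed tuples in the first window so none are lost.
  void firstWindow(List<T> restoredState) {
    for (T tuple : restoredState) {
      output.accept(tuple);
    }
  }
}
```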


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47782896
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,24 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator that increments the outgoing streaming window
    +   * id by one and allows loops in the "DAG". It is meant for iterative algorithms
    --- End diff --
    
    I think we need to fix the Javadoc to reflect that.
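
The iteration idea behind this Javadoc can be illustrated with the FibonacciOperator from the test in this PR: whatever FIB emits in window n comes back through the delay in window n+1. The sketch below simulates that loop in plain Java, with a one-slot buffer standing in for the delay. It is a hypothetical model of the data flow, not engine code, and it assumes the fabricated first window delivers no tuples from the delay.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates FIB -> delay -> FIB, one iteration per streaming window (hypothetical model).
class FibonacciLoopSimulation {
  static List<Long> run(int windows) {
    List<Long> emitted = new ArrayList<>();
    long currentNumber = 1;  // mirrors FibonacciOperator.currentNumber
    long tempNum = 0;        // last value received on FIB.input
    Long delayed = null;     // tuple held by the delay for the next window
    for (int w = 0; w < windows; w++) {
      // Delay output: the tuple FIB emitted in the previous window arrives now.
      if (delayed != null) {
        tempNum = delayed;
      }
      // FIB.endWindow(): emit the current number, then advance.
      emitted.add(currentNumber);
      delayed = currentNumber;
      currentNumber += tempNum;
    }
    return emitted;
  }
}
```

Calling `FibonacciLoopSimulation.run(6)` yields the first six Fibonacci numbers, because each window adds the value that the delay carried over from the previous window.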


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47782134
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    @davidyan74: I checked the implementation of InputPortMeta.getValue(), and it looks like you don't need to check for null.
    
    I made changes in my local branch (https://github.com/gauravgopi123/incubator-apex-core/commit/9f4cf5cd3e4895c268c1d76f436e4db2223bfd5d) and ran tests and it worked fine
    
    Does that work for you?
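
The reasoning relies on the attribute carrying a default (`new Attribute<>(false)`): if a lookup falls back to the attribute's default when nothing was set, callers never see null. A minimal, hypothetical model of that behavior follows; `AttrKey` and `PortAttrs` are illustrative stand-ins, not Apex's `Attribute` or `InputPortMeta` classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an attribute key with a default value.
final class AttrKey<T> {
  final String name;
  final T defaultValue;

  AttrKey(String name, T defaultValue) {
    this.name = name;
    this.defaultValue = defaultValue;
  }
}

// Hypothetical stand-in for per-port attribute storage.
final class PortAttrs {
  private final Map<AttrKey<?>, Object> values = new HashMap<>();

  <T> void put(AttrKey<T> key, T value) {
    values.put(key, value);
  }

  // Falls back to the key's default, so callers need no null check
  // (as long as the default itself is non-null).
  @SuppressWarnings("unchecked")
  <T> T getValue(AttrKey<T> key) {
    Object value = values.get(key);
    return value != null ? (T) value : key.defaultValue;
  }
}
```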


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48773751
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    +                      // (recovery), fabricate the first window
    +                      fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
    +                    }
    +                    firstWindowId = t.getWindowId();
    --- End diff --
    
    Should this `firstWindowId` not be assigned when `delay` is `true` and `firstWindowId == -1`?


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47722655
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator
    + */
    +public class SimpleDelayOperator<T> implements Operator.DelayOperator
    --- End diff --
    
    Can this be done as follows to make it cleaner?
    `public class SimpleDelayOperator<T> extends BaseOperator implements Operator.DelayOperator`
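
A sketch of why the suggestion is cleaner: a base class with no-op lifecycle methods leaves the delay operator implementing only its pass-through and `firstWindow()`. The `Mini*` types below are hypothetical, simplified stand-ins for the Apex `Operator`, `Operator.DelayOperator`, and `BaseOperator` types; their method sets are assumptions for illustration.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the Apex operator interfaces.
interface MiniOperator {
  void setup();

  void beginWindow(long windowId);

  void endWindow();

  void teardown();

  // The delay variant adds a single callback, invoked for the first window after start or recovery.
  interface DelayOperator extends MiniOperator {
    void firstWindow();
  }
}

// No-op lifecycle, so subclasses override only what they need.
class MiniBaseOperator implements MiniOperator {
  @Override public void setup() {}

  @Override public void beginWindow(long windowId) {}

  @Override public void endWindow() {}

  @Override public void teardown() {}
}

// With the base class, the delay operator only implements pass-through and firstWindow().
class MiniSimpleDelayOperator<T> extends MiniBaseOperator implements MiniOperator.DelayOperator {
  private final Consumer<T> output;
  int firstWindowCalls = 0;

  MiniSimpleDelayOperator(Consumer<T> output) {
    this.output = output;
  }

  void process(T tuple) {
    output.accept(tuple);
  }

  @Override
  public void firstWindow() {
    // Nothing was persisted, so nothing is replayed: tuples in flight at failure time are lost.
    firstWindowCalls++;
  }
}
```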


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48895909
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    --- End diff --
    
    fixed
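
The gating in this hunk (`trackerIndex == regularQueues`) can be modeled roughly as follows: a RESET_WINDOW is acted on only after it has arrived on every regular input port, and ports fed by a DelayOperator are left out of that count. This is a simplified, hypothetical model; the class and port names are illustrative, and the real GenericNode tracks this with `TupleTracker` and its active queues.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: decide when a RESET_WINDOW has arrived on all regular input ports.
class ResetWindowGate {
  private final Set<String> regularPorts = new HashSet<>();
  private final Set<String> seen = new HashSet<>();

  // portConnectedToDelay: input port name -> whether it is fed by a DelayOperator
  ResetWindowGate(Map<String, Boolean> portConnectedToDelay) {
    for (Map.Entry<String, Boolean> e : portConnectedToDelay.entrySet()) {
      if (!e.getValue()) {
        regularPorts.add(e.getKey());
      }
    }
  }

  int regularQueues() {
    return regularPorts.size();
  }

  // Returns true once every regular port has delivered a reset since the last time it returned true.
  boolean onResetWindow(String port) {
    if (!regularPorts.contains(port)) {
      return false;  // resets on delay-fed ports do not count toward the gate
    }
    seen.add(port);
    if (seen.size() == regularPorts.size()) {
      seen.clear();
      return true;
    }
    return false;
  }
}
```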


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725330
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1907,6 +1920,12 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
         if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
           // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
           if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
    +        LOG.info("Marking operator {} blocked: time since last windowId change={}, timeout={}, committed windowId={}, recovery windowId={}",
    --- End diff --
    
    This change merely adds logging, since I think the condition warrants it. I can make it a warn if that is more appropriate.
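
For reference, the condition under which this line logs can be restated as a pure function of the values in the hunk. This is a hypothetical restatement for clarity, with plain longs and a boolean in place of `PTOperator` and `UpdateCheckpointsContext`; it is not the StreamingContainerManager code.

```java
// Hypothetical restatement of the "operator blocked" check from the hunk above.
final class BlockedOperatorCheck {
  private BlockedOperatorCheck() {}

  static boolean isBlocked(boolean active, long currentTms, long lastWindowIdChangeTms,
      long windowProcessingTimeoutMillis, long committedWindowId, long recoveryWindowId) {
    if (!active || currentTms - lastWindowIdChangeTms <= windowProcessingTimeoutMillis) {
      return false;  // not active, or the window ID changed recently enough
    }
    // If the recovery checkpoint is ahead of the committed window, the operator is
    // waiting for activation (state-less recovery, at-most-once), not blocked.
    return committedWindowId >= recoveryWindowId;
  }

  // Builds a message with the same fields as the proposed LOG.info call.
  static String describe(String operator, long currentTms, long lastWindowIdChangeTms,
      long timeoutMillis, long committedWindowId, long recoveryWindowId) {
    return String.format("Marking operator %s blocked: time since last windowId change=%d, timeout=%d,"
        + " committed windowId=%d, recovery windowId=%d",
        operator, currentTms - lastWindowIdChangeTms, timeoutMillis, committedWindowId, recoveryWindowId);
  }
}
```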


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725901
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---
    @@ -115,6 +115,142 @@ public void testCycleDetection() {
     
       }
     
    +  @Test
    +  public void testInvalidDelayDetection()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    +    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    +    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToD", opDelay.output, opD.inport2);
    +
    +    List<List<String>> invalidDelays = new ArrayList<>();
    +    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    +    assertEquals("operator invalid delay", 1, invalidDelays.size());
    +
    +    try {
    +      dag.validate();
    +      fail("validation should fail");
    +    } catch (ValidationException e) {
    +      // expected
    +    }
    +
    +    dag = new LogicalPlan();
    +
    +    opB = dag.addOperator("B", GenericTestOperator.class);
    +    opC = dag.addOperator("C", GenericTestOperator.class);
    +    opD = dag.addOperator("D", GenericTestOperator.class);
    +    opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +    dag.setAttribute(opDelay, OperatorContext.APPLICATION_WINDOW_COUNT, 2);
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToC", opDelay.output, opC.inport2);
    +
    +    invalidDelays = new ArrayList<>();
    +    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    +    assertEquals("operator invalid delay", 1, invalidDelays.size());
    +
    +    try {
    +      dag.validate();
    +      fail("validation should fail");
    +    } catch (ValidationException e) {
    +      // expected
    +    }
    +
    +  }
    +
    +  @Test
    +  public void testIteration()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
    +    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    +    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    +    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("AtoB", opA.outport, opB.inport1);
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToB", opDelay.output, opB.inport2);
    +
    +    try {
    +      final StramLocalCluster localCluster = new StramLocalCluster(dag);
    +      localCluster.runAsync();
    +      Thread.sleep(10000);
    +      localCluster.shutdown();
    +    } catch (InterruptedException ex) {
    +      // ignore
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  public static class FibonacciOperator extends BaseOperator
    +  {
    +    public long currentNumber = 1;
    +    private transient long tempNum;
    +    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
    +    {
    +      @Override
    +      public void process(Object tuple)
    +      {
    +      }
    +    };
    +    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
    +    {
    +      @Override
    +      public void process(Long tuple)
    +      {
    +        tempNum = tuple;
    +      }
    +    };
    +    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
    +
    +
    +    @Override
    +    public void endWindow()
    +    {
    +      output.emit(currentNumber);
    +      System.out.println("==============> " + currentNumber);
    +      currentNumber += tempNum;
    +    }
    +  }
    +
    +  @Test
    +  public void testFibonacci()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
    +    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
    +    dag.addStream("operator_to_delay", fib.output, opDelay.input);
    +    dag.addStream("delay_to_operator", opDelay.output, fib.input);
    +
    +    try {
    +      final StramLocalCluster localCluster = new StramLocalCluster(dag);
    +      localCluster.runAsync();
    +      Thread.sleep(10000);
    --- End diff --
    
    Please implement proper test termination.
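
One common way to replace a fixed `Thread.sleep(10000)` is to poll a completion condition against a deadline, so the test ends as soon as it has enough output and fails if it never does. The helper below is a generic sketch using only the JDK; `TestWait.waitFor` is a hypothetical name, not an existing Apex utility.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Poll a condition until it holds or a deadline passes, instead of a fixed Thread.sleep().
final class TestWait {
  private TestWait() {}

  static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;  // timed out
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```

In testFibonacci, the condition could be "FIB has emitted at least N numbers" (for example, a static counter incremented in `endWindow()`, which would be a hypothetical addition to the test operator), followed by `localCluster.shutdown()` and an assertion on the emitted values.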


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725662
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    +            LOG.warn("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
    --- End diff --
    
    The existing code actually logged detected cycles at debug level. Should we change both to error()?
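
The two rules that findInvalidDelays enforces in this hunk can be sketched over a plain adjacency map: a DelayOperator must have APPLICATION_WINDOW_COUNT equal to 1, and each of its downstream operators must already be on the current depth-first path, so the delay closes a loop instead of feeding forward. This is a hypothetical model with string operator names, not the LogicalPlan API; it also uses a visited set to avoid re-walking shared subgraphs, which the hunk does not show.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the delay-placement rules; not the LogicalPlan API.
final class DelayValidator {
  private final Map<String, List<String>> successors;   // operator -> downstream operators
  private final Map<String, Integer> delayWindowCount;  // delay operator -> APPLICATION_WINDOW_COUNT
  private final Deque<String> path = new ArrayDeque<>(); // current depth-first path
  private final Set<String> visited = new HashSet<>();
  final List<String> invalidDelays = new ArrayList<>();

  DelayValidator(Map<String, List<String>> successors, Map<String, Integer> delayWindowCount) {
    this.successors = successors;
    this.delayWindowCount = delayWindowCount;
  }

  void visit(String op) {
    visited.add(op);
    path.push(op);
    boolean isDelay = delayWindowCount.containsKey(op);
    // Rule 1: a delay operator must use APPLICATION_WINDOW_COUNT == 1.
    if (isDelay && delayWindowCount.get(op) != 1) {
      invalidDelays.add(op);
    }
    for (String next : successors.getOrDefault(op, Collections.<String>emptyList())) {
      if (isDelay) {
        // Rule 2: a delay may only feed operators already on the current path.
        if (!path.contains(next) && !invalidDelays.contains(op)) {
          invalidDelays.add(op);
        }
      } else if (!visited.contains(next)) {
        visit(next);
      }
    }
    path.pop();
  }
}
```

The three test DAGs from testInvalidDelayDetection and testIteration map directly onto this model: a delay feeding forward to D and a delay with window count 2 are each reported once, while the A-B-C-delay-B loop is accepted.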


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47727089
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1907,6 +1920,12 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
         if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
           // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
           if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
    +        LOG.info("Marking operator {} blocked: time since last windowId change={}, timeout={}, committed windowId={}, recovery windowId={}",
    --- End diff --
    
    It is not related. I can open another PR for logging this condition.


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#issuecomment-163440971
  
    Would like to get feedback especially from @tweise @243826 @gauravgopi123 @PramodSSImmaneni and @vrozov 


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47819162
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1607,12 +1636,43 @@ else if (stack.contains(successor)) {
           }
           // strongly connected (cycle) if more than one node in stack
           if (connectedIds.size() > 1) {
    -        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
    +        LOG.error("detected cycle from node {}: {}", om.name, connectedIds);
             cycles.add(connectedIds);
           }
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.error("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    --- End diff --
    
    But successor would be null, right?


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48895870
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -212,30 +225,60 @@ public final void run()
         long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
         final boolean handleIdleTime = operator instanceof IdleTimeHandler;
         int totalQueues = inputs.size();
    +    int regularQueues = totalQueues;
    +    // regularQueues is the number of queues that are not connected to a DelayOperator
    +    for (String portName : inputs.keySet()) {
    +      if (isInputPortConnectedToDelayOperator(portName)) {
    +        regularQueues--;
    +      }
    +    }
     
    -    ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
    -    activeQueues.addAll(inputs.values());
    +    ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
    +    activeQueues.addAll(inputs.entrySet());
     
         int expectingBeginWindow = activeQueues.size();
         int receivedEndWindow = 0;
    +    long firstWindowId = -1;
     
         TupleTracker tracker;
         LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
    -
         try {
           do {
    -        Iterator<SweepableReservoir> buffers = activeQueues.iterator();
    +        Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
       activequeue:
             while (buffers.hasNext()) {
    -          SweepableReservoir activePort = buffers.next();
    +          Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
    +          SweepableReservoir activePort = activePortEntry.getValue();
               Tuple t = activePort.sweep();
    +          boolean needResetWindow = false;
               if (t != null) {
    +            boolean delay = (operator instanceof Operator.DelayOperator);
    +            long windowAhead = 0;
    +            if (delay) {
    +              windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
    +              if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
    --- End diff --
    
    fixed
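
The check in this hunk compares the base seconds of the window ID one ahead with those of the incoming tuple. A hypothetical model of the encoding shows why the two can differ. Assume that a window ID packs base seconds into the upper 32 bits and a window index into the lower 32 bits. Under that assumption, adding one normally just bumps the index, but it carries into the base seconds when the index overflows, which is the case where a new RESET_WINDOW would be needed. The real `WindowGenerator.getAheadWindowId` also takes `firstWindowMillis` and `windowWidthMillis` into account, which this sketch does not reproduce.

```java
// Hypothetical window ID layout: upper 32 bits = base seconds, lower 32 bits = window index.
final class WindowIdSketch {
  private WindowIdSketch() {}

  static long compose(long baseSeconds, long index) {
    return (baseSeconds << 32) | (index & 0xFFFFFFFFL);
  }

  static long baseSeconds(long windowId) {
    return windowId >>> 32;
  }

  static long index(long windowId) {
    return windowId & 0xFFFFFFFFL;
  }

  // True when the outgoing (ahead) window starts a new base, so downstream would
  // first need a RESET_WINDOW carrying the new base seconds.
  static boolean needsResetWindow(long incomingWindowId, long aheadWindowId) {
    return baseSeconds(aheadWindowId) > baseSeconds(incomingWindowId);
  }
}
```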


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by amberarrow <gi...@git.apache.org>.
Github user amberarrow commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47788702
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclass of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically does a +1 on the output window ID since it is a DelayOperator.
    --- End diff --
    
    Yes, agreed. I actually prefer "engine automatically increments the output window ID", but if we want a compact version, I like your first alternative: "engine automatically does".
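    For illustration only (not code from the PR), here is a minimal sketch of the "+1 on the output window ID" semantics. A pass-through delay re-emits each tuple unchanged, tagged with the next streaming window. `DelayShiftSketch` and `WindowedTuple` are hypothetical names. In Apex, the engine rather than the operator applies the shift; the sketch folds both steps into one method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not the Apex API: shows the "+1 on the output window ID"
// applied to everything a delay operator emits.
final class DelayShiftSketch {

  /** A tuple tagged with the streaming window in which it travels. */
  static final class WindowedTuple<T> {
    final long windowId;
    final T value;

    WindowedTuple(long windowId, T value) {
      this.windowId = windowId;
      this.value = value;
    }
  }

  /** Pass-through hook, analogous in spirit to SimpleDelayOperator.processTuple(T). */
  static <T> T processTuple(T tuple) {
    return tuple;
  }

  /** Re-emits each tuple unchanged, tagged with the next streaming window. */
  static <T> List<WindowedTuple<T>> delayByOneWindow(List<WindowedTuple<T>> input) {
    List<WindowedTuple<T>> output = new ArrayList<>();
    for (WindowedTuple<T> t : input) {
      output.add(new WindowedTuple<>(t.windowId + 1, processTuple(t.value)));
    }
    return output;
  }
}
```
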


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47810291
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    Thanks @gauravgopi123. I pulled your commit into the PR.


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47727794
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1613,6 +1636,37 @@ else if (stack.contains(successor)) {
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.warn("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    +            LOG.warn("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
    --- End diff --
    
    done
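    A simplified, hypothetical model of the two rules that `findInvalidDelays` enforces, as far as the diff above shows: a delay operator must have APPLICATION_WINDOW_COUNT equal to 1, and each of its downstream operators must already be on the current depth-first path (that is, the delay must close a loop). The `Op` type is an assumed stand-in for `OperatorMeta`; general cycle detection is out of scope here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the check in LogicalPlan.findInvalidDelays; names and
// structure are illustrative, not the engine's actual classes.
final class InvalidDelaySketch {

  static final class Op {
    final String name;
    final boolean isDelay;
    final int applicationWindowCount;
    final List<Op> successors = new ArrayList<>();

    Op(String name, boolean isDelay, int applicationWindowCount) {
      this.name = name;
      this.isDelay = isDelay;
      this.applicationWindowCount = applicationWindowCount;
    }
  }

  static void connect(Op from, Op to) {
    from.successors.add(to);
  }

  // Operators on the current depth-first path.
  private final Deque<Op> stack = new ArrayDeque<>();

  void findInvalidDelays(Op om, List<List<String>> invalidDelays) {
    stack.push(om);
    // Rule 1: a delay operator must use APPLICATION_WINDOW_COUNT == 1.
    if (om.isDelay && om.applicationWindowCount != 1) {
      invalidDelays.add(Collections.singletonList(om.name));
    }
    for (Op successor : om.successors) {
      if (om.isDelay) {
        // Rule 2: a delay may only feed an operator already on the current path.
        if (!stack.contains(successor)) {
          invalidDelays.add(Arrays.asList(om.name, successor.name));
        }
      } else if (!stack.contains(successor)) {
        findInvalidDelays(successor, invalidDelays);
      }
    }
    stack.pop();
  }
}
```

    With the first graph from `testInvalidDelayDetection` (B->C, C->D, C->opDelay, opDelay->D), this sketch reports exactly one invalid delay, `[opDelay, D]`.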


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47810378
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclass of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically does a +1 on the output window ID since it is a DelayOperator.
    + */
    +public class SimpleDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
    --- End diff --
    
    Sounds good.


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48219663
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -940,17 +942,17 @@ private void updateStreamMappings(PMapping m)
                     PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                       sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                     StreamMapping.addInput(slidingUnifier, sourceOut, null);
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                     sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
                   }
                   else {
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                   }
                   oper.inputs.add(input);
                 }
               }
             }
    -      } else {
    +      } else if (sourceMapping != null) {
    --- End diff --
    
    Why is this additional check required? sourceMapping is also used in the `if` condition, where this check is not made.


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47980510
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java ---
    @@ -0,0 +1,57 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * DefaultDelayOperator. This is the version of BaseDelayOperator that provides no data loss during recovery. It
    + * incurs a run-time cost per tuple, and all tuples of the checkpoint window will be part of the checkpoint state.
    + * Therefore if your application can tolerate data loss at recovery, BaseDelayOperator should be used instead.
    + */
    +public class DefaultDelayOperator<T> extends BaseDelayOperator<T>
    --- End diff --
    
    Do we need an implementation of the delay operator that uses windowDataManager? Performance should be the same for both implementations. The only advantage I can think of for a windowDataManager implementation is that, for a delay of more than one window, it doesn't have to keep the tuples of multiple windows in memory.
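    To make the trade-off concrete, here is a hypothetical sketch of the approach the DefaultDelayOperator Javadoc describes: keep every tuple of the current window as checkpointed state and re-emit those tuples from `firstWindow()` after recovery. It is not the PR's class; the output port is modeled as a list, and checkpoint/restore as a copy of the state. Per the comment above, a windowDataManager-based variant would avoid holding these tuples in memory.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a delay operator that replays its last window after recovery.
// Method names mirror the lifecycle discussed in the PR (beginWindow, processTuple,
// firstWindow), but the class itself is illustrative only.
final class ReplayingDelaySketch<T> {

  // Checkpointed state: every tuple seen in the current window. This is the
  // per-tuple run-time cost and checkpoint-size cost the Javadoc mentions.
  final List<T> lastWindowTuples = new ArrayList<>();

  // Stand-in for the output port.
  final List<T> emitted = new ArrayList<>();

  void beginWindow(long windowId) {
    lastWindowTuples.clear();
  }

  void processTuple(T tuple) {
    lastWindowTuples.add(tuple);
    emitted.add(tuple);
  }

  // Called once, in the first window after recovery: re-emit what would otherwise be lost.
  void firstWindow() {
    emitted.addAll(lastWindowTuples);
  }

  // Simulates checkpoint + restore by copying only the checkpointed state.
  ReplayingDelaySketch<T> restoreFromCheckpoint() {
    ReplayingDelaySketch<T> restored = new ReplayingDelaySketch<>();
    restored.lastWindowTuples.addAll(lastWindowTuples);
    return restored;
  }
}
```
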


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47809305
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,24 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator that increments the outgoing streaming window
    +   * id by one and allows loops in the "DAG". It is meant for iterative algorithms
    --- End diff --
    
    done


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47808468
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclass of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically does a +1 on the output window ID since it is a DelayOperator.
    --- End diff --
    
    Done. I need to brush up on my grammar. :)


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47723181
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1907,6 +1920,12 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
         if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
           // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
           if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
    +        LOG.info("Marking operator {} blocked: time since last windowId change={}, timeout={}, committed windowId={}, recovery windowId={}",
    --- End diff --
    
    What is the impact of an operator being blocked? Should the log level be WARN if it impacts performance?
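    For reference, the condition visible in this hunk can be read as the predicate below. This is a hypothetical extraction into a standalone method, with the operator and context fields passed as plain parameters. It returns true only for an ACTIVE operator whose window ID has not changed within the timeout and whose recovery checkpoint is not ahead of the committed window.

```java
// Hypothetical standalone version of the "blocked" check from
// StreamingContainerManager.updateRecoveryCheckpoints; parameters replace object fields.
final class BlockedCheckSketch {

  static boolean isBlocked(boolean active,
                           long currentTms,
                           long lastWindowIdChangeTms,
                           long windowProcessingTimeoutMillis,
                           long committedWindowId,
                           long recoveryWindowId) {
    if (!active) {
      return false;
    }
    // Window ID changed recently enough: still making progress.
    if (currentTms - lastWindowIdChangeTms <= windowProcessingTimeoutMillis) {
      return false;
    }
    // If the checkpoint is ahead of the committed window, the operator is
    // waiting for activation rather than blocked.
    return committedWindowId >= recoveryWindowId;
  }
}
```
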


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47722258
  
    --- Diff: api/src/main/java/com/datatorrent/api/Operator.java ---
    @@ -99,6 +99,24 @@ public String toString()
       }
     
       /**
    +   * DelayOperator is an operator that increments the outgoing streaming window
    +   * id by one and allows loops in the "DAG". It is meant for iterative algorithms
    --- End diff --
    
    Can the delay not be more than one window?


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48285212
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1495,6 +1509,11 @@ private void validateThreadLocal(OperatorMeta om) {
           return;
         }
     
    +    if (om.getOperator() instanceof Operator.DelayOperator) {
    +      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
    +      throw new ValidationException(msg);
    --- End diff --
    
    Yes, but since the DelayOperator can loop back to itself, this introduces some complications. We will address this in the future.


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48784895
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -364,29 +413,49 @@ else if (!doCheckpoint) {
                       if (tracker.ports[trackerIndex] == null) {
                         tracker.ports[trackerIndex++] = activePort;
                         break;
    -                  }
    -                  else if (tracker.ports[trackerIndex] == activePort) {
    +                  } else if (tracker.ports[trackerIndex] == activePort) {
                         break;
                       }
     
                       trackerIndex++;
                     }
     
    -                if (trackerIndex == totalQueues) {
    -                  trackerIterator = resetTupleTracker.iterator();
    +                if (trackerIndex == regularQueues) {
    +                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                       while (trackerIterator.hasNext()) {
                         if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                           trackerIterator.remove();
                         }
                       }
    -                  for (int s = sinks.length; s-- > 0; ) {
    -                    sinks[s].put(t);
    +                  if (!delay) {
    +                    for (int s = sinks.length; s-- > 0; ) {
    +                      sinks[s].put(t);
    +                    }
    +                    controlTupleCount++;
                       }
    -                  controlTupleCount++;
    -
    -                  assert (activeQueues.isEmpty());
    -                  activeQueues.addAll(inputs.values());
    +                  if (!activeQueues.isEmpty()) {
    +                    // make sure they are all queues from DelayOperator
    +                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
    +                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
    +                        assert (false);
    +                      }
    +                    }
    +                    activeQueues.clear();
    +                  }
    +                  activeQueues.addAll(inputs.entrySet());
                       expectingBeginWindow = activeQueues.size();
    +
    +                  if (firstWindowId == -1) {
    +                    if (delay) {
    +                      for (int s = sinks.length; s-- > 0; ) {
    +                        sinks[s].put(t);
    +                      }
    +                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
    --- End diff --
    
    Don't we also need `controlTupleCount++` here, as in the `if` block at line 430?
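    A simplified, hypothetical model of the RESET_WINDOW bookkeeping in this hunk: resets arriving on ports fed by a delay operator are not counted, and the tuple is forwarded once every regular (non-delay) port has reported. Port names are made up, and `forwardedCount` stands in for `controlTupleCount`. In this sketch the counter is incremented on the one path that forwards the tuple, which is the invariant the question above is about: any other forwarding path would need the same increment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of GenericNode's RESET_WINDOW tracking; not the engine's code.
final class ResetTrackerSketch {

  private final Set<String> regularPorts;
  private final Set<String> reported = new HashSet<>();
  int forwardedCount = 0; // analogous to controlTupleCount

  ResetTrackerSketch(Set<String> allPorts, Set<String> delayPorts) {
    regularPorts = new HashSet<>(allPorts);
    regularPorts.removeAll(delayPorts);
  }

  /** Returns true if this RESET_WINDOW completes the set and should be forwarded downstream. */
  boolean onResetWindow(String port) {
    if (!regularPorts.contains(port)) {
      return false; // resets arriving through the delay loop are not counted
    }
    reported.add(port);
    if (reported.size() == regularPorts.size()) {
      reported.clear();
      forwardedCount++; // count every tuple actually forwarded
      return true;
    }
    return false;
  }
}
```
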


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r49414079
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1917,25 +1930,30 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
           long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
           maxCheckpoint = currentWindowId;
         }
    +    ctx.visited.add(operator);
     
         // DFS downstream operators
    -    for (PTOperator.PTOutput out : operator.getOutputs()) {
    -      for (PTOperator.PTInput sink : out.sinks) {
    -        PTOperator sinkOperator = sink.target;
    -        if (!ctx.visited.contains(sinkOperator)) {
    -          // downstream traversal
    -          updateRecoveryCheckpoints(sinkOperator, ctx);
    -        }
    -        // recovery window id cannot move backwards
    -        // when dynamically adding new operators
    -        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
    -          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
    -        }
    +    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
    +      addVisited(operator, ctx);
    +    } else {
    --- End diff --
    
    It's not working because the recovery checkpoint of the operator where the delay loop joins can be older than those of the downstream operators. Therefore, when traversing the loop, upstream checkpoints need to be taken into consideration, which is part of the broader solution Pramod refers to. I'm looking into this further and would also like to clean up the special-case handling for the delay operator.
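    One way to see the problem: under the downstream-minimum rule in the original code, every operator in a loop is downstream of every other, so each of them ends up bounded by the oldest checkpoint in the loop. The sketch below is a deliberately brute-force illustration (a reachability walk from each operator, not the fix adopted in the PR). It takes the minimum checkpoint over everything reachable downstream, which pulls the join operator's older checkpoint into the rest of the loop. Operator names and window IDs are made up, and the "cannot move backwards" rule from the original code is omitted.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; assumes every operator appears in `checkpoint`.
final class LoopCheckpointSketch {

  /** For each operator: min checkpoint over itself and all operators reachable downstream. */
  static Map<String, Long> minReachableCheckpoint(Map<String, List<String>> downstream,
                                                  Map<String, Long> checkpoint) {
    Map<String, Long> result = new HashMap<>();
    for (String start : checkpoint.keySet()) {
      long min = checkpoint.get(start);
      Set<String> visited = new HashSet<>();
      Deque<String> work = new ArrayDeque<>();
      visited.add(start);
      work.push(start);
      while (!work.isEmpty()) {
        String op = work.pop();
        min = Math.min(min, checkpoint.get(op));
        for (String next : downstream.getOrDefault(op, Collections.<String>emptyList())) {
          if (visited.add(next)) { // the visited set makes the walk terminate on loops
            work.push(next);
          }
        }
      }
      result.put(start, min);
    }
    return result;
  }
}
```

    With B (the loop's join point) at window 80 and C and Delay at 90, all three come out at 80, while D, outside the loop, keeps 95.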


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47729009
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -161,6 +161,13 @@
          */
         Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
     
    +    /**
    +     * Attribute of input port.
    +     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
    +     * This is for iterative processing.
    +     */
    +    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
    --- End diff --
    
    Should this even be an attribute? It is set internally by the platform, so why expose it as an attribute?
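    If the flag is derived by the platform, as the question suggests, it can be computed directly from the logical plan rather than set by users. A hypothetical sketch: mark every input port whose stream originates at a delay operator. `Stream` and the `"operator.port"` strings are illustrative stand-ins for the plan's stream and port metadata.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical derivation of a per-input-port "connected to delay operator" flag.
final class DelayPortFlagSketch {

  static final class Stream {
    final String sourceOperator;
    final List<String> sinkPorts; // each entry is "operator.port"

    Stream(String sourceOperator, String... sinkPorts) {
      this.sourceOperator = sourceOperator;
      this.sinkPorts = Arrays.asList(sinkPorts);
    }
  }

  /** Returns every sink port whose stream is emitted by one of the delay operators. */
  static Set<String> portsConnectedToDelay(List<Stream> streams, Set<String> delayOperators) {
    Set<String> flagged = new HashSet<>();
    for (Stream s : streams) {
      if (delayOperators.contains(s.sourceOperator)) {
        flagged.addAll(s.sinkPorts);
      }
    }
    return flagged;
  }
}
```
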


[GitHub] incubator-apex-core pull request: DO NOT MERGE: APEXCORE-60 Iterat...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#issuecomment-174125475
  
    I'm keeping this open because I'm using this PR for recovery testing in Travis, since those tests fail only within Travis. The PR that is supposed to be merged is here: https://github.com/apache/incubator-apex-core/pull/211


[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47729596
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---
    @@ -115,6 +115,142 @@ public void testCycleDetection() {
     
       }
     
    +  @Test
    +  public void testInvalidDelayDetection()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    +    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    +    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToD", opDelay.output, opD.inport2);
    +
    +    List<List<String>> invalidDelays = new ArrayList<>();
    +    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    +    assertEquals("operator invalid delay", 1, invalidDelays.size());
    +
    +    try {
    +      dag.validate();
    +      fail("validation should fail");
    +    } catch (ValidationException e) {
    +      // expected
    +    }
    +
    +    dag = new LogicalPlan();
    +
    +    opB = dag.addOperator("B", GenericTestOperator.class);
    +    opC = dag.addOperator("C", GenericTestOperator.class);
    +    opD = dag.addOperator("D", GenericTestOperator.class);
    +    opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +    dag.setAttribute(opDelay, OperatorContext.APPLICATION_WINDOW_COUNT, 2);
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToC", opDelay.output, opC.inport2);
    +
    +    invalidDelays = new ArrayList<>();
    +    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    +    assertEquals("operator invalid delay", 1, invalidDelays.size());
    +
    +    try {
    +      dag.validate();
    +      fail("validation should fail");
    +    } catch (ValidationException e) {
    +      // expected
    +    }
    +
    +  }
    +
    +  @Test
    +  public void testIteration()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
    +    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    +    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    +    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("AtoB", opA.outport, opB.inport1);
    +    dag.addStream("BtoC", opB.outport1, opC.inport1);
    +    dag.addStream("CtoD", opC.outport1, opD.inport1);
    +    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    +    dag.addStream("DelayToB", opDelay.output, opB.inport2);
    +
    +    try {
    +      final StramLocalCluster localCluster = new StramLocalCluster(dag);
    +      localCluster.runAsync();
    +      Thread.sleep(10000);
    +      localCluster.shutdown();
    +    } catch (InterruptedException ex) {
    +      // ignore
    +    } catch (Exception ex) {
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  public static class FibonacciOperator extends BaseOperator
    +  {
    +    public long currentNumber = 1;
    +    private transient long tempNum;
    +    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
    +    {
    +      @Override
    +      public void process(Object tuple)
    +      {
    +      }
    +    };
    +    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
    +    {
    +      @Override
    +      public void process(Long tuple)
    +      {
    +        tempNum = tuple;
    +      }
    +    };
    +    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
    +
    +
    +    @Override
    +    public void endWindow()
    +    {
    +      output.emit(currentNumber);
    +      System.out.println("==============> " + currentNumber);
    +      currentNumber += tempNum;
    +    }
    +  }
    +
    +  @Test
    +  public void testFibonacci()
    +  {
    +    LogicalPlan dag = new LogicalPlan();
    +
    +    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
    +    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
    +    SimpleDelayOperator opDelay = dag.addOperator("opDelay", SimpleDelayOperator.class);
    +
    +    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
    +    dag.addStream("operator_to_delay", fib.output, opDelay.input);
    +    dag.addStream("delay_to_operator", opDelay.output, fib.input);
    +
    +    try {
    +      final StramLocalCluster localCluster = new StramLocalCluster(dag);
    +      localCluster.runAsync();
    +      Thread.sleep(10000);
    --- End diff --
    
    Done. Please review the new test cases.
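    As a sanity check on what `testFibonacci` should print, here is an engine-free simulation of the FIB/opDelay loop (a hypothetical model, not the test itself). In each window, FIB first applies the tuple the delay delivered from the previous window, emits `currentNumber` at end of window, then adds `tempNum`; the delay hands the emitted value to the next window. It assumes one delayed tuple per window and no skipped windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of FibonacciOperator + SimpleDelayOperator, one window per iteration.
final class FibonacciLoopSketch {

  static List<Long> simulate(int windows) {
    long currentNumber = 1;  // FibonacciOperator.currentNumber
    long tempNum = 0;        // FibonacciOperator.tempNum (field default)
    Long inFlight = null;    // tuple held by the delay, delivered in the next window
    List<Long> emitted = new ArrayList<>();
    for (int w = 0; w < windows; w++) {
      if (inFlight != null) {
        tempNum = inFlight;          // input.process(tuple)
      }
      emitted.add(currentNumber);    // endWindow(): output.emit(currentNumber)
      inFlight = currentNumber;      // the delay re-emits it in window w + 1
      currentNumber += tempNum;      // endWindow(): currentNumber += tempNum
    }
    return emitted;
  }
}
```

    `simulate(8)` yields 1, 1, 2, 3, 5, 8, 13, 21, one value per window.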



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48897460
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -343,12 +389,15 @@ else if (!doCheckpoint) {
                      * we will receive tuples which are equal to the number of input streams.
                      */
                     activePort.remove();
    -                buffers.remove();
    +                if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
    +                  break; // breaking out of the switch/case
    +                }
     
    +                buffers.remove();
    --- End diff --
    
    If `buffers.remove()` is called before the `if` condition, the port is no longer considered active for this window. That window may already have started, because the other ports may have sent BEGIN_WINDOW before we receive the RESET_WINDOW from the port connected to the delay operator. In that case, the operator would get stuck.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48768935
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -212,30 +225,60 @@ public final void run()
         long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
         final boolean handleIdleTime = operator instanceof IdleTimeHandler;
         int totalQueues = inputs.size();
    +    int regularQueues = totalQueues;
    +    // regularQueues is the number of queues that are not connected to a DelayOperator
    +    for (String portName : inputs.keySet()) {
    +      if (isInputPortConnectedToDelayOperator(portName)) {
    +        regularQueues--;
    +      }
    +    }
     
    -    ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
    -    activeQueues.addAll(inputs.values());
    +    ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
    +    activeQueues.addAll(inputs.entrySet());
     
         int expectingBeginWindow = activeQueues.size();
         int receivedEndWindow = 0;
    +    long firstWindowId = -1;
     
         TupleTracker tracker;
         LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
    -
         try {
           do {
    -        Iterator<SweepableReservoir> buffers = activeQueues.iterator();
    +        Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
       activequeue:
             while (buffers.hasNext()) {
    -          SweepableReservoir activePort = buffers.next();
    +          Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
    +          SweepableReservoir activePort = activePortEntry.getValue();
               Tuple t = activePort.sweep();
    +          boolean needResetWindow = false;
               if (t != null) {
    +            boolean delay = (operator instanceof Operator.DelayOperator);
    +            long windowAhead = 0;
    +            if (delay) {
    +              windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
    +              if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
    --- End diff --
    
    Why is this `if` condition needed here? `needResetWindow` is only used for the BEGIN_WINDOW tuple type (line 271). Move this `if` condition there instead.
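    The restructuring suggested here can be sketched as a check that runs only in the branch that consumes its result. `TupleKind`, `needResetWindow` and the 32-bit split of the window ID are assumptions for illustration. The split mirrors how `WindowGenerator.getBaseSecondsFromWindowId` is used in the diff, but the real engine code may differ. The ahead window ID is passed in rather than computed from `firstWindowMillis` and `windowWidthMillis`:

    ```java
    final class BeginWindowResetCheck {
      enum TupleKind { BEGIN_WINDOW, END_WINDOW, PAYLOAD, RESET_WINDOW }

      // Assumption: the upper 32 bits of a window ID hold its base seconds.
      static long baseSeconds(long windowId) {
        return windowId >>> 32;
      }

      /**
       * Decides whether a RESET_WINDOW must be sent before forwarding this tuple.
       * The comparison is evaluated only for BEGIN_WINDOW on a delay operator,
       * the only place where the result is used.
       */
      static boolean needResetWindow(TupleKind kind, boolean isDelayOperator, long windowId, long aheadWindowId) {
        switch (kind) {
          case BEGIN_WINDOW:
            return isDelayOperator && baseSeconds(aheadWindowId) > baseSeconds(windowId);
          default:
            return false; // other tuple types never need the check
        }
      }
    }
    ```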



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725733
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator
    + */
    +public class SimpleDelayOperator<T> implements Operator.DelayOperator
    --- End diff --
    
    Yes, good idea. I'll change it to:
    ```
    public class SimpleDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
    ```
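    The benefit of extending `BaseOperator` is that the delay operator only has to implement what differs from a regular operator. The sketch below illustrates this with simplified stand-in types. `MiniOperator`, `MiniBaseOperator` and `SimpleDelayOperatorSketch` are hypothetical names and are not the real Apex interfaces, which also carry `setup`/`teardown` with a context. The pass-through `processTuple` hook and the empty `firstWindow` follow the Javadoc quoted later in this thread:

    ```java
    import java.util.function.Consumer;

    /** Simplified stand-in for the Operator API (hypothetical). */
    interface MiniOperator {
      void beginWindow(long windowId);
      void endWindow();

      /** Stand-in for Operator.DelayOperator: adds a hook called in the first window after (re)deployment. */
      interface DelayOperator extends MiniOperator {
        void firstWindow();
      }
    }

    /** Stand-in for BaseOperator: no-op lifecycle methods, so subclasses override only what they need. */
    class MiniBaseOperator implements MiniOperator {
      @Override public void beginWindow(long windowId) {}
      @Override public void endWindow() {}
    }

    /** Pass-through delay; per the Javadoc, the engine (not the operator) adds 1 to the output window ID. */
    class SimpleDelayOperatorSketch<T> extends MiniBaseOperator implements MiniOperator.DelayOperator {
      private final Consumer<T> output;

      SimpleDelayOperatorSketch(Consumer<T> output) {
        this.output = output;
      }

      /** Subclasses may override this to transform or filter tuples. */
      protected void processTuple(T tuple) {
        output.accept(tuple);
      }

      public void input(T tuple) {
        processTuple(tuple);
      }

      /** Nothing is replayed here, so tuples in flight at failure time are lost on recovery. */
      @Override public void firstWindow() {}
    }
    ```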



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47783467
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/SimpleDelayOperator.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * SimpleDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
    + * port, and does a simple pass-through from the input port to the output port.  Subclass of this operator can
    + * override this behavior by overriding processTuple(T tuple).
    + *
    + * Since the firstWindow method does not do anything, using this operator as-is means data loss during recovery.  In
    + * order to achieve zero data loss during recovery, implementations must persist relevant tuples before the recovery
    + * checkpoint for emitting during the first window after recovery.
    + *
    + * Note that the engine will automatically does a +1 on the output window ID since it is a DelayOperator.
    --- End diff --
    
    Should `engine will automatically does` be rephrased to `engine automatically does` or `engine will automatically do`? 
    @amberarrow: any thoughts?



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48100056
  
    --- Diff: common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java ---
    @@ -0,0 +1,57 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.common.util;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * DefaultDelayOperator. This is the version of BaseDelayOperator that provides no data loss during recovery. It
    + * incurs a run-time cost per tuple, and all tuples of the checkpoint window will be part of the checkpoint state.
    + * Therefore if your application can tolerate data loss at recovery, BaseDelayOperator should be used instead.
    + */
    +public class DefaultDelayOperator<T> extends BaseDelayOperator<T>
    --- End diff --
    
    When using WindowDataManager, the tuples are written in `endWindow`, so the performance should be the same for both.
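    The pattern described here can be sketched in a few lines. A plain `Map` stands in for durable storage and is not the WindowDataManager API. `PersistingDelaySketch` and `replay` are hypothetical names. Per-tuple work is only an in-memory append, and the write happens once per window in `endWindow`:

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    /** Pass-through delay that records each window's tuples so they can be re-emitted after recovery. */
    final class PersistingDelaySketch<T> {
      private final Map<Long, List<T>> store; // hypothetical stand-in for durable per-window storage
      private final Consumer<T> output;
      private final List<T> currentWindow = new ArrayList<>();
      private long windowId;

      PersistingDelaySketch(Map<Long, List<T>> store, Consumer<T> output) {
        this.store = store;
        this.output = output;
      }

      void beginWindow(long windowId) {
        this.windowId = windowId;
        currentWindow.clear();
      }

      /** Per-tuple cost: an in-memory append plus the emit; nothing is written here. */
      void process(T tuple) {
        currentWindow.add(tuple);
        output.accept(tuple);
      }

      /** One write per window, analogous to saving window data in endWindow. */
      void endWindow() {
        store.put(windowId, new ArrayList<>(currentWindow));
      }

      /** Called in the first window after recovery to re-emit what was saved for the recovery window. */
      void replay(long recoveredWindowId) {
        for (T tuple : store.getOrDefault(recoveredWindowId, Collections.emptyList())) {
          output.accept(tuple);
        }
      }
    }
    ```

    Whether this makes the two operators equally fast still depends on how expensive the real per-window write is. The sketch only shows where the cost is paid.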



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47818666
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1607,12 +1636,43 @@ else if (stack.contains(successor)) {
           }
           // strongly connected (cycle) if more than one node in stack
           if (connectedIds.size() > 1) {
    -        LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
    +        LOG.error("detected cycle from node {}: {}", om.name, connectedIds);
             cycles.add(connectedIds);
           }
         }
       }
     
    +  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
    +  {
    +    stack.push(om);
    +
    +    // depth first successors traversal
    +    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
    +    if (isDelayOperator) {
    +      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
    +        LOG.error("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
    +        invalidDelays.add(Collections.singletonList(om.getName()));
    +      }
    +    }
    +
    +    for (StreamMeta downStream: om.outputStreams.values()) {
    +      for (InputPortMeta sink : downStream.sinks) {
    +        OperatorMeta successor = sink.getOperatorWrapper();
    +        if (isDelayOperator) {
    +          // Check whether all downstream operators are already visited in the path
    +          if (successor != null && !stack.contains(successor)) {
    --- End diff --
    
    Consider a DAG A->D, where A is a regular operator and D is a delay operator. This is a valid DAG. If `findInvalidDelays` is called with D's meta, then `!stack.contains(successor)` will fail.
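    Assume the intended rule is that every cycle must pass through a delay operator, while a delay operator outside any cycle (as in the A->D example) remains legal. Under that assumption, one way to check the rule is to ignore edges leaving delay operators and require the remaining graph to be acyclic. This technique is swapped in for illustration and is not the PR's `findInvalidDelays` traversal. It also omits the APPLICATION_WINDOW_COUNT check, and `DelayCycleCheck` is a hypothetical name:

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class DelayCycleCheck {
      private static final int ON_STACK = 1;
      private static final int DONE = 2;

      /** Returns true if every cycle in the graph passes through at least one delay operator. */
      static boolean cyclesAreBrokenByDelays(Map<String, List<String>> edges, Set<String> delays) {
        Map<String, Integer> state = new HashMap<>(); // absent = unvisited
        for (String node : edges.keySet()) {
          if (!state.containsKey(node) && hasCycle(node, edges, delays, state)) {
            return false;
          }
        }
        return true;
      }

      /** Depth-first search that treats edges leaving a delay operator as if they were absent. */
      private static boolean hasCycle(String node, Map<String, List<String>> edges, Set<String> delays,
          Map<String, Integer> state) {
        state.put(node, ON_STACK);
        if (!delays.contains(node)) {
          for (String next : edges.getOrDefault(node, List.of())) {
            Integer s = state.get(next);
            if (s == null) {
              if (hasCycle(next, edges, delays, state)) {
                return true;
              }
            } else if (s == ON_STACK) {
              return true; // back edge: a cycle with no delay operator on it
            }
          }
        }
        state.put(node, DONE);
        return false;
      }
    }
    ```

    With this formulation, A->D (D a delay operator) and A->B->D->A are both accepted, while A->B->A is rejected, whatever node the traversal starts from.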



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48777786
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
    @@ -343,12 +389,15 @@ else if (!doCheckpoint) {
                      * we will receive tuples which are equal to the number of input streams.
                      */
                     activePort.remove();
    -                buffers.remove();
    +                if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
    +                  break; // breaking out of the switch/case
    +                }
     
    +                buffers.remove();
    --- End diff --
    
    Shouldn't this be before the `if` condition above?



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r47725699
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---
    @@ -1907,6 +1920,12 @@ public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsCont
         if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
           // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
           if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
    +        LOG.info("Marking operator {} blocked: time since last windowId change={}, timeout={}, committed windowId={}, recovery windowId={}",
    --- End diff --
    
    How is this related to iteration support? It looks like some code was moved around.



[GitHub] incubator-apex-core pull request: APEXCORE-60 Iteration support in...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/185#discussion_r48290536
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
    @@ -940,17 +942,17 @@ private void updateStreamMappings(PMapping m)
                     PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                       sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                     StreamMapping.addInput(slidingUnifier, sourceOut, null);
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                     sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
                   }
                   else {
    -                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
    +                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                   }
                   oper.inputs.add(input);
                 }
               }
             }
    -      } else {
    +      } else if (sourceMapping != null) {
    --- End diff --
    
    Fixed

