Posted to dev@apex.apache.org by PramodSSImmaneni <gi...@git.apache.org> on 2015/11/16 00:25:49 UTC

[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

GitHub user PramodSSImmaneni opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/168

    Providing a way for operator to check how many windows till checkpoint

    APEX-246 implementation

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PramodSSImmaneni/incubator-apex-core checkpoint-distance

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #168
    
----
commit 5897ac155455a514224eb422ab68ab4deb013e5e
Author: Pramod Immaneni <pr...@datatorrent.com>
Date:   2015-11-12T17:59:34Z

    Providing a way for operator to check how many windows till checkpoint, #comment APEX-246

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45008466
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    Adding a method to an interface is not backwards compatible.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45105004
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    I think using an attribute is odd since this attribute would change all the time (every single window), as opposed to other attributes, which are mostly constant throughout the application execution.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45093254
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    --- End diff --
    
    @ilooner Normally windowsFromCheckpoint does not have to be saved, as the operator is going to recover at a checkpoint. However, I think we may have to save a few other things, such as checkpointWindowCount, because we are saying that we checkpoint at a window divisible by the operator checkpoint window count, and this count should run from the beginning of the application and not necessarily from a restart. I will look at this comprehensively.
    
    Regarding your second point, you could check the windows from checkpoint at beginWindow and use that information later. Are you thinking about cases where the application window spans multiple streaming windows? In the first-phase implementation of iteration support we had talked about limiting it to cases where the application window is the same as the streaming window, if you remember. In the future, when we add support for full application windows, we will need the callbacks you are suggesting, so that we know when the streaming window starts and can not only save those tuples but also inject them at the right streaming window for ingestion. However, you will still need to know how far the streaming window is from the checkpoint, and for that you will need a method like this.
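
As a rough illustration of reading the value once per window rather than once per tuple, an operator could cache the distance in beginWindow. This is only a minimal sketch: DistanceContext and LastWindowBuffer are hypothetical stand-ins, not Apex classes, and the sketch assumes the convention from the proposed Javadoc, where 1 means the checkpoint follows the current window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the context method proposed in this pull request.
interface DistanceContext
{
  int getWindowsFromCheckpoint();
}

// Hypothetical operator that buffers only the tuples of the last window before a checkpoint.
final class LastWindowBuffer
{
  private DistanceContext context;
  private boolean lastWindowBeforeCheckpoint;
  final List<String> buffered = new ArrayList<>();

  void setup(DistanceContext context)
  {
    this.context = context;
  }

  void beginWindow(long windowId)
  {
    // Read the distance once per window; 1 is assumed to mean
    // "the checkpoint happens right after this window".
    lastWindowBeforeCheckpoint = context.getWindowsFromCheckpoint() == 1;
  }

  void process(String tuple)
  {
    // Per-tuple work uses the cached flag instead of calling the context again.
    if (lastWindowBeforeCheckpoint) {
      buffered.add(tuple);
    }
  }
}
```

With a context that reports 2 and then 1 on successive windows, only the tuples of the second window end up in the buffer.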



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-157227774
  
    Also fault tolerance was not considered. windowsFromCheckpoint must be stored as part of the checkpoint state like applicationWindowCount and restored to the node on recovery like applicationWindowCount. Please see StreamingContainer line 896 to see how applicationWindowCount is restored to the node on recovery.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-177265022
  
    What happened to this one? Should I be opening this against master for 3.4?




[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45089515
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    +    context.setWindowsFromCheckpoint(EFFECTIVE_CHECKPOINT_WINDOW_COUNT);
    +
    --- End diff --
    
    Agreed.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45089397
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @gauravgopi123 I had debated 0 or 1 myself. The checkpoint is not in the current window but immediately after the end of the window, so I thought it would be more intuitive for the user to go with 1.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r46322070
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -79,17 +87,32 @@ public void endWindow()
         @Override
         public void setup(Context.OperatorContext context)
         {
    -      throw new UnsupportedOperationException("Not supported yet.");
    +      this.context = context;
         }
     
         @Override
         public void teardown()
         {
    -      throw new UnsupportedOperationException("Not supported yet.");
         }
     
       }
     
    +  public static class CheckpointDistanceOperator extends GenericOperator {
    --- End diff --
    
    Open brace on a definition should be on a new line



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/168



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r46321968
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -296,4 +319,105 @@ public void run()
         Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
       }
     
    +  @Test
    +  public void testDefaultCheckPointDistance() throws InterruptedException
    +  {
    +    testCheckpointDistance(Context.DAGContext.CHECKPOINT_WINDOW_COUNT.defaultValue, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT.defaultValue);
    +  }
    +
    +  @Test
    +  public void testDAGGreaterCheckPointDistance() throws InterruptedException
    +  {
    +    testCheckpointDistance(7, 5);
    +  }
    +
    +  @Test
    +  public void testOpGreaterCheckPointDistance() throws InterruptedException
    +  {
    +    testCheckpointDistance(3, 5);
    +  }
    +
    +  private void testCheckpointDistance(int dagCheckPoint, int opCheckPoint) throws InterruptedException
    +  {
    +    int windowWidth = 50;
    +    long sleeptime = 25L;
    +    int maxWindows = 60;
    +    // Adding some extra time for the windows to finish
    +    long maxSleep = windowWidth * maxWindows + 5000;
    +
    +    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, "default");
    +    final WindowGenerator windowGenerator = new WindowGenerator(executorService, 1024);
    +    windowGenerator.setWindowWidth(windowWidth);
    +    windowGenerator.setFirstWindow(executorService.getCurrentTimeMillis());
    +    windowGenerator.setCheckpointCount(dagCheckPoint, 0);
    +    //GenericOperator go = new GenericOperator();
    +    CheckpointDistanceOperator go = new CheckpointDistanceOperator();
    +    go.maxWindows = maxWindows;
    +
    +    List<Integer> checkpoints = new ArrayList<Integer>();
    +
    +    int window = 0;
    +    while (window < maxWindows) {
    +      window = (int)Math.ceil((double)(window + 1)/dagCheckPoint) * dagCheckPoint;
    +      window = (int)Math.ceil((double)window/opCheckPoint) * opCheckPoint;
    +      checkpoints.add(window);
    +    }
    +
    +    final StreamContext stcontext = new StreamContext("s1");
    +    DefaultAttributeMap attrMap = new DefaultAttributeMap();
    +    attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint);
    +    attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint);
    +    final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, attrMap, null);
    +    final GenericNode gn = new GenericNode(go, context);
    +    gn.setId(1);
    +
    +    //DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
    +    //DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
    +
    +    //gn.connectInputPort("ip1", reservoir1);
    +    //gn.connectInputPort("ip2", reservoir2);
    +    gn.connectInputPort("ip1", windowGenerator.acquireReservoir("ip1", 1024));
    +    gn.connectInputPort("ip1", windowGenerator.acquireReservoir("ip2", 1024));
    +    gn.connectOutputPort("op", Sink.BLACKHOLE);
    +
    +    final AtomicBoolean ab = new AtomicBoolean(false);
    +    Thread t = new Thread()
    +    {
    +      @Override
    +      public void run()
    +      {
    +        gn.setup(context);
    +        windowGenerator.activate(stcontext);
    +        gn.activate();
    +        ab.set(true);
    +        gn.run();
    +        windowGenerator.deactivate();
    +        gn.deactivate();
    +        gn.teardown();
    +      }
    +    };
    +    t.start();
    +
    +    long interval = 0;
    +    do {
    +      Thread.sleep(sleeptime);
    +      interval += sleeptime;
    +    }
    +    //while ((ab.get() == false) && (interval < maxSleep));
    +    while ((go.numWindows < maxWindows) && (interval < maxSleep));
    --- End diff --
    
    while should be right after the closing curly brace as per the new style rule



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-157234462
  
    I also have some concerns about the intended usage of this interface. I think that, to determine the tuples obtained from the last streaming window before a checkpoint, you will have to call context.getWindowsFromCheckpoint() for every tuple received on an input port. If everyone is okay with doing it this way, I'm fine with it, but I think it makes more sense to have a separate interface that allows you to have beginStreamingWindow and endStreamingWindow callbacks.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r46322120
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -97,6 +98,15 @@ public int getId()
         return id;
       }
     
    +  @Override
    +  public int getWindowsFromCheckpoint() {
    --- End diff --
    
    Open brace on a definition should be on a new line



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45096690
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @chandnisingh semver didn't flag it. In theory it is incompatible; in practice it isn't, since today users consume this interface rather than implement it. Given that this is in the API and no such distinction is advertised today, I will consider moving it to something like an attribute.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45089129
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @ilooner the implementation for this context is in the engine itself so the exposure is contained. There are one or two implementations in the malhar tests but those won't affect the runtime of the app. If we still think this is a problem this can be made an attribute.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45093134
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @PramodSSImmaneni This is a backward-incompatible change. Did you consider extending this interface, adding, say, a DelayContext, and putting the method there?
    
    Also, the semver plugin should be catching this, right?
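
A sub-interface along these lines might look like the sketch below. Only the name DelayContext comes from the comment above; every type here (OperatorContextSketch, DelayContextSketch, LegacyContext, EngineContext, DelayAwareOperator) is a hypothetical stand-in, not the Apex API. The point is that an existing implementation of the base interface keeps compiling, while callers that want the new method test for the extended type.

```java
// Hypothetical stand-in for the existing interface; it stays unchanged.
interface OperatorContextSketch
{
  int getId();
}

// Hypothetical extension that carries the new method.
interface DelayContextSketch extends OperatorContextSketch
{
  int getWindowsFromCheckpoint();
}

// An implementation written before the change: it still compiles as is.
final class LegacyContext implements OperatorContextSketch
{
  @Override
  public int getId()
  {
    return 7;
  }
}

// An engine-side implementation that supports the new method.
final class EngineContext implements DelayContextSketch
{
  private final int id;
  private final int windowsFromCheckpoint;

  EngineContext(int id, int windowsFromCheckpoint)
  {
    this.id = id;
    this.windowsFromCheckpoint = windowsFromCheckpoint;
  }

  @Override
  public int getId()
  {
    return id;
  }

  @Override
  public int getWindowsFromCheckpoint()
  {
    return windowsFromCheckpoint;
  }
}

final class DelayAwareOperator
{
  // Returns the distance when the context supports it, otherwise the fallback.
  static int distanceOrDefault(OperatorContextSketch context, int fallback)
  {
    if (context instanceof DelayContextSketch) {
      return ((DelayContextSketch)context).getWindowsFromCheckpoint();
    }
    return fallback;
  }
}
```

The cost of this design is the instanceof check and cast at the call site, which is the trade-off against adding the method directly to the base interface.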


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-177274761
  
    Yes, closed all stale PRs when moving to master.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r46322143
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -526,12 +531,21 @@ void checkpoint(long windowId)
           }
         }
     
    +    calculateNextCheckpointWindow();
         checkpoint = new Checkpoint(windowId, applicationWindowCount, checkpointWindowCount);
         if (operator instanceof Operator.CheckpointListener) {
           ((Operator.CheckpointListener) operator).checkpointed(windowId);
         }
       }
     
    +  protected void calculateNextCheckpointWindow() {
    --- End diff --
    
    Open brace on a definition should be on a new line



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-158758779
  
    Addressed the majority of the comments. We still need to decide whether the method is fine.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45023776
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    --- End diff --
    
    @PramodSSImmaneni Shouldn't EFFECTIVE_CHECKPOINT_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT be the same value? There are a couple of places where ++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT is checked to make the checkpoint call.
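
To see why the two values can differ, the arithmetic in the quoted hunk can be isolated as below. This is a sketch that mirrors only the lines shown in the diff: EffectiveCountSketch and effectiveCount are hypothetical names, and the boolean parameter stands in for the PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE test.

```java
final class EffectiveCountSketch
{
  // Rounds the DAG checkpoint count up to the next multiple of the
  // operator checkpoint count, as the quoted hunk does.
  static int effectiveCount(int dagCount, int opCount, boolean exactlyOnce)
  {
    if (exactlyOnce) {
      // The hunk uses a count of 1 in EXACTLY_ONCE mode.
      return 1;
    }
    int offset = dagCount % opCount;
    if (offset != 0) {
      return dagCount + opCount - offset;
    }
    return dagCount;
  }
}
```

For a DAG count of 7 and an operator count of 5 this gives 10, for 3 and 5 it gives 5, and for 10 and 5 it gives 10, so the result equals the operator count only in some configurations.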



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r46322128
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -97,6 +98,15 @@ public int getId()
         return id;
       }
     
    +  @Override
    +  public int getWindowsFromCheckpoint() {
    +    return windowsFromCheckpoint;
    +  }
    +
    +  public void setWindowsFromCheckpoint(int windowsFromCheckpoint) {
    --- End diff --
    
    Open brace on a definition should be on a new line



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-157219514
  
    I think it looks good.



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45090189
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    --- End diff --
    
    @chandnisingh This is for iteration support, APEX-246. We need to know, at the beginning of the window, that it is the one before the checkpoint. Initial iteration support will be for a delay of 1, but that could change in the future to be more than 1 window. This is to let you know how far you are from the checkpoint window.
    
    @chandnisingh, @gauravgopi123 The CHECKPOINT_WINDOW_COUNT by itself is not equal to the windows from checkpoint. A checkpoint happens at a window that is greater than or equal to a multiple of the DAG checkpoint count and that is also a multiple of the operator checkpoint window count, hence the complication. Also, as Tim pointed out, my logic has an issue and I will correct it.
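
The rule described above can be sketched as follows. The code mirrors the expectation loop in the GenericNodeTest diff from this pull request, not the engine's actual implementation. CheckpointScheduleSketch and its method names are hypothetical, and windows are assumed to be counted from 1 at application start.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointScheduleSketch
{
  // Smallest multiple of step that is >= value (assumes value >= 0 and step > 0).
  static int roundUpToMultiple(int value, int step)
  {
    return ((value + step - 1) / step) * step;
  }

  // Next checkpoint window after lastCheckpoint: first reach the next
  // multiple of the DAG count, then round up to a multiple of the operator count.
  static int nextCheckpoint(int lastCheckpoint, int dagCount, int opCount)
  {
    int dagAligned = roundUpToMultiple(lastCheckpoint + 1, dagCount);
    return roundUpToMultiple(dagAligned, opCount);
  }

  // Checkpoint windows generated until maxWindows is reached, as in the test loop.
  static List<Integer> schedule(int dagCount, int opCount, int maxWindows)
  {
    List<Integer> checkpoints = new ArrayList<>();
    int window = 0;
    while (window < maxWindows) {
      window = nextCheckpoint(window, dagCount, opCount);
      checkpoints.add(window);
    }
    return checkpoints;
  }

  // Distance including the current window, so 1 means the checkpoint
  // follows the current window (the convention in the proposed Javadoc).
  static int windowsFromCheckpoint(int currentWindow, int lastCheckpoint, int dagCount, int opCount)
  {
    return nextCheckpoint(lastCheckpoint, dagCount, opCount) - currentWindow + 1;
  }
}
```

With a DAG count of 7 and an operator count of 5, schedule(7, 5, 60) yields 10, 15, 25, 30, 35, 45, 50, 60, so the gaps between consecutive checkpoints are not constant. With 3 and 5, schedule(3, 5, 20) yields 5, 10, 15, 20.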



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45104209
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @PramodSSImmaneni I don't see how we can say this is a backward-incompatible change only in theory. Is there a way to find out whether any users implement this interface?
    
    Also, why are you going to use something like an attribute?



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45023930
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -301,6 +301,12 @@
          */
         int getId();
     
    +    /**
    +     * Return the number of windows before the next checkpoint including the current window.
    +     * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
    +     */
    +    int getWindowsFromCheckpoint();
    --- End diff --
    
    @PramodSSImmaneni If an operator is inside a window and the checkpoint will happen after that window, should this not return 0? Returning 1 suggests that there is still one window before the checkpoint happens, which is not the case, right?
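
The two conventions under discussion can be put side by side. This is a sketch only, assuming a fixed cycle of `cycleLength` windows with the checkpoint taken after the last one; the class and method names are hypothetical. The "inclusive" variant matches the Javadoc in the diff (the current window is counted, so the last window before a checkpoint yields 1), and the "exclusive" variant matches the suggestion in this comment (the last window yields 0):

```java
// Hypothetical illustration only; not the code in the pull request.
class WindowsFromCheckpointConventions {

  // Counts the current window, as the proposed Javadoc describes.
  // windowInCycle is 1-based: 1 is the first window after a checkpoint.
  static int inclusive(int windowInCycle, int cycleLength) {
    return cycleLength - windowInCycle + 1;
  }

  // Counts only the windows that follow the current one.
  static int exclusive(int windowInCycle, int cycleLength) {
    return cycleLength - windowInCycle;
  }

  public static void main(String[] args) {
    int cycleLength = 3;
    for (int w = 1; w <= cycleLength; w++) {
      System.out.println("window " + w
          + ": inclusive=" + inclusive(w, cycleLength)
          + ", exclusive=" + exclusive(w, cycleLength));
    }
  }
}
```

Either convention works as long as the Javadoc states which one is used; the two values always differ by exactly one.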



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#discussion_r45010069
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
    @@ -600,6 +605,19 @@ public void activate()
           CHECKPOINT_WINDOW_COUNT = 1;
         }
     
    +    int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
    +    if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
    +      int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
    +      if (chkOffset != 0) {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
    +      } else {
    +        EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
    +      }
    +    } else {
    +      EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
    +    }
    +    context.setWindowsFromCheckpoint(EFFECTIVE_CHECKPOINT_WINDOW_COUNT);
    +
    --- End diff --
    
    I think this logic is incorrect. The EFFECTIVE_CHECKPOINT_WINDOW_COUNT is not necessarily a constant.
    
    Ex.
    
    The Dag CHECKPOINT_WINDOW_COUNT is 5
    The Operator CHECKPOINT_WINDOW_COUNT is 3
    
    1 - No checkpoint
    2 - No checkpoint
    3 - Operator checkpoint window end is reached, but no checkpoint is performed because the checkpoint tuple has not been received yet
    4 - No checkpoint
    5 - Checkpoint tuple received
    6** - End of the operator checkpoint window is reached, so the checkpoint is performed
    7 - No checkpoint
    8 - No checkpoint
    9 - Operator checkpoint window end is reached, but no checkpoint is performed because the checkpoint tuple has not been received yet
    10 - Checkpoint tuple received
    11 - No checkpoint
    12** - End of the operator checkpoint window is reached, so the checkpoint is performed
    13 - No checkpoint
    14 - No checkpoint
    15 - No checkpoint, because the end window is received first and the checkpoint tuple only after it
    16 - No checkpoint
    17 - No checkpoint
    18** - End of the operator checkpoint window is reached, so the checkpoint is performed
    19 - No checkpoint
    20 - Checkpoint tuple received
    21** - End of the operator checkpoint window is reached, so the checkpoint is performed
    
    So the effective checkpoint window counts in this example are 6, 6, 6, and 3
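
The trace above can be reproduced with a small simulation. This is a sketch under the assumptions of the example, not engine code: windows are numbered from 1, a checkpoint tuple arrives in every window that is a multiple of the DAG count, and it arrives after that window's end-window processing (which is what makes window 15 miss its checkpoint). The class and method names are hypothetical. `constantFromPatch` mirrors the arithmetic in the quoted diff for the non-EXACTLY_ONCE branch only:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the example above; not Apex engine code.
class CheckpointSimulationSketch {

  // Returns the windows at which the operator actually checkpoints.
  static List<Integer> simulate(int dagCount, int opCount, int lastWindow) {
    List<Integer> checkpoints = new ArrayList<>();
    boolean checkpointPending = false;
    for (int w = 1; w <= lastWindow; w++) {
      // End of an operator checkpoint window: checkpoint only if a
      // checkpoint tuple was received in an earlier window.
      if (w % opCount == 0 && checkpointPending) {
        checkpoints.add(w);
        checkpointPending = false;
      }
      // Assumed ordering: the checkpoint tuple is seen after end-window.
      if (w % dagCount == 0) {
        checkpointPending = true;
      }
    }
    return checkpoints;
  }

  // Same arithmetic as the patch (EXACTLY_ONCE branch omitted).
  static int constantFromPatch(int dagCount, int opCount) {
    int offset = dagCount % opCount;
    return offset != 0 ? dagCount + opCount - offset : dagCount;
  }

  public static void main(String[] args) {
    System.out.println("checkpoints: " + simulate(5, 3, 21));
    System.out.println("patch constant: " + constantFromPatch(5, 3));
  }
}
```

For a DAG count of 5 and an operator count of 3, the simulation checkpoints at windows 6, 12, 18 and 21, while the patch computes a single constant of 6, so the constant does not hold for the last interval.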



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-157156709
  
    @tweise @ilooner @davidyan74 please see



[GitHub] incubator-apex-core pull request: Providing a way for operator to ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/168#issuecomment-157247094
  
    @PramodSSImmaneni Could you please point me to the discussion for this change or to any related documentation? I would like to know the use cases in order to understand the rationale behind this change.
    I was under the impression that we were adding a way to let the operator know when the next checkpoint will begin; however, this seems much more complicated.

