Posted to dev@apex.apache.org by ilooner <gi...@git.apache.org> on 2015/11/11 20:35:17 UTC

[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

GitHub user ilooner opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/163

    - APEX-256 Added unit tests to verify that checkpointed is or is not …

    …called immediately after triggering a copyToHDFS for the different processing modes for asynchronous checkpointing.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/incubator-apex-core APEX-256

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/163.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #163
    
----
commit eb8ec32facfe72f24352a49b81d0773c2d7b16a1
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2015-11-11T19:33:16Z

    - APEX-256 Added unit tests to verify that checkpointed is or is not called immediately after triggering a copyToHDFS for the different processing modes for asynchronous checkpointing.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by gauravgopi123 <gi...@git.apache.org>.
GitHub user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/163#discussion_r45312082
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -296,4 +345,132 @@ public void run()
         Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
       }
     
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is called
    +   * immediately after copying to HDFS in the case of exactly once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestExactlyOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.EXACTLY_ONCE);
    +  }
    +
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is NOT called immediately after the asynchronous
    +   * task of copying to HDFS is triggered, in the at least once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestAtleastOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.AT_LEAST_ONCE);
    +  }
    +
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is NOT called immediately after the asynchronous
    +   * task of copying to HDFS is triggered, in the at most once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestAtMostOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.AT_MOST_ONCE);
    +  }
    +
    +  private void asyncCheckpointedCalledHelper(ProcessingMode processingMode) throws Exception
    +  {
    +    long maxSleep = 5000;
    +    long sleeptime = 25L;
    --- End diff --
    
    Why are the two values written differently (25L vs. 5000)?
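
One way to address this comment, as a minimal sketch rather than the change that was actually made: write both timing constants with the same long-literal style and fold the repeated sleep-and-check loop into a helper. The names `PollingSketch`, `MAX_SLEEP_MILLIS`, `SLEEP_MILLIS`, and `waitUntil` are hypothetical and do not appear in the pull request, and the lambda syntax assumes Java 8 or later. Unlike the `do`/`while` loops in the diff, this helper checks the condition before the first sleep.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class PollingSketch
{
  // Both timing constants use the same long-literal style.
  static final long MAX_SLEEP_MILLIS = 5000L;
  static final long SLEEP_MILLIS = 25L;

  /**
   * Sleeps in SLEEP_MILLIS steps until the condition holds or
   * MAX_SLEEP_MILLIS has been spent sleeping. Returns the final
   * value of the condition.
   */
  static boolean waitUntil(BooleanSupplier condition)
  {
    long waited = 0L;
    while (!condition.getAsBoolean() && waited < MAX_SLEEP_MILLIS) {
      try {
        Thread.sleep(SLEEP_MILLIS);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        break;
      }
      waited += SLEEP_MILLIS;
    }
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException
  {
    final AtomicBoolean started = new AtomicBoolean(false);
    Thread t = new Thread(() -> started.set(true));
    t.start();
    System.out.println("started: " + waitUntil(started::get));
    t.join();
  }
}
```

With such a helper, each of the three polling loops in the test would reduce to a single call whose result can be asserted directly.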



[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/163



[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by gauravgopi123 <gi...@git.apache.org>.
GitHub user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/163#discussion_r45310744
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -38,10 +49,40 @@
      */
     public class GenericNodeTest
     {
    -  public static class GenericOperator implements Operator
    +  @Rule
    +  public FSTestWatcher testMeta = new FSTestWatcher();
    +
    +  public static class FSTestWatcher extends TestWatcher
    +  {
    +    public org.junit.runner.Description desc;
    +
    +    public String getDir()
    +    {
    --- End diff --
    
    This method is called twice per test. Instead, define a directory field and initialize it with "target/" + desc.getClassName() + "/" + desc.getMethodName() in starting(). You then don't need to store the description.
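
The suggestion above can be sketched in plain Java as follows. This is an illustration of the caching idea only, not the code from the pull request: `TestDirSketch` and `DirHolder` are hypothetical names, and the `starting(String, String)` method stands in for JUnit's `TestWatcher.starting(Description)` hook, where the two names would come from `getClassName()` and `getMethodName()` on the `Description`.

```java
class TestDirSketch
{
  /** Hypothetical stand-in for the FSTestWatcher rule; only the caching idea is shown. */
  static class DirHolder
  {
    // Computed once per test instead of on every getDir() call.
    private String dir;

    // In the real rule this logic would live in the starting(Description) hook,
    // so the Description itself would not need to be stored.
    void starting(String className, String methodName)
    {
      dir = "target/" + className + "/" + methodName;
    }

    String getDir()
    {
      return dir;
    }
  }

  public static void main(String[] args)
  {
    DirHolder holder = new DirHolder();
    // Illustrative values; JUnit would supply the actual class and method names.
    holder.starting("GenericNodeTest", "testCheckpointedCallTestExactlyOnce");
    System.out.println(holder.getDir());
  }
}
```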



[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by ilooner <gi...@git.apache.org>.
GitHub user ilooner commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/163#issuecomment-155888024
  
    @gauravgopi123 Please review



[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by tweise <gi...@git.apache.org>.
GitHub user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/163#issuecomment-174212936
  
    @ilooner can you please rebase and address the review comments?



[GitHub] incubator-apex-core pull request: - APEX-256 Added unit tests to v...

Posted by gauravgopi123 <gi...@git.apache.org>.
GitHub user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/163#discussion_r45312032
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---
    @@ -296,4 +345,132 @@ public void run()
         Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
       }
     
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is called
    +   * immediately after copying to HDFS in the case of exactly once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestExactlyOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.EXACTLY_ONCE);
    +  }
    +
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is NOT called immediately after the asynchronous
    +   * task of copying to HDFS is triggered, in the at least once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestAtleastOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.AT_LEAST_ONCE);
    +  }
    +
    +  /**
    +   * Tests to make sure that {@link CheckpointListener#checkpointed(long)} is NOT called immediately after the asynchronous
    +   * task of copying to HDFS is triggered, in the at most once processing mode.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testCheckpointedCallTestAtMostOnce() throws Exception
    +  {
    +    asyncCheckpointedCalledHelper(ProcessingMode.AT_MOST_ONCE);
    +  }
    +
    +  private void asyncCheckpointedCalledHelper(ProcessingMode processingMode) throws Exception
    +  {
    +    long maxSleep = 5000;
    +    long sleeptime = 25L;
    +    GenericOperator go = new GenericOperator();
    +    AsyncFSStorageAgent storageAgent = new TestAsyncFSStorageAgent(testMeta.getDir(), new Configuration());
    +    DefaultAttributeMap amap = new DefaultAttributeMap();
    +    amap.put(OperatorContext.STORAGE_AGENT, storageAgent);
    +    amap.put(OperatorContext.PROCESSING_MODE, processingMode);
    +    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, amap, null));
    +    gn.setId(1);
    +    gn.APPLICATION_WINDOW_COUNT = 1;
    +    gn.CHECKPOINT_WINDOW_COUNT = 1;
    +    DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
    +    DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
    +
    +    gn.connectInputPort("ip1", reservoir1);
    +    gn.connectInputPort("ip2", reservoir2);
    +    gn.connectOutputPort("op", Sink.BLACKHOLE);
    +
    +    final AtomicBoolean ab = new AtomicBoolean(false);
    +    Thread t = new Thread()
    +    {
    +      @Override
    +      public void run()
    +      {
    +        ab.set(true);
    +        gn.activate();
    +        gn.run();
    +        gn.deactivate();
    +      }
    +
    +    };
    +    t.start();
    +
    +    long interval = 0;
    +    do {
    +      Thread.sleep(sleeptime);
    +      interval += sleeptime;
    +    }
    +    while ((ab.get() == false) && (interval < maxSleep));
    +
    +
    +    int controlTupleCount = gn.controlTupleCount;
    +    Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
    +
    +    reservoir1.add(beginWindow1);
    +    reservoir2.add(beginWindow1);
    +
    +    interval = 0;
    +    do {
    +      Thread.sleep(sleeptime);
    +      interval += sleeptime;
    +    }
    +    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
    +    Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
    +    controlTupleCount = gn.controlTupleCount;
    +
    +    Tuple endWindow1 = new EndWindowTuple(0x1L);
    +
    +    gn.doCheckpoint = true;
    +
    +    reservoir1.add(endWindow1);
    +    reservoir2.add(endWindow1);
    +
    +    interval = 0;
    +    do {
    +      Thread.sleep(sleeptime);
    +      interval += sleeptime;
    +    }
    +    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
    +    Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);
    +
    +    if (processingMode == ProcessingMode.EXACTLY_ONCE) {
    +      Assert.assertTrue(go.checkpointed);
    +    } else {
    +      Assert.assertFalse(go.checkpointed);
    +    }
    +  }
    --- End diff --
    
    There is no endStreamTuple passed, and no shutdown call, on GenericNode gn.
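
The concern raised here is that the thread running the node is never told to stop once the assertions finish. A generic way to guarantee cleanup, sketched in plain Java with a hypothetical `Worker` class standing in for the node (it does not use the Apex API, and `ShutdownSketch` and `runAndStop` are illustrative names), is to stop the worker in a `finally` block and join its thread:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSketch
{
  /** Hypothetical stand-in for a node whose run() loops until it is shut down. */
  static class Worker implements Runnable
  {
    private final AtomicBoolean alive = new AtomicBoolean(true);

    @Override
    public void run()
    {
      while (alive.get()) {
        try {
          Thread.sleep(5L);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }

    void shutdown()
    {
      alive.set(false);
    }
  }

  /**
   * Runs the test body while the worker thread is active, then always stops
   * the worker and waits for its thread, even if the body throws.
   * Returns true if the worker thread has terminated.
   */
  static boolean runAndStop(Runnable testBody)
  {
    Worker worker = new Worker();
    Thread t = new Thread(worker);
    t.start();
    try {
      testBody.run();
    } finally {
      worker.shutdown();
      try {
        t.join(1000L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return !t.isAlive();
  }

  public static void main(String[] args)
  {
    boolean stopped = runAndStop(() -> System.out.println("assertions would run here"));
    System.out.println("worker stopped: " + stopped);
  }
}
```

In the test under review, the equivalent cleanup would be whichever of the two mechanisms the reviewer names (an end-stream tuple or a shutdown call on `gn`), followed by joining thread `t`.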

