Posted to reviews@livy.apache.org by bjoernlohrmann <gi...@git.apache.org> on 2018/11/13 22:21:27 UTC

[GitHub] incubator-livy pull request #128: Use setJobGroup/cancelJobGroup to cancel b...

GitHub user bjoernlohrmann opened a pull request:

    https://github.com/apache/incubator-livy/pull/128

    Use setJobGroup/cancelJobGroup to cancel bypass jobs

    ## What changes were proposed in this pull request?
    
    This PR fixes [LIVY-533](https://issues.apache.org/jira/browse/LIVY-533).
    
    The proposed fix invokes setJobGroup() in the worker thread before calling the job code, and cancelJobGroup() when another thread tries to cancel the job's execution. `synchronized` blocks guard against race conditions.
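A minimal sketch of the pattern described above. It assumes a hypothetical `JobGroupControl` interface that mirrors the two `SparkContext` methods involved (`setJobGroup(groupId, description, interruptOnCancel)` and `cancelJobGroup(groupId)`), so it runs without Spark; `GroupedJob` and `RecordingControl` are illustrative names, not Livy classes. The worker thread tags itself with the job ID under the lock, and `cancel()` takes the same lock, so a cancellation either prevents the job from starting or finds it already tagged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

public class JobGroupSketch {

  /** Hypothetical stand-in for the two SparkContext methods the fix relies on. */
  interface JobGroupControl {
    void setJobGroup(String groupId, String description, boolean interruptOnCancel);

    void cancelJobGroup(String groupId);
  }

  /** Records calls so the pattern can be exercised without a Spark cluster. */
  static class RecordingControl implements JobGroupControl {
    final List<String> calls = new ArrayList<>();

    @Override
    public synchronized void setJobGroup(String groupId, String description,
        boolean interruptOnCancel) {
      calls.add("set:" + groupId);
    }

    @Override
    public synchronized void cancelJobGroup(String groupId) {
      calls.add("cancel:" + groupId);
    }
  }

  /** Hypothetical wrapper: tags the worker thread with the job ID, cancels by that ID. */
  static class GroupedJob<T> implements Callable<T> {
    private final String jobId;
    private final JobGroupControl ctx;
    private final Callable<T> body;
    private boolean started = false;
    private boolean cancelled = false;

    GroupedJob(String jobId, JobGroupControl ctx, Callable<T> body) {
      this.jobId = jobId;
      this.ctx = ctx;
      this.body = body;
    }

    @Override
    public T call() throws Exception {
      // Runs on the worker thread; the check and the tagging happen under the
      // same lock as cancel(), so cancel() cannot slip in between them.
      synchronized (this) {
        if (cancelled) {
          throw new CancellationException("Job " + jobId + " was cancelled");
        }
        ctx.setJobGroup(jobId, "Livy job " + jobId, true);
        started = true;
      }
      // The job body runs outside the lock so cancel() is never blocked by it.
      return body.call();
    }

    synchronized void cancel() {
      cancelled = true;
      if (started) {
        // In Spark, this affects jobs that are already running in the group.
        ctx.cancelJobGroup(jobId);
      }
    }
  }
}
```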
    
    Also, this PR removes some unused code from the JobWrapper class.
    
    ## How was this patch tested?
    
    Manually, as well as with the Livy test suite. No new tests were added.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bjoernlohrmann/incubator-livy master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-livy/pull/128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #128
    
----
commit 989225990ed139d2f5ed35e89e4dcbcead96b4dc
Author: Bjoern Lohrmann <53...@...>
Date:   2018-11-13T09:37:30Z

    Use setJobGroup/cancelJobGroup to cancel bypass jobs

----


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-livy/pull/128


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238385396
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    --- End diff --
    
    nit: indented too far. I also slightly prefer to keep all the thrown exceptions on the same line.
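For illustration only, assuming the Spark-style convention of 2-space indentation with 4-space continuation lines, a wrapped signature that keeps all thrown exceptions on one line could look like this. `ContinuationIndentExample` and `awaitResult` are hypothetical and only stand in for the test helper under review.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ContinuationIndentExample {
  private static final long TIMEOUT = 10;

  // The wrapped signature continues 4 spaces in, with every thrown
  // exception kept together on the single continuation line.
  static String awaitResult(CompletableFuture<String> pending, String jobId)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Wrapped call arguments also use a 4-space continuation indent.
    String value = pending.get(TIMEOUT,
        TimeUnit.SECONDS);
    return jobId + ":" + value;
  }
}
```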


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r237956271
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -38,21 +34,31 @@
       public final String jobId;
     
       private final RSCDriver driver;
    +
       private final Job<T> job;
    -  private final AtomicInteger completed;
    +
    +  private boolean isCancelled = false;
     
       private Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    +      synchronized (this) {
    +        if (isCancelled) {
    +          throw new Exception("Job isCancelled");
    --- End diff --
    
    Throw `CancellationException` instead.
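One practical reason for the suggestion, sketched under assumptions: `java.util.concurrent.CancellationException` is an unchecked exception with a specific meaning, so code that later inspects the error (for example, a `finished(result, error)` callback) can tell cancellation apart from an ordinary failure by type, instead of parsing a message such as "Job isCancelled". The `Outcome` enum and the `classify` and `runAndCapture` helpers are hypothetical, not part of Livy.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

public class CancellationClassifier {

  /** Hypothetical outcomes, loosely mirroring the terminal job states. */
  enum Outcome { SUCCEEDED, CANCELLED, FAILED }

  /** Maps the error handed to a completion callback onto an outcome, by type. */
  static Outcome classify(Throwable error) {
    if (error == null) {
      return Outcome.SUCCEEDED;
    }
    if (error instanceof CancellationException) {
      return Outcome.CANCELLED;
    }
    // A generic Exception carries no machine-readable "cancelled" signal.
    return Outcome.FAILED;
  }

  /** Runs a job body and returns whatever it threw, or null on success. */
  static Throwable runAndCapture(Callable<?> body) {
    try {
      body.call();
      return null;
    } catch (Throwable t) {
      return t;
    }
  }
}
```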


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238455398
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java ---
    @@ -38,28 +38,50 @@ public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJo
     
       @Override
       public Void call() throws Exception {
    -    state = JobHandle.State.STARTED;
    +    // we ignore the return value here, because super.call() will also detect cancellation
    +    // and needs to perform some cleanup
    +    tryTransitionToState(JobHandle.State.STARTED);
    --- End diff --
    
    That is actually the standard case "job is cancelled while running" and should work as intended.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r237280060
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -39,20 +39,27 @@
     
       private final RSCDriver driver;
       private final Job<T> job;
    -  private final AtomicInteger completed;
     
    -  private Future<?> future;
    +  private volatile Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    --- End diff --
    
    `future` will only be null for synchronous bypass jobs, but those cannot be cancelled unless I am mistaken?
    
    For async bypass jobs, I think `future` should always be non-null during `call()/cancel()`, because it is assigned in `submit()` which also has the synchronized modifier and always runs before `call()` or `cancel()`.
    
    I can add a boolean cancelled flag if you prefer, but this feels like duplicating information that is already available in the future.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238384975
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java ---
    @@ -38,28 +38,50 @@ public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJo
     
       @Override
       public Void call() throws Exception {
    -    state = JobHandle.State.STARTED;
    +    // we ignore the return value here, because super.call() will also detect cancellation
    +    // and needs to perform some cleanup
    +    tryTransitionToState(JobHandle.State.STARTED);
    +
         return super.call();
       }
     
       @Override
       protected synchronized void finished(byte[] result, Throwable error) {
    -    if (error == null) {
    +    if (error == null && tryTransitionToState(JobHandle.State.SUCCEEDED)) {
           this.result = result;
    -      this.state = JobHandle.State.SUCCEEDED;
    -    } else {
    +    } else if (error != null && tryTransitionToState(JobHandle.State.FAILED)) {
           this.error = error;
    -      this.state = JobHandle.State.FAILED;
         }
       }
     
       @Override
    -  boolean cancel() {
    -    if (super.cancel()) {
    -      this.state = JobHandle.State.CANCELLED;
    -      return true;
    +  synchronized boolean cancel() {
    +    return tryTransitionToState(JobHandle.State.CANCELLED) && super.cancel();
    +  }
    +
    +  private synchronized boolean tryTransitionToState(JobHandle.State newState) {
    +    boolean success = false;
    +
    +    switch (this.state) {
    +      case QUEUED:
    +        if (newState == JobHandle.State.STARTED || newState == JobHandle.State.CANCELLED) {
    +          this.state = newState;
    +          success = true;
    +        }
    +        break;
    +      case STARTED:
    +        if (newState == JobHandle.State.CANCELLED
    +                || newState == JobHandle.State.SUCCEEDED
    --- End diff --
    
    nit: indented too far
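The transition rules visible in the diff can be modeled as a small, self-contained state machine. This is a sketch, not the PR's code: the `STARTED -> FAILED` transition and the treatment of `CANCELLED`, `SUCCEEDED` and `FAILED` as terminal states are assumptions, because the quoted diff is cut off before the rest of the `switch`. It shows why `finished()` only stores a result when its own transition succeeds: once a job has been cancelled, a late result is ignored.

```java
public class BypassStateMachine {

  /** Mirrors the JobHandle.State values used in the diff. */
  enum State { QUEUED, STARTED, CANCELLED, SUCCEEDED, FAILED }

  private State state = State.QUEUED;
  private byte[] result;
  private Throwable error;

  synchronized boolean tryTransitionToState(State newState) {
    boolean allowed;
    switch (state) {
      case QUEUED:
        allowed = newState == State.STARTED || newState == State.CANCELLED;
        break;
      case STARTED:
        allowed = newState == State.CANCELLED
            || newState == State.SUCCEEDED
            || newState == State.FAILED;  // assumed; the quoted diff is truncated here
        break;
      default:
        // Assumed: CANCELLED, SUCCEEDED and FAILED are terminal.
        allowed = false;
        break;
    }
    if (allowed) {
      state = newState;
    }
    return allowed;
  }

  /** Stores the outcome only if the job has not already reached a terminal state. */
  synchronized void finished(byte[] result, Throwable error) {
    if (error == null && tryTransitionToState(State.SUCCEEDED)) {
      this.result = result;
    } else if (error != null && tryTransitionToState(State.FAILED)) {
      this.error = error;
    }
  }

  synchronized State state() {
    return state;
  }

  synchronized byte[] result() {
    return result;
  }

  synchronized Throwable error() {
    return error;
  }
}
```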


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238458802
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    --- End diff --
    
    Yes, cancel() interrupts the sleeper job.
    
    I raised the job duration to one minute, which I hope is enough. I am hesitant to make the job duration infinite: if cancellation fails due to some bug, the test will hang forever (or until some timeout kills the build), which would be very annoying.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238387799
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    --- End diff --
    
    I'm generally a bit wary of tests that rely on timing, especially when it's a little tight. Overloaded build machines may make this test flaky.
    
    Can the job duration be much larger than the wait, or even effectively "infinite"? Cancelling the job or shutting down the client should interrupt the sleep, right?
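A minimal, Spark-free sketch of the behavior this question relies on, assuming the test's `Sleeper` job essentially calls `Thread.sleep`: `Future.cancel(true)` interrupts the worker thread, so even a one-minute sleep ends right away, and a bounded `awaitTermination` turns a cancellation bug into a failed check rather than a hung build. `InterruptibleSleeperDemo` and its helpers are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleSleeperDemo {

  /** Hypothetical stand-in for the test's Sleeper job. */
  static Callable<Void> sleeper(long millis, AtomicBoolean interrupted, CountDownLatch started) {
    return () -> {
      started.countDown();
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        interrupted.set(true);
        throw e;
      }
      return null;
    };
  }

  /** Returns true if the sleeper was interrupted and finished within timeoutMillis. */
  static boolean cancelInterruptsSleep(long timeoutMillis) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean interrupted = new AtomicBoolean(false);
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<Void> f =
          executor.submit(sleeper(TimeUnit.MINUTES.toMillis(1), interrupted, started));
      started.await();
      // Interrupts the worker thread; Thread.sleep then throws InterruptedException.
      f.cancel(true);
      executor.shutdown();
      // Bounded wait: a cancellation bug fails the check instead of hanging the build.
      boolean terminated = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
      return terminated && interrupted.get();
    } finally {
      executor.shutdownNow();
    }
  }
}
```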


---

[GitHub] incubator-livy pull request #128: Use setJobGroup/cancelJobGroup to cancel b...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r236355512
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -39,20 +39,27 @@
     
       private final RSCDriver driver;
       private final Job<T> job;
    -  private final AtomicInteger completed;
     
    -  private Future<?> future;
    +  private volatile Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    --- End diff --
    
    If this race is possible, then there's another race you're missing: if `cancel()` is called before `call()` runs, then you'll start the job after it's been cancelled (since `future` will be `null`).


---

[GitHub] incubator-livy issue #128: [LIVY-533] Use setJobGroup/cancelJobGroup to canc...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on the issue:

    https://github.com/apache/incubator-livy/pull/128
  
    I have updated a comment in the PR to force a rebuild. I think the last build failed due to an unrelated flaky test case in the batch integration tests.
    
    Is this otherwise good enough to be merged?


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238386275
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    +    runTest(true, new TestFunction() {
    +      @Override
    +      public void call(LivyClient client) throws Exception {
    +        Serializer s = new Serializer();
    +        RSCClient lclient = (RSCClient) client;
    +        ByteBuffer job = s.serialize(new Sleeper(jobDuration));
    +        String jobId = lclient.bypass(job, "spark", false);
    +
    +        assertNotCancelledOrFailed(lclient, jobId);
    +
    +        if (waitBeforeCancel > 0) {
    +          Thread.sleep(waitBeforeCancel);
    +          assertNotCancelledOrFailed(lclient, jobId);
             }
    +
    +        lclient.cancel(jobId);
    +        BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +                TimeUnit.SECONDS);
    +        assertEquals(JobHandle.State.CANCELLED, currStatus.state);
    +        assertJobIdUntracked(lclient, jobId);
           }
         });
       }
     
    +  private BypassJobStatus assertNotCancelledOrFailed(RSCClient lclient, String jobId)
    +          throws InterruptedException, ExecutionException, TimeoutException {
    --- End diff --
    
    nit: indented too far


---

[GitHub] incubator-livy issue #128: [LIVY-533] Use setJobGroup/cancelJobGroup to canc...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/incubator-livy/pull/128
  
    Merging to master.
    
    I can see this not handling 100% of cases (e.g. people setting their own job group in their jobs), but at least it's a best effort.
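The limitation mentioned above can be illustrated with a toy model. It assumes, as in Spark, that the job group is a per-thread property that each job picks up when it is submitted; `JobGroupOverrideDemo` and `FakeSparkJob` are hypothetical stand-ins, not Spark classes. Once user code replaces the group, its later jobs no longer match the ID that Livy cancels.

```java
import java.util.ArrayList;
import java.util.List;

public class JobGroupOverrideDemo {

  /** Hypothetical stand-in for a Spark job; remembers the group it was tagged with. */
  static class FakeSparkJob {
    final String group;
    boolean cancelled = false;

    FakeSparkJob(String group) {
      this.group = group;
    }
  }

  // Modeled on Spark keeping the job group as a per-thread property.
  private final ThreadLocal<String> jobGroup = new InheritableThreadLocal<>();
  private final List<FakeSparkJob> activeJobs = new ArrayList<>();

  void setJobGroup(String groupId) {
    jobGroup.set(groupId);
  }

  void clearJobGroup() {
    jobGroup.remove();
  }

  /** Each "action" is tagged with the calling thread's group at submission time. */
  FakeSparkJob runAction() {
    FakeSparkJob job = new FakeSparkJob(jobGroup.get());
    activeJobs.add(job);
    return job;
  }

  /** Cancels only the active jobs whose group matches exactly. */
  void cancelJobGroup(String groupId) {
    for (FakeSparkJob job : activeJobs) {
      if (groupId.equals(job.group)) {
        job.cancelled = true;
      }
    }
  }
}
```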


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238457147
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    +    runTest(true, new TestFunction() {
    +      @Override
    +      public void call(LivyClient client) throws Exception {
    +        Serializer s = new Serializer();
    +        RSCClient lclient = (RSCClient) client;
    +        ByteBuffer job = s.serialize(new Sleeper(jobDuration));
    +        String jobId = lclient.bypass(job, "spark", false);
    +
    +        assertNotCancelledOrFailed(lclient, jobId);
    +
    +        if (waitBeforeCancel > 0) {
    +          Thread.sleep(waitBeforeCancel);
    +          assertNotCancelledOrFailed(lclient, jobId);
             }
    +
    +        lclient.cancel(jobId);
    +        BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +                TimeUnit.SECONDS);
    +        assertEquals(JobHandle.State.CANCELLED, currStatus.state);
    +        assertJobIdUntracked(lclient, jobId);
           }
         });
       }
     
    +  private BypassJobStatus assertNotCancelledOrFailed(RSCClient lclient, String jobId)
    +          throws InterruptedException, ExecutionException, TimeoutException {
    +
    +    BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +            TimeUnit.SECONDS);
    --- End diff --
    
    Fixed.
    
    I was using IntelliJ IDEA defaults. Is there an "official" code style one can import?


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238459612
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    +    runTest(true, new TestFunction() {
    +      @Override
    +      public void call(LivyClient client) throws Exception {
    +        Serializer s = new Serializer();
    +        RSCClient lclient = (RSCClient) client;
    +        ByteBuffer job = s.serialize(new Sleeper(jobDuration));
    +        String jobId = lclient.bypass(job, "spark", false);
    +
    +        assertNotCancelledOrFailed(lclient, jobId);
    +
    +        if (waitBeforeCancel > 0) {
    +          Thread.sleep(waitBeforeCancel);
    +          assertNotCancelledOrFailed(lclient, jobId);
             }
    +
    +        lclient.cancel(jobId);
    +        BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +                TimeUnit.SECONDS);
    +        assertEquals(JobHandle.State.CANCELLED, currStatus.state);
    +        assertJobIdUntracked(lclient, jobId);
           }
         });
       }
     
    +  private BypassJobStatus assertNotCancelledOrFailed(RSCClient lclient, String jobId)
    +          throws InterruptedException, ExecutionException, TimeoutException {
    +
    +    BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +            TimeUnit.SECONDS);
    --- End diff --
    
    Livy follows Spark's guide (http://spark.apache.org/contributing.html), maybe there's something there. But I don't use IntelliJ so can't help you there.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238386377
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    +    runTest(true, new TestFunction() {
    +      @Override
    +      public void call(LivyClient client) throws Exception {
    +        Serializer s = new Serializer();
    +        RSCClient lclient = (RSCClient) client;
    +        ByteBuffer job = s.serialize(new Sleeper(jobDuration));
    +        String jobId = lclient.bypass(job, "spark", false);
    +
    +        assertNotCancelledOrFailed(lclient, jobId);
    +
    +        if (waitBeforeCancel > 0) {
    +          Thread.sleep(waitBeforeCancel);
    +          assertNotCancelledOrFailed(lclient, jobId);
             }
    +
    +        lclient.cancel(jobId);
    +        BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +                TimeUnit.SECONDS);
    +        assertEquals(JobHandle.State.CANCELLED, currStatus.state);
    +        assertJobIdUntracked(lclient, jobId);
           }
         });
       }
     
    +  private BypassJobStatus assertNotCancelledOrFailed(RSCClient lclient, String jobId)
    +          throws InterruptedException, ExecutionException, TimeoutException {
    +
    +    BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +            TimeUnit.SECONDS);
    --- End diff --
    
    nit: too far. Basically, all your continuation lines are indented too far.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r237282583
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -39,20 +39,27 @@
     
       private final RSCDriver driver;
       private final Job<T> job;
    -  private final AtomicInteger completed;
     
    -  private Future<?> future;
    +  private volatile Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    --- End diff --
    
    `future` will be null before `submit` is called. What if `cancel` is called before `submit` is called?
    
    That's my question.
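A minimal sketch of the window described above. It assumes a wrapper whose only cancellation path is a `Future` that gets assigned after `ExecutorService.submit()` returns. `LostCancelDemo` and `NaiveWrapper` are hypothetical names used for illustration, not Livy code, and the `run()` body stands in for the user's job.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class LostCancelDemo {

  // Hypothetical wrapper: the only way to cancel is through a Future that
  // is assigned after ExecutorService.submit() returns.
  static class NaiveWrapper implements Runnable {
    volatile Future<?> future;
    final AtomicBoolean ran = new AtomicBoolean(false);

    @Override
    public void run() {
      ran.set(true); // stands in for running the user's job
    }

    void cancel() {
      // While future is still null, this silently does nothing.
      if (future != null) {
        future.cancel(true);
      }
    }
  }

  // Returns true if the job ran even though cancel() was called first.
  static boolean runCancelBeforeSubmit() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      NaiveWrapper w = new NaiveWrapper();
      w.cancel();                    // cancel request arrives before submit()
      w.future = executor.submit(w); // the job is scheduled anyway
      w.future.get();                // wait for it to finish
      return w.ran.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("job ran despite cancel: " + runCancelBeforeSubmit());
  }
}
```

Running `main` prints `job ran despite cancel: true`. The early `cancel()` sees a null `future`, so the request is simply dropped.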


---

[GitHub] incubator-livy issue #128: Use setJobGroup/cancelJobGroup to cancel bypass j...

Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:

    https://github.com/apache/incubator-livy/pull/128
  
    # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=h1) Report
    > Merging [#128](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/4cfb6bcb8fb9ac6b2d6c8b3d04b20f647b507e1f?src=pr&el=desc) will **increase** coverage by `0.03%`.
    > The diff coverage is `66.66%`.
    
    ```diff
    @@             Coverage Diff              @@
    ##             master     #128      +/-   ##
    ============================================
    + Coverage     70.98%   71.02%   +0.03%     
    - Complexity      925      928       +3     
    ============================================
      Files           100      100              
      Lines          5511     5511              
      Branches        829      831       +2     
    ============================================
    + Hits           3912     3914       +2     
    + Misses         1058     1055       -3     
    - Partials        541      542       +1
    ```
    
    
    | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
    |---|---|---|---|
    | [...in/java/org/apache/livy/rsc/driver/JobWrapper.java](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvSm9iV3JhcHBlci5qYXZh) | `90% <66.66%> (+10%)` | `9 <3> (+2)` | :arrow_up: |
    | [...cala/org/apache/livy/scalaapi/ScalaJobHandle.scala](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-c2NhbGEtYXBpL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zY2FsYWFwaS9TY2FsYUpvYkhhbmRsZS5zY2FsYQ==) | `52.94% <0%> (-2.95%)` | `7% <0%> (ø)` | |
    | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0%> (-2.57%)` | `7% <0%> (ø)` | |
    | [...main/java/org/apache/livy/rsc/ContextLauncher.java](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9Db250ZXh0TGF1bmNoZXIuamF2YQ==) | `81.63% <0%> (-2.56%)` | `18% <0%> (ø)` | |
    | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `77.46% <0%> (-2.12%)` | `33% <0%> (ø)` | |
    | [...ain/java/org/apache/livy/rsc/driver/RSCDriver.java](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvUlNDRHJpdmVyLmphdmE=) | `80.08% <0%> (+2.11%)` | `42% <0%> (+1%)` | :arrow_up: |
    | [...c/src/main/java/org/apache/livy/rsc/RSCClient.java](https://codecov.io/gh/apache/incubator-livy/pull/128/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9SU0NDbGllbnQuamF2YQ==) | `82.6% <0%> (+2.48%)` | `26% <0%> (ø)` | :arrow_down: |
    
    ------
    
    [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=continue).
    > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
    > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
    > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=footer). Last update [4cfb6bc...9892259](https://codecov.io/gh/apache/incubator-livy/pull/128?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).



---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238336934
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -38,21 +34,31 @@
       public final String jobId;
     
       private final RSCDriver driver;
    +
       private final Job<T> job;
    -  private final AtomicInteger completed;
    +
    +  private boolean isCancelled = false;
     
       private Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    +      synchronized (this) {
    +        if (isCancelled) {
    +          throw new Exception("Job isCancelled");
    --- End diff --
    
    Done.
    
    Sorry that this took so long, but I also added two tests for bypass job cancellation, which revealed problems in the state handling of BypassJobWrapper. The state would end up as FAILED after cancellation (due to CancellationException being thrown). Also, state transitions like CANCELLED -> STARTED -> FAILED were possible. The latest update to this PR should fix that.
    
    It would be great if this fix could still make it into the 0.6 release.
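The state problem described above comes down to allowing transitions out of a terminal state. Below is a minimal sketch of one way to rule that out. It assumes a hypothetical `StateHolder` class with a local `State` enum whose constants are assumed to mirror `JobHandle.State`. This is not the code in the updated PR. Every transition happens under a single lock, and any transition is refused once CANCELLED, FAILED or SUCCEEDED has been reached.

```java
import java.util.EnumSet;
import java.util.Set;

class StateHolder {

  // Local stand-in for the job states discussed in the review (assumed names).
  enum State { QUEUED, STARTED, CANCELLED, FAILED, SUCCEEDED }

  private static final Set<State> TERMINAL =
      EnumSet.of(State.CANCELLED, State.FAILED, State.SUCCEEDED);

  private State state = State.QUEUED; // guarded by "this"

  // Returns true if the transition happened; never leaves a terminal state.
  synchronized boolean tryTransitionToState(State next) {
    if (TERMINAL.contains(state)) {
      return false;
    }
    state = next;
    return true;
  }

  synchronized State getState() {
    return state;
  }

  public static void main(String[] args) {
    StateHolder h = new StateHolder();
    h.tryTransitionToState(State.CANCELLED);                 // cancel() wins the race
    boolean started = h.tryTransitionToState(State.STARTED); // refused
    boolean failed = h.tryTransitionToState(State.FAILED);   // refused
    System.out.println(h.getState() + " " + started + " " + failed);
  }
}
```

`main` prints `CANCELLED false false`. Under this rule, a `CancellationException` that surfaces from the worker can still try to record FAILED. The attempt returns `false`, and the job stays CANCELLED.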


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238466418
  
    --- Diff: rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java ---
    @@ -490,20 +487,72 @@ public void call(LivyClient client) throws Exception {
             String resultVal = (String) s.deserialize(ByteBuffer.wrap(status.result));
             assertEquals("hello", resultVal);
     
    -        // After the result is retrieved, the driver should stop tracking the job and release
    -        // resources associated with it.
    -        try {
    -          lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    -          fail("Should have failed to retrieve status of released job.");
    -        } catch (ExecutionException ee) {
    -          assertTrue(ee.getCause() instanceof RpcException);
    -          assertTrue(ee.getCause().getMessage().contains(
    -            "java.util.NoSuchElementException: " + jobId));
    +        assertJobIdUntracked(lclient, jobId);
    +      }
    +    });
    +  }
    +
    +  private void assertJobIdUntracked(RSCClient lclient, String jobId) throws InterruptedException,
    +          TimeoutException {
    +
    +    // After the result is retrieved, the driver should stop tracking the job and release
    +    // resources associated with it.
    +    try {
    +      lclient.getBypassJobStatus(jobId).get(TIMEOUT, TimeUnit.SECONDS);
    +      fail("Should have failed to retrieve status of released job.");
    +    } catch (ExecutionException ee) {
    +      assertTrue(ee.getCause() instanceof RpcException);
    +      assertTrue(ee.getCause().getMessage().contains(
    +              "java.util.NoSuchElementException: " + jobId));
    +    }
    +  }
    +
    +  @Test
    +  public void testBypassWithImmediateCancellation() throws Exception {
    +    runBypassCancellationTest(0, 1000);
    +  }
    +
    +  @Test
    +  public void testBypassWithCancellation() throws Exception {
    +    runBypassCancellationTest(200, 1000);
    +  }
    +
    +  public void runBypassCancellationTest(long waitBeforeCancel, long jobDuration) throws Exception {
    +    runTest(true, new TestFunction() {
    +      @Override
    +      public void call(LivyClient client) throws Exception {
    +        Serializer s = new Serializer();
    +        RSCClient lclient = (RSCClient) client;
    +        ByteBuffer job = s.serialize(new Sleeper(jobDuration));
    +        String jobId = lclient.bypass(job, "spark", false);
    +
    +        assertNotCancelledOrFailed(lclient, jobId);
    +
    +        if (waitBeforeCancel > 0) {
    +          Thread.sleep(waitBeforeCancel);
    +          assertNotCancelledOrFailed(lclient, jobId);
             }
    +
    +        lclient.cancel(jobId);
    +        BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +                TimeUnit.SECONDS);
    +        assertEquals(JobHandle.State.CANCELLED, currStatus.state);
    +        assertJobIdUntracked(lclient, jobId);
           }
         });
       }
     
    +  private BypassJobStatus assertNotCancelledOrFailed(RSCClient lclient, String jobId)
    +          throws InterruptedException, ExecutionException, TimeoutException {
    +
    +    BypassJobStatus currStatus = lclient.getBypassJobStatus(jobId).get(TIMEOUT,
    +            TimeUnit.SECONDS);
    --- End diff --
    
    Thanks, good to know.


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by bjoernlohrmann <gi...@git.apache.org>.
Github user bjoernlohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r237689008
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/JobWrapper.java ---
    @@ -39,20 +39,27 @@
     
       private final RSCDriver driver;
       private final Job<T> job;
    -  private final AtomicInteger completed;
     
    -  private Future<?> future;
    +  private volatile Future<?> future;
     
       public JobWrapper(RSCDriver driver, String jobId, Job<T> job) {
         this.driver = driver;
         this.jobId = jobId;
         this.job = job;
    -    this.completed = new AtomicInteger();
       }
     
       @Override
       public Void call() throws Exception {
         try {
    +      // this is synchronized to avoid races with cancel()
    --- End diff --
    
    Good point. I just checked `RSCDriver.handle(ChannelHandlerContext, BypassJobRequest)`, and it looks like this can actually happen. If it does, cancellation won't work properly (as you said).
    
    I updated the PR with a boolean flag `isCancelled` that addresses the issue.
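A minimal sketch of the flag approach, assuming a hypothetical `FlaggedWrapper` class. Both `call()` and `cancel()` synchronize on the same object. A cancel that arrives before `submit()` is therefore remembered in `isCancelled`, and `call()` refuses to start. The `log` entries are placeholders for Spark's `setJobGroup()`/`cancelJobGroup()`. No Spark or Livy API is actually called.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FlaggedWrapper implements Callable<Void> {

  private final String jobId;
  // Records the placeholder hook calls so their ordering can be inspected.
  private final List<String> log = Collections.synchronizedList(new ArrayList<>());
  private boolean isCancelled = false; // guarded by "this"
  private volatile Future<?> future;

  FlaggedWrapper(String jobId) {
    this.jobId = jobId;
  }

  @Override
  public Void call() {
    // The check and the job-group registration happen under the same lock
    // as cancel(), so a cancel cannot slip in between them.
    synchronized (this) {
      if (isCancelled) {
        throw new CancellationException("Job " + jobId + " was cancelled");
      }
      log.add("setJobGroup " + jobId); // placeholder for the real setJobGroup() call
    }
    log.add("run " + jobId); // the user's job would run here, outside the lock
    return null;
  }

  synchronized void cancel() {
    isCancelled = true; // remembered even if submit() has not happened yet
    log.add("cancelJobGroup " + jobId); // placeholder for the real cancelJobGroup() call
    if (future != null) {
      future.cancel(true);
    }
  }

  // Cancels before submit() and returns the recorded hook calls.
  static List<String> runCancelBeforeSubmit() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    FlaggedWrapper w = new FlaggedWrapper("job-1");
    try {
      w.cancel(); // future is still null at this point
      w.future = executor.submit(w);
      w.future.get();
    } catch (ExecutionException e) {
      // call() refused to run because isCancelled was already set.
      w.log.add("refused: " + e.getCause().getMessage());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
    return w.log;
  }

  public static void main(String[] args) {
    System.out.println(runCancelBeforeSubmit());
  }
}
```

`main` prints `[cancelJobGroup job-1, refused: Job job-1 was cancelled]`. The job itself must run outside the `synchronized` block. If the lock were held for the whole job, `cancel()` would block until the job had already finished.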


---

[GitHub] incubator-livy pull request #128: [LIVY-533] Use setJobGroup/cancelJobGroup ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/128#discussion_r238384607
  
    --- Diff: rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java ---
    @@ -38,28 +38,50 @@ public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJo
     
       @Override
       public Void call() throws Exception {
    -    state = JobHandle.State.STARTED;
    +    // we ignore the return value here, because super.call() will also detect cancellation
    +    // and needs to perform some cleanup
    +    tryTransitionToState(JobHandle.State.STARTED);
    --- End diff --
    
    There's a race between this call and the call to `super.call()`, though: you may transition to started and the job may be cancelled before `super.call()` runs. As far as I can see, however, that won't cause problems.


---