You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/20 20:21:32 UTC

svn commit: r1653341 - in /hive/branches/spark/spark-client: ./ src/main/java/org/apache/hive/spark/client/ src/test/java/org/apache/hive/spark/client/

Author: xuefu
Date: Tue Jan 20 19:21:31 2015
New Revision: 1653341

URL: http://svn.apache.org/r1653341
Log:
HIVE-9179: Add listeners on JobHandle so job status change can be notified to the client [Spark Branch] (Marcelo via Xuefu)

Added:
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
Modified:
    hive/branches/spark/spark-client/pom.xml
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java

Modified: hive/branches/spark/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/pom.xml?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/pom.xml (original)
+++ hive/branches/spark/spark-client/pom.xml Tue Jan 20 19:21:31 2015
@@ -65,6 +65,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Tue Jan 20 19:21:31 2015
@@ -121,6 +121,20 @@ abstract class BaseProtocol extends RpcD
 
   }
 
+  protected static class JobStarted implements Serializable {
+
+    final String id;
+
+    JobStarted(String id) {
+      this.id = id;
+    }
+
+    JobStarted() {
+      this(null);
+    }
+
+  }
+
   /**
    * Inform the client that a new spark job has been submitted for the client job.
    */

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Tue Jan 20 19:21:31 2015
@@ -55,4 +55,53 @@ public interface JobHandle<T extends Ser
    */
   SparkCounters getSparkCounters();
 
+  /**
+   * Return the current state of the job.
+   */
+  State getState();
+
+  /**
+   * Add a listener to the job handle. If the job's state is not SENT, a callback for the
+   * corresponding state will be invoked immediately.
+   *
+   * @param l The listener to add.
+   */
+  void addListener(Listener<T> l);
+
+  /**
+   * The possible states of a submitted job.
+   */
+  static enum State {
+    SENT,
+    QUEUED,
+    STARTED,
+    CANCELLED,
+    FAILED,
+    SUCCEEDED;
+  }
+
+  /**
+   * A listener for monitoring the state of the job in the remote context. Callbacks are called
+   * when the corresponding state change occurs.
+   */
+  static interface Listener<T extends Serializable> {
+
+    void onJobQueued(JobHandle<T> job);
+
+    void onJobStarted(JobHandle<T> job);
+
+    void onJobCancelled(JobHandle<T> job);
+
+    void onJobFailed(JobHandle<T> job, Throwable cause);
+
+    void onJobSucceeded(JobHandle<T> job, T result);
+
+    /**
+     * Called when a monitored Spark job is started on the remote context. This callback
+     * does not indicate a state change in the client job's status.
+     */
+    void onSparkJobStarted(JobHandle<T> job, int sparkJobId);
+
+  }
+
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Tue Jan 20 19:21:31 2015
@@ -17,15 +17,16 @@
 
 package org.apache.hive.spark.client;
 
-import io.netty.util.concurrent.Promise;
-
 import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Promise;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -34,28 +35,30 @@ import org.apache.hive.spark.counter.Spa
  */
 class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
 
-  private final AtomicBoolean cancelled;
   private final SparkClientImpl client;
   private final String jobId;
   private final MetricsCollection metrics;
   private final Promise<T> promise;
   private final List<Integer> sparkJobIds;
+  private final List<Listener> listeners;
+  private volatile State state;
   private volatile SparkCounters sparkCounters;
 
   JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
-    this.cancelled = new AtomicBoolean();
     this.client = client;
     this.jobId = jobId;
     this.promise = promise;
+    this.listeners = Lists.newLinkedList();
     this.metrics = new MetricsCollection();
     this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+    this.state = State.SENT;
     this.sparkCounters = null;
   }
 
   /** Requests a running job to be cancelled. */
   @Override
   public boolean cancel(boolean mayInterrupt) {
-    if (cancelled.compareAndSet(false, true)) {
+    if (changeState(State.CANCELLED)) {
       client.cancel(jobId);
       promise.cancel(mayInterrupt);
       return true;
@@ -114,20 +117,116 @@ class JobHandleImpl<T extends Serializab
     return sparkCounters;
   }
 
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public void addListener(Listener l) {
+    synchronized (listeners) {
+      listeners.add(l);
+      // If current state is a final state, notify of Spark job IDs before notifying about the
+      // state transition.
+      if (state.ordinal() >= State.CANCELLED.ordinal()) {
+        for (Integer i : sparkJobIds) {
+          l.onSparkJobStarted(this, i);
+        }
+      }
+
+      fireStateChange(state, l);
+
+      // Otherwise, notify about Spark jobs after the state notification.
+      if (state.ordinal() < State.CANCELLED.ordinal()) {
+        for (Integer i : sparkJobIds) {
+          l.onSparkJobStarted(this, i);
+        }
+      }
+    }
+  }
+
   public void setSparkCounters(SparkCounters sparkCounters) {
     this.sparkCounters = sparkCounters;
   }
 
   @SuppressWarnings("unchecked")
   void setSuccess(Object result) {
-    promise.setSuccess((T) result);
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      promise.setSuccess((T) result);
+      changeState(State.SUCCEEDED);
+    }
   }
 
   void setFailure(Throwable error) {
-    promise.setFailure(error);
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      promise.setFailure(error);
+      changeState(State.FAILED);
+    }
+  }
+
+  /**
+   * Changes the state of this job handle, making sure that illegal state transitions are ignored.
+   * Fires events appropriately.
+   *
+   * As a rule, state transitions can only occur if the new state is "higher" than the current
+   * state (i.e., has a higher ordinal number) and the current state is not a "final" state.
+   * "Final" states are CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an
+   * ordinal number greater than or equal to that of the CANCELLED enum constant.
+   */
+  boolean changeState(State newState) {
+    synchronized (listeners) {
+      if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
+        state = newState;
+        for (Listener l : listeners) {
+          fireStateChange(newState, l);
+        }
+        return true;
+      }
+      return false;
+    }
+  }
+
+  void addSparkJobId(int sparkJobId) {
+    synchronized (listeners) {
+      sparkJobIds.add(sparkJobId);
+      for (Listener l : listeners) {
+        l.onSparkJobStarted(this, sparkJobId);
+      }
+    }
+  }
+
+  private void fireStateChange(State s, Listener l) {
+    switch (s) {
+    case SENT:
+      break;
+    case QUEUED:
+      l.onJobQueued(this);
+      break;
+    case STARTED:
+      l.onJobStarted(this);
+      break;
+    case CANCELLED:
+      l.onJobCancelled(this);
+      break;
+    case FAILED:
+      l.onJobFailed(this, promise.cause());
+      break;
+    case SUCCEEDED:
+      try {
+        l.onJobSucceeded(this, promise.get());
+      } catch (Exception e) {
+        // Shouldn't really happen.
+        throw new IllegalStateException(e);
+      }
+      break;
+    default:
+      throw new IllegalStateException();
+    }
   }
 
-  /** Last attempt resort at preventing stray jobs from accumulating in SparkClientImpl. */
+  /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
   @Override
   protected void finalize() {
     if (!isDone()) {

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Tue Jan 20 19:21:31 2015
@@ -245,6 +245,10 @@ public class RemoteDriver {
       clientRpc.call(new JobResult(jobId, result, error, counters));
     }
 
+    void jobStarted(String jobId) {
+      clientRpc.call(new JobStarted(jobId));
+    }
+
     void jobSubmitted(String jobId, int sparkJobId) {
       LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
       clientRpc.call(new JobSubmitted(jobId, sparkJobId));
@@ -325,6 +329,8 @@ public class RemoteDriver {
 
     @Override
     public Void call() throws Exception {
+      protocol.jobStarted(req.id);
+
       try {
         jc.setMonitorCb(new MonitorCallback() {
           @Override

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Jan 20 19:21:31 2015
@@ -382,7 +382,7 @@ class SparkClientImpl implements SparkCl
     <T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
       final String jobId = UUID.randomUUID().toString();
       final Promise<T> promise = driverRpc.createPromise();
-      JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
+      final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
       jobs.put(jobId, handle);
 
       final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
@@ -393,7 +393,9 @@ class SparkClientImpl implements SparkCl
       rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
         @Override
         public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
-          if (!f.isSuccess() && !promise.isDone()) {
+          if (f.isSuccess()) {
+            handle.changeState(JobHandle.State.QUEUED);
+          } else if (!promise.isDone()) {
             promise.setFailure(f.cause());
           }
         }
@@ -456,11 +458,20 @@ class SparkClientImpl implements SparkCl
       }
     }
 
+    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.id);
+      if (handle != null) {
+        handle.changeState(JobHandle.State.STARTED);
+      } else {
+        LOG.warn("Received event for unknown job {}", msg.id);
+      }
+    }
+
     private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
       JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
       if (handle != null) {
         LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
-        handle.getSparkJobIds().add(msg.sparkJobId);
+        handle.addSparkJobId(msg.sparkJobId);
       } else {
         LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
       }

Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java?rev=1653341&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java Tue Jan 20 19:21:31 2015
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestJobHandle {
+
+  @Mock private SparkClientImpl client;
+  @Mock private Promise<Serializable> promise;
+  @Mock private JobHandle.Listener<Serializable> listener;
+  @Mock private JobHandle.Listener<Serializable> listener2;
+
+  @Test
+  public void testStateChanges() throws Exception {
+    JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job");
+    handle.addListener(listener);
+
+    assertTrue(handle.changeState(JobHandle.State.QUEUED));
+    verify(listener).onJobQueued(handle);
+
+    assertTrue(handle.changeState(JobHandle.State.STARTED));
+    verify(listener).onJobStarted(handle);
+
+    handle.addSparkJobId(1);
+    verify(listener).onSparkJobStarted(same(handle), eq(1));
+
+    assertTrue(handle.changeState(JobHandle.State.CANCELLED));
+    verify(listener).onJobCancelled(handle);
+
+    assertFalse(handle.changeState(JobHandle.State.STARTED));
+    assertFalse(handle.changeState(JobHandle.State.FAILED));
+    assertFalse(handle.changeState(JobHandle.State.SUCCEEDED));
+  }
+
+  @Test
+  public void testFailedJob() throws Exception {
+    JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job");
+    handle.addListener(listener);
+
+    Throwable cause = new Exception();
+    when(promise.cause()).thenReturn(cause);
+
+    assertTrue(handle.changeState(JobHandle.State.FAILED));
+    verify(promise).cause();
+    verify(listener).onJobFailed(handle, cause);
+  }
+
+  @Test
+  public void testSucceededJob() throws Exception {
+    JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job");
+    handle.addListener(listener);
+
+    Serializable result = new Exception();
+    when(promise.get()).thenReturn(result);
+
+    assertTrue(handle.changeState(JobHandle.State.SUCCEEDED));
+    verify(promise).get();
+    verify(listener).onJobSucceeded(handle, result);
+  }
+
+  @Test
+  public void testImmediateCallback() throws Exception {
+    JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job");
+    assertTrue(handle.changeState(JobHandle.State.QUEUED));
+    handle.addListener(listener);
+    verify(listener).onJobQueued(handle);
+
+    handle.changeState(JobHandle.State.STARTED);
+    handle.addSparkJobId(1);
+    handle.changeState(JobHandle.State.CANCELLED);
+
+    handle.addListener(listener2);
+    InOrder inOrder = inOrder(listener2);
+    inOrder.verify(listener2).onSparkJobStarted(same(handle), eq(1));
+    inOrder.verify(listener2).onJobCancelled(same(handle));
+  }
+
+}

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Tue Jan 20 19:21:31 2015
@@ -17,15 +17,11 @@
 
 package org.apache.hive.spark.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,17 +32,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
 import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.SparkException;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.junit.Test;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestSparkClient {
 
@@ -79,8 +77,19 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
+        JobHandle.Listener<String> listener = newListener();
         JobHandle<String> handle = client.submit(new SimpleJob());
+        handle.addListener(listener);
         assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobQueued(handle);
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
       }
     });
   }
@@ -101,12 +110,25 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
-      JobHandle<String> handle = client.submit(new SimpleJob());
+        JobHandle.Listener<String> listener = newListener();
+        JobHandle<String> handle = client.submit(new ErrorJob());
+        handle.addListener(listener);
         try {
           handle.get(TIMEOUT, TimeUnit.SECONDS);
+          fail("Should have thrown an exception.");
         } catch (ExecutionException ee) {
-          assertTrue(ee.getCause() instanceof IllegalStateException);
+          assertTrue(ee.getCause() instanceof SparkException);
+          assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
         }
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobQueued(handle);
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobFailed(same(handle), any(Throwable.class));
       }
     });
   }
@@ -138,18 +160,26 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
+        JobHandle.Listener<Integer> listener = newListener();
         JobHandle<Integer> future = client.submit(new AsyncSparkJob());
+        future.addListener(listener);
         future.get(TIMEOUT, TimeUnit.SECONDS);
         MetricsCollection metrics = future.getMetrics();
         assertEquals(1, metrics.getJobIds().size());
         assertTrue(metrics.getAllMetrics().executorRunTime > 0L);
+        verify(listener).onSparkJobStarted(same(future),
+          eq(metrics.getJobIds().iterator().next()));
 
+        JobHandle.Listener<Integer> listener2 = newListener();
         JobHandle<Integer> future2 = client.submit(new AsyncSparkJob());
+        future2.addListener(listener2);
         future2.get(TIMEOUT, TimeUnit.SECONDS);
         MetricsCollection metrics2 = future2.getMetrics();
         assertEquals(1, metrics2.getJobIds().size());
         assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds()));
         assertTrue(metrics2.getAllMetrics().executorRunTime > 0L);
+        verify(listener2).onSparkJobStarted(same(future2),
+          eq(metrics2.getJobIds().iterator().next()));
       }
     });
   }
@@ -226,6 +256,13 @@ public class TestSparkClient {
     });
   }
 
+  private <T extends Serializable> JobHandle.Listener<T> newListener() {
+    @SuppressWarnings("unchecked")
+    JobHandle.Listener<T> listener =
+      (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+    return listener;
+  }
+
   private void runTest(boolean local, TestFunction test) throws Exception {
     Map<String, String> conf = createConf(local);
     SparkClientFactory.initialize(conf);
@@ -250,6 +287,15 @@ public class TestSparkClient {
     }
 
   }
+
+  private static class ErrorJob implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      throw new IllegalStateException("Hello");
+    }
+
+  }
 
   private static class SparkJob implements Job<Long> {