You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/20 20:21:32 UTC
svn commit: r1653341 - in /hive/branches/spark/spark-client: ./
src/main/java/org/apache/hive/spark/client/
src/test/java/org/apache/hive/spark/client/
Author: xuefu
Date: Tue Jan 20 19:21:31 2015
New Revision: 1653341
URL: http://svn.apache.org/r1653341
Log:
HIVE-9179: Add listeners on JobHandle so job status change can be notified to the client [Spark Branch] (Marcelo via Xuefu)
Added:
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
Modified:
hive/branches/spark/spark-client/pom.xml
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
Modified: hive/branches/spark/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/pom.xml?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/pom.xml (original)
+++ hive/branches/spark/spark-client/pom.xml Tue Jan 20 19:21:31 2015
@@ -65,6 +65,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Tue Jan 20 19:21:31 2015
@@ -121,6 +121,20 @@ abstract class BaseProtocol extends RpcD
}
+ /**
+  * Protocol message that carries the id of a client job. The remote driver sends it to the
+  * client right before it begins running that job.
+  */
+ protected static class JobStarted implements Serializable {
+
+   /** Id of the client job this message refers to; null when built via the no-arg form. */
+   final String id;
+
+   // No-arg form; presumably required by the RPC serializer -- TODO confirm.
+   JobStarted() {
+     this(null);
+   }
+
+   JobStarted(String id) {
+     this.id = id;
+   }
+
+ }
+
/**
* Inform the client that a new spark job has been submitted for the client job.
*/
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Tue Jan 20 19:21:31 2015
@@ -55,4 +55,53 @@ public interface JobHandle<T extends Ser
*/
SparkCounters getSparkCounters();
+ /**
+ * Return the current {@link State} of the job.
+ */
+ State getState();
+
+ /**
+ * Add a listener to the job handle. If the job's state is not SENT, the callback for the
+ * current state is invoked immediately, along with one callback per known Spark job ID.
+ *
+ * @param l The listener to add.
+ */
+ void addListener(Listener<T> l);
+
+ /**
+  * Lifecycle of a submitted job.
+  *
+  * Declaration order is significant: JobHandleImpl compares ordinals to reject backward
+  * transitions and treats CANCELLED and every constant declared after it as final.
+  */
+ enum State {
+   /** Initial state of a newly created handle. */
+   SENT,
+   /** The job request was delivered to the remote context. */
+   QUEUED,
+   /** The remote context reported that the job began running. */
+   STARTED,
+   /** The job was cancelled through the handle. Final. */
+   CANCELLED,
+   /** The job ended with an error. Final. */
+   FAILED,
+   /** The job ended with a result. Final. */
+   SUCCEEDED
+ }
+
+ /**
+  * Observer of a job running in the remote context. Each state callback fires when the handle
+  * moves into the corresponding {@link State}.
+  */
+ interface Listener<T extends Serializable> {
+
+   /** The job moved to {@link State#QUEUED}. */
+   void onJobQueued(JobHandle<T> job);
+
+   /** The job moved to {@link State#STARTED}. */
+   void onJobStarted(JobHandle<T> job);
+
+   /** The job moved to {@link State#CANCELLED}. */
+   void onJobCancelled(JobHandle<T> job);
+
+   /**
+    * The job moved to {@link State#FAILED}.
+    *
+    * @param cause The error reported for the job.
+    */
+   void onJobFailed(JobHandle<T> job, Throwable cause);
+
+   /**
+    * The job moved to {@link State#SUCCEEDED}.
+    *
+    * @param result The value produced by the job.
+    */
+   void onJobSucceeded(JobHandle<T> job, T result);
+
+   /**
+    * Called when a monitored Spark job is started on the remote context. This callback
+    * does not indicate a state change in the client job's status.
+    */
+   void onSparkJobStarted(JobHandle<T> job, int sparkJobId);
+
+ }
+
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Tue Jan 20 19:21:31 2015
@@ -17,15 +17,16 @@
package org.apache.hive.spark.client;
-import io.netty.util.concurrent.Promise;
-
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Promise;
import org.apache.hive.spark.counter.SparkCounters;
@@ -34,28 +35,30 @@ import org.apache.hive.spark.counter.Spa
*/
class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
- private final AtomicBoolean cancelled;
private final SparkClientImpl client;
private final String jobId;
private final MetricsCollection metrics;
private final Promise<T> promise;
private final List<Integer> sparkJobIds;
+ private final List<Listener> listeners;
+ private volatile State state;
private volatile SparkCounters sparkCounters;
JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
- this.cancelled = new AtomicBoolean();
this.client = client;
this.jobId = jobId;
this.promise = promise;
+ this.listeners = Lists.newLinkedList();
this.metrics = new MetricsCollection();
this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+ this.state = State.SENT;
this.sparkCounters = null;
}
/** Requests a running job to be cancelled. */
@Override
public boolean cancel(boolean mayInterrupt) {
- if (cancelled.compareAndSet(false, true)) {
+ if (changeState(State.CANCELLED)) {
client.cancel(jobId);
promise.cancel(mayInterrupt);
return true;
@@ -114,20 +117,116 @@ class JobHandleImpl<T extends Serializab
return sparkCounters;
}
+ @Override
+ public State getState() {
+ return state; // plain read of the volatile field; no lock is taken
+ }
+
+ /**
+  * Registers a listener and immediately replays the handle's current status to it: the
+  * callback for the current state (none for SENT) plus one onSparkJobStarted() call per Spark
+  * job ID recorded so far.
+  *
+  * When the current state is final (CANCELLED, FAILED or SUCCEEDED) the Spark job IDs are
+  * replayed before the state callback; otherwise they are replayed after it. The whole
+  * operation runs under the listeners lock, which changeState() and addSparkJobId() also take.
+  *
+  * @param l The listener to add; must not be null.
+  * @throws NullPointerException if l is null.
+  */
+ @Override
+ public void addListener(Listener<T> l) {
+   // Fail fast: a null stored in the list would break every later notification loop.
+   if (l == null) {
+     throw new NullPointerException("listener");
+   }
+   synchronized (listeners) {
+     listeners.add(l);
+     State current = state;
+     if (current.ordinal() >= State.CANCELLED.ordinal()) {
+       // Final state: report the Spark jobs first so the state callback comes last.
+       replaySparkJobIds(l);
+       fireStateChange(current, l);
+     } else {
+       fireStateChange(current, l);
+       replaySparkJobIds(l);
+     }
+   }
+ }
+
+ /** Notifies the listener of every Spark job ID recorded so far, in insertion order. */
+ private void replaySparkJobIds(Listener<T> l) {
+   for (Integer id : sparkJobIds) {
+     l.onSparkJobStarted(this, id);
+   }
+ }
+
public void setSparkCounters(SparkCounters sparkCounters) {
this.sparkCounters = sparkCounters;
}
@SuppressWarnings("unchecked")
void setSuccess(Object result) {
- promise.setSuccess((T) result);
+ // The synchronization here is not necessary, but tests depend on it.
+ synchronized (listeners) {
+ promise.setSuccess((T) result);
+ changeState(State.SUCCEEDED);
+ }
}
void setFailure(Throwable error) {
- promise.setFailure(error);
+ // The synchronization here is not necessary, but tests depend on it.
+ synchronized (listeners) {
+ promise.setFailure(error);
+ changeState(State.FAILED);
+ }
+ }
+
+ /**
+ * Changes the state of this job handle, ignoring illegal transitions and firing events for
+ * legal ones.
+ *
+ * A transition only occurs if the new state is "higher" than the current state (i.e., has a
+ * higher ordinal number) and the current state is not "final". "Final" states are CANCELLED,
+ * FAILED and SUCCEEDED, i.e. those whose ordinal is at or above that of CANCELLED.
+ * Returns true if the transition was applied, false if it was ignored.
+ */
+ boolean changeState(State newState) {
+ synchronized (listeners) {
+ if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
+ state = newState;
+ for (Listener l : listeners) {
+ fireStateChange(newState, l);
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ void addSparkJobId(int sparkJobId) { // records the ID, then notifies every registered listener
+ synchronized (listeners) { // same lock addListener() holds while replaying known IDs
+ sparkJobIds.add(sparkJobId);
+ for (Listener l : listeners) {
+ l.onSparkJobStarted(this, sparkJobId);
+ }
+ }
+ }
+
+ /**
+  * Invokes the listener callback that corresponds to the given state. SENT has no callback.
+  * For FAILED the cause is read from the promise; for SUCCEEDED the result is read from it.
+  *
+  * @param s The state to report.
+  * @param l The listener to notify.
+  * @throws IllegalStateException if the SUCCEEDED callback or the promise lookup throws, or if
+  *         the state is not one of the known constants.
+  */
+ private void fireStateChange(State s, Listener l) {
+   switch (s) {
+   case SENT:
+     // Initial state: nothing to report.
+     break;
+   case QUEUED:
+     l.onJobQueued(this);
+     break;
+   case STARTED:
+     l.onJobStarted(this);
+     break;
+   case CANCELLED:
+     l.onJobCancelled(this);
+     break;
+   case FAILED:
+     l.onJobFailed(this, promise.cause());
+     break;
+   case SUCCEEDED:
+     try {
+       l.onJobSucceeded(this, promise.get());
+     } catch (InterruptedException ie) {
+       // Restore the interrupt flag so code further up the stack can still observe it.
+       Thread.currentThread().interrupt();
+       throw new IllegalStateException(ie);
+     } catch (Exception e) {
+       // Shouldn't really happen.
+       throw new IllegalStateException(e);
+     }
+     break;
+   default:
+     throw new IllegalStateException("Unknown job state: " + s);
+   }
}
- /** Last attempt resort at preventing stray jobs from accumulating in SparkClientImpl. */
+ /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
@Override
protected void finalize() {
if (!isDone()) {
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Tue Jan 20 19:21:31 2015
@@ -245,6 +245,10 @@ public class RemoteDriver {
clientRpc.call(new JobResult(jobId, result, error, counters));
}
+ /**
+  * Inform the client that the given client job is about to run.
+  *
+  * @param jobId Id of the client job.
+  */
+ void jobStarted(String jobId) {
+   // Logged at debug level like the other outgoing job notifications.
+   LOG.debug("Send job({}) started to Client.", jobId);
+   clientRpc.call(new JobStarted(jobId));
+ }
+
void jobSubmitted(String jobId, int sparkJobId) {
LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
clientRpc.call(new JobSubmitted(jobId, sparkJobId));
@@ -325,6 +329,8 @@ public class RemoteDriver {
@Override
public Void call() throws Exception {
+ protocol.jobStarted(req.id);
+
try {
jc.setMonitorCb(new MonitorCallback() {
@Override
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Jan 20 19:21:31 2015
@@ -382,7 +382,7 @@ class SparkClientImpl implements SparkCl
<T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
final String jobId = UUID.randomUUID().toString();
final Promise<T> promise = driverRpc.createPromise();
- JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
+ final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
jobs.put(jobId, handle);
final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
@@ -393,7 +393,9 @@ class SparkClientImpl implements SparkCl
rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
- if (!f.isSuccess() && !promise.isDone()) {
+ if (f.isSuccess()) {
+ handle.changeState(JobHandle.State.QUEUED);
+ } else if (!promise.isDone()) {
promise.setFailure(f.cause());
}
}
@@ -456,11 +458,20 @@ class SparkClientImpl implements SparkCl
}
}
+ /**
+  * Moves the job named by the message to STARTED. A message for an unknown job id is logged
+  * and otherwise ignored.
+  */
+ private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+   JobHandleImpl<?> startedJob = jobs.get(msg.id);
+   if (startedJob == null) {
+     LOG.warn("Received event for unknown job {}", msg.id);
+     return;
+   }
+   startedJob.changeState(JobHandle.State.STARTED);
+ }
+
private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
if (handle != null) {
LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
- handle.getSparkJobIds().add(msg.sparkJobId);
+ handle.addSparkJobId(msg.sparkJobId);
} else {
LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
}
Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java?rev=1653341&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java Tue Jan 20 19:21:31 2015
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.Serializable;
+
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+ /**
+  * Unit tests for the state transitions and listener notifications of {@link JobHandleImpl}.
+  * The client, the promise and the listeners are Mockito mocks.
+  */
+ @RunWith(MockitoJUnitRunner.class)
+ public class TestJobHandle {
+
+   @Mock private SparkClientImpl client;
+   @Mock private Promise<Serializable> promise;
+   @Mock private JobHandle.Listener<Serializable> listener;
+   @Mock private JobHandle.Listener<Serializable> listener2;
+
+   /** Walks a handle up to CANCELLED and checks that no further transition is accepted. */
+   @Test
+   public void testStateChanges() throws Exception {
+     JobHandleImpl<Serializable> job = newHandle();
+     job.addListener(listener);
+
+     assertTrue(job.changeState(JobHandle.State.QUEUED));
+     verify(listener).onJobQueued(job);
+
+     assertTrue(job.changeState(JobHandle.State.STARTED));
+     verify(listener).onJobStarted(job);
+
+     job.addSparkJobId(1);
+     verify(listener).onSparkJobStarted(same(job), eq(1));
+
+     assertTrue(job.changeState(JobHandle.State.CANCELLED));
+     verify(listener).onJobCancelled(job);
+
+     // CANCELLED is final: going backwards or to another final state must be refused.
+     JobHandle.State[] refused = {
+       JobHandle.State.STARTED, JobHandle.State.FAILED, JobHandle.State.SUCCEEDED
+     };
+     for (JobHandle.State next : refused) {
+       assertFalse(job.changeState(next));
+     }
+   }
+
+   /** A FAILED transition hands the promise's cause to the listener. */
+   @Test
+   public void testFailedJob() throws Exception {
+     JobHandleImpl<Serializable> job = newHandle();
+     job.addListener(listener);
+
+     Throwable error = new Exception();
+     when(promise.cause()).thenReturn(error);
+
+     assertTrue(job.changeState(JobHandle.State.FAILED));
+     verify(promise).cause();
+     verify(listener).onJobFailed(job, error);
+   }
+
+   /** A SUCCEEDED transition hands the promise's result to the listener. */
+   @Test
+   public void testSucceededJob() throws Exception {
+     JobHandleImpl<Serializable> job = newHandle();
+     job.addListener(listener);
+
+     Serializable value = new Exception();
+     when(promise.get()).thenReturn(value);
+
+     assertTrue(job.changeState(JobHandle.State.SUCCEEDED));
+     verify(promise).get();
+     verify(listener).onJobSucceeded(job, value);
+   }
+
+   /** Listeners added late are told about the current state (and Spark jobs) right away. */
+   @Test
+   public void testImmediateCallback() throws Exception {
+     JobHandleImpl<Serializable> job = newHandle();
+     assertTrue(job.changeState(JobHandle.State.QUEUED));
+     job.addListener(listener);
+     verify(listener).onJobQueued(job);
+
+     job.changeState(JobHandle.State.STARTED);
+     job.addSparkJobId(1);
+     job.changeState(JobHandle.State.CANCELLED);
+
+     // In a final state the Spark job IDs are expected before the state callback.
+     job.addListener(listener2);
+     InOrder ordered = inOrder(listener2);
+     ordered.verify(listener2).onSparkJobStarted(same(job), eq(1));
+     ordered.verify(listener2).onJobCancelled(same(job));
+   }
+
+   /** Builds a fresh handle around the mocked client and promise. */
+   private JobHandleImpl<Serializable> newHandle() {
+     return new JobHandleImpl<Serializable>(client, promise, "job");
+   }
+
+ }
Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1653341&r1=1653340&r2=1653341&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Tue Jan 20 19:21:31 2015
@@ -17,15 +17,11 @@
package org.apache.hive.spark.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
+import java.io.Serializable;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,17 +32,19 @@ import java.util.concurrent.TimeUnit;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.SparkException;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.junit.Test;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestSparkClient {
@@ -79,8 +77,19 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
+ JobHandle.Listener<String> listener = newListener();
JobHandle<String> handle = client.submit(new SimpleJob());
+ handle.addListener(listener);
assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobQueued(handle);
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
}
});
}
@@ -101,12 +110,25 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
- JobHandle<String> handle = client.submit(new SimpleJob());
+ JobHandle.Listener<String> listener = newListener();
+ JobHandle<String> handle = client.submit(new ErrorJob());
+ handle.addListener(listener);
try {
handle.get(TIMEOUT, TimeUnit.SECONDS);
+ fail("Should have thrown an exception.");
} catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof IllegalStateException);
+ assertTrue(ee.getCause() instanceof SparkException);
+ assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
}
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobQueued(handle);
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobFailed(same(handle), any(Throwable.class));
}
});
}
@@ -138,18 +160,26 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
+ JobHandle.Listener<Integer> listener = newListener();
JobHandle<Integer> future = client.submit(new AsyncSparkJob());
+ future.addListener(listener);
future.get(TIMEOUT, TimeUnit.SECONDS);
MetricsCollection metrics = future.getMetrics();
assertEquals(1, metrics.getJobIds().size());
assertTrue(metrics.getAllMetrics().executorRunTime > 0L);
+ verify(listener).onSparkJobStarted(same(future),
+ eq(metrics.getJobIds().iterator().next()));
+ JobHandle.Listener<Integer> listener2 = newListener();
JobHandle<Integer> future2 = client.submit(new AsyncSparkJob());
+ future2.addListener(listener2);
future2.get(TIMEOUT, TimeUnit.SECONDS);
MetricsCollection metrics2 = future2.getMetrics();
assertEquals(1, metrics2.getJobIds().size());
assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds()));
assertTrue(metrics2.getAllMetrics().executorRunTime > 0L);
+ verify(listener2).onSparkJobStarted(same(future2),
+ eq(metrics2.getJobIds().iterator().next()));
}
});
}
@@ -226,6 +256,13 @@ public class TestSparkClient {
});
}
+ /**
+  * Creates a Mockito mock of {@link JobHandle.Listener} typed for the caller's job result.
+  */
+ @SuppressWarnings("unchecked")
+ private <T extends Serializable> JobHandle.Listener<T> newListener() {
+   // mock() is given the raw class token, hence the unchecked cast.
+   return (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+ }
+
private void runTest(boolean local, TestFunction test) throws Exception {
Map<String, String> conf = createConf(local);
SparkClientFactory.initialize(conf);
@@ -250,6 +287,15 @@ public class TestSparkClient {
}
}
+
+ private static class ErrorJob implements Job<String> {
+ // Job that always fails; the job-error test asserts on the "Hello" message thrown below.
+ @Override
+ public String call(JobContext jc) {
+ throw new IllegalStateException("Hello");
+ }
+
+ }
private static class SparkJob implements Job<Long> {