Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/15 15:18:28 UTC
svn commit: r1652105 - in /hive/branches/spark:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/
spark-client/src/main/java/org/apache/hive/spark/client/
spark-client/src/test/java/org/apache/hive/spark/client/
Author: xuefu
Date: Thu Jan 15 14:18:27 2015
New Revision: 1652105
URL: http://svn.apache.org/r1652105
Log:
HIVE-9178: Create a separate API for remote Spark Context RPC other than job submission [Spark Branch] (Marcelo via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
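In short, the change adds a second way to send work to the remote Spark context: submit() queues a job on the remote context's worker threads and returns a JobHandle, while the new run() executes the job synchronously in the thread that processes the RPC message and returns a plain Future. A minimal caller-side sketch, assuming an already-created SparkClient named "client" (EchoJob is hypothetical and simply mirrors the SyncRpc job added to TestSparkClient below):

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    // Hypothetical trivial job, equivalent to the SyncRpc job in the new test.
    class EchoJob implements Job<String> {
      @Override
      public String call(JobContext jc) {
        return "Hello";
      }
    }

    // Sketch only; assumes a SparkClient created elsewhere.
    String demo(SparkClient client) throws Exception {
      // submit(): asynchronous; queued by the remote context, monitorable via JobHandle.
      JobHandle<String> handle = client.submit(new EchoJob());

      // run(): synchronous RPC; runs in the RPC thread, JobContext.monitor() unavailable.
      Future<String> result = client.run(new EchoJob());
      return result.get(60, TimeUnit.SECONDS);
    }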
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan 15 14:18:27 2015
@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -145,7 +146,7 @@ public class RemoteSparkJobStatus implem
return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
}
}
- JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
+ Future<SparkJobInfo> getJobInfo = sparkClient.run(
new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
try {
return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
@@ -156,7 +157,7 @@ public class RemoteSparkJobStatus implem
}
private SparkStageInfo getSparkStageInfo(int stageId) {
- JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
+ Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
try {
return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
} catch (Throwable t) {
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Thu Jan 15 14:18:27 2015
@@ -138,4 +138,18 @@ abstract class BaseProtocol extends RpcD
}
}
+ protected static class SyncJobRequest<T extends Serializable> implements Serializable {
+
+ final Job<T> job;
+
+ SyncJobRequest(Job<T> job) {
+ this.job = job;
+ }
+
+ SyncJobRequest() {
+ this(null);
+ }
+
+ }
+
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Thu Jan 15 14:18:27 2015
@@ -77,9 +77,9 @@ public class RemoteDriver {
private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
private final Map<String, JobWrapper<?>> activeJobs;
+ private final Object jcLock;
private final Object shutdownLock;
private final ExecutorService executor;
- private final JobContextImpl jc;
private final NioEventLoopGroup egroup;
private final Rpc clientRpc;
private final DriverProtocol protocol;
@@ -87,10 +87,14 @@ public class RemoteDriver {
// Used to queue up requests while the SparkContext is being created.
private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
- private boolean running;
+ // jc is effectively final, but it has to be volatile since it's accessed by different
+ // threads while the constructor is running.
+ private volatile JobContextImpl jc;
+ private volatile boolean running;
private RemoteDriver(String[] args) throws Exception {
this.activeJobs = Maps.newConcurrentMap();
+ this.jcLock = new Object();
this.shutdownLock = new Object();
SparkConf conf = new SparkConf();
@@ -150,14 +154,20 @@ public class RemoteDriver {
try {
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(new ClientListener());
- jc = new JobContextImpl(sc);
+ synchronized (jcLock) {
+ jc = new JobContextImpl(sc);
+ jcLock.notifyAll();
+ }
} catch (Exception e) {
LOG.error("Failed to start SparkContext.", e);
shutdown(e);
+ synchronized (jcLock) {
+ jcLock.notifyAll();
+ }
throw e;
}
- synchronized (jobQueue) {
+ synchronized (jcLock) {
for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
it.next().submit();
}
@@ -174,7 +184,7 @@ public class RemoteDriver {
}
private void submit(JobWrapper<?> job) {
- synchronized (jobQueue) {
+ synchronized (jcLock) {
if (jc != null) {
job.submit();
} else {
@@ -264,6 +274,35 @@ public class RemoteDriver {
submit(wrapper);
}
+ private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exception {
+ // In case the job context is not up yet, let's wait, since this is supposed to be a
+ // "synchronous" RPC.
+ if (jc == null) {
+ synchronized (jcLock) {
+ while (jc == null) {
+ jcLock.wait();
+ if (!running) {
+ throw new IllegalStateException("Remote context is shutting down.");
+ }
+ }
+ }
+ }
+
+ jc.setMonitorCb(new MonitorCallback() {
+ @Override
+ public void call(JavaFutureAction<?> future,
+ SparkCounters sparkCounters, Set<Integer> cachedRDDIds) {
+ throw new IllegalStateException(
+ "JobContext.monitor() is not available for synchronous jobs.");
+ }
+ });
+ try {
+ return msg.job.call(jc);
+ } finally {
+ jc.setMonitorCb(null);
+ }
+ }
+
}
private class JobWrapper<T extends Serializable> implements Callable<Void> {
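The new jcLock coordinates two threads: the startup thread that creates the JobContext, and the RPC thread handling a SyncJobRequest, which must block until the context exists. Condensed from the hunks above, with the class scaffolding omitted, the handshake looks like this:

    // Startup thread: publish jc and wake any waiters; the failure path also calls
    // notifyAll() so waiters can observe running == false and bail out.
    synchronized (jcLock) {
      jc = new JobContextImpl(sc);
      jcLock.notifyAll();
    }

    // RPC thread (handle() for SyncJobRequest): wait until jc is published or shutdown begins.
    synchronized (jcLock) {
      while (jc == null) {
        jcLock.wait();
        if (!running) {
          throw new IllegalStateException("Remote context is shutting down.");
        }
      }
    }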
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Thu Jan 15 14:18:27 2015
@@ -38,6 +38,23 @@ public interface SparkClient extends Ser
<T extends Serializable> JobHandle<T> submit(Job<T> job);
/**
+ * Asks the remote context to run a job immediately.
+ * <p/>
+ * Normally, the remote context will queue jobs and execute them based on how many worker
+ * threads have been configured. This method will run the submitted job in the same thread
+ * processing the RPC message, so that queueing does not apply.
+ * <p/>
+ * It's recommended that this method only be used to run code that finishes quickly. This
+ * avoids interfering with the normal operation of the context.
+ * <p/>
+ * Note: the {@link JobContext#monitor()} functionality is not available when using this method.
+ *
+ * @param job The job to execute.
+ * @return A future to monitor the result of the job.
+ */
+ <T extends Serializable> Future<T> run(Job<T> job);
+
+ /**
* Stops the remote context.
*
* Any pending jobs will be cancelled, and the remote context will be torn down.
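The recommendation above matches how the patch itself uses the method: RemoteSparkJobStatus switches its quick job and stage metadata lookups from submit() to run(), so they are answered directly by the RPC thread instead of queueing behind long-running submitted jobs. Simplified from the first hunk (sparkClientTimeoutInSeconds is an existing field of that class):

    // From RemoteSparkJobStatus.getSparkStageInfo(): a fast, bounded lookup fits the
    // synchronous run() path; the real code wraps the get() in a try/catch.
    Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
    SparkStageInfo info = getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);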
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Thu Jan 15 14:18:27 2015
@@ -116,6 +116,11 @@ class SparkClientImpl implements SparkCl
}
@Override
+ public <T extends Serializable> Future<T> run(Job<T> job) {
+ return protocol.run(job);
+ }
+
+ @Override
public void stop() {
if (isAlive) {
isAlive = false;
@@ -144,22 +149,22 @@ class SparkClientImpl implements SparkCl
@Override
public Future<?> addJar(URL url) {
- return submit(new AddJarJob(url.toString()));
+ return run(new AddJarJob(url.toString()));
}
@Override
public Future<?> addFile(URL url) {
- return submit(new AddFileJob(url.toString()));
+ return run(new AddFileJob(url.toString()));
}
@Override
public Future<Integer> getExecutorCount() {
- return submit(new GetExecutorCountJob());
+ return run(new GetExecutorCountJob());
}
@Override
public Future<Integer> getDefaultParallelism() {
- return submit(new GetDefaultParallelismJob());
+ return run(new GetDefaultParallelismJob());
}
void cancel(String jobId) {
@@ -379,16 +384,24 @@ class SparkClientImpl implements SparkCl
promise.addListener(new GenericFutureListener<Promise<T>>() {
@Override
public void operationComplete(Promise<T> p) {
- jobs.remove(jobId);
+ if (jobId != null) {
+ jobs.remove(jobId);
+ }
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
}
});
-
return handle;
}
+ <T extends Serializable> Future<T> run(Job<T> job) {
+ @SuppressWarnings("unchecked")
+ final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+ driverRpc.call(new SyncJobRequest(job), Serializable.class);
+ return rpc;
+ }
+
void cancel(String jobId) {
driverRpc.call(new CancelJob(jobId));
}
Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Thu Jan 15 14:18:27 2015
@@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@@ -111,6 +112,17 @@ public class TestSparkClient {
}
@Test
+ public void testSyncRpc() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ Future<String> result = client.run(new SyncRpc());
+ assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ @Test
public void testRemoteClient() throws Exception {
runTest(false, new TestFunction() {
@Override
@@ -332,6 +344,15 @@ public class TestSparkClient {
}
}
+
+ private static class SyncRpc implements Job<String> {
+
+ @Override
+ public String call(JobContext jc) {
+ return "Hello";
+ }
+
+ }
private abstract static class TestFunction {
abstract void call(SparkClient client) throws Exception;