You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/15 15:18:28 UTC

svn commit: r1652105 - in /hive/branches/spark: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/ spark-client/src/main/java/org/apache/hive/spark/client/ spark-client/src/test/java/org/apache/hive/spark/client/

Author: xuefu
Date: Thu Jan 15 14:18:27 2015
New Revision: 1652105

URL: http://svn.apache.org/r1652105
Log:
HIVE-9178: Create a separate API for remote Spark Context RPC other than job submission [Spark Branch] (Marcelo via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan 15 14:18:27 2015
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -145,7 +146,7 @@ public class RemoteSparkJobStatus implem
         return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
       }
     }
-    JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
+    Future<SparkJobInfo> getJobInfo = sparkClient.run(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
     try {
       return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
@@ -156,7 +157,7 @@ public class RemoteSparkJobStatus implem
   }
 
   private SparkStageInfo getSparkStageInfo(int stageId) {
-    JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
+    Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
     try {
       return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Throwable t) {

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Thu Jan 15 14:18:27 2015
@@ -138,4 +138,18 @@ abstract class BaseProtocol extends RpcD
     }
   }
 
+  protected static class SyncJobRequest<T extends Serializable> implements Serializable {
+
+    final Job<T> job;
+
+    SyncJobRequest(Job<T> job) {
+      this.job = job;
+    }
+
+    SyncJobRequest() {
+      this(null);
+    }
+
+  }
+
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Thu Jan 15 14:18:27 2015
@@ -77,9 +77,9 @@ public class RemoteDriver {
   private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
 
   private final Map<String, JobWrapper<?>> activeJobs;
+  private final Object jcLock;
   private final Object shutdownLock;
   private final ExecutorService executor;
-  private final JobContextImpl jc;
   private final NioEventLoopGroup egroup;
   private final Rpc clientRpc;
   private final DriverProtocol protocol;
@@ -87,10 +87,14 @@ public class RemoteDriver {
   // Used to queue up requests while the SparkContext is being created.
   private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
 
-  private boolean running;
+  // jc is effectively final, but it has to be volatile since it's accessed by different
+  // threads while the constructor is running.
+  private volatile JobContextImpl jc;
+  private volatile boolean running;
 
   private RemoteDriver(String[] args) throws Exception {
     this.activeJobs = Maps.newConcurrentMap();
+    this.jcLock = new Object();
     this.shutdownLock = new Object();
 
     SparkConf conf = new SparkConf();
@@ -150,14 +154,20 @@ public class RemoteDriver {
     try {
       JavaSparkContext sc = new JavaSparkContext(conf);
       sc.sc().addSparkListener(new ClientListener());
-      jc = new JobContextImpl(sc);
+      synchronized (jcLock) {
+        jc = new JobContextImpl(sc);
+        jcLock.notifyAll();
+      }
     } catch (Exception e) {
       LOG.error("Failed to start SparkContext.", e);
       shutdown(e);
+      synchronized (jcLock) {
+        jcLock.notifyAll();
+      }
       throw e;
     }
 
-    synchronized (jobQueue) {
+    synchronized (jcLock) {
       for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
         it.next().submit();
       }
@@ -174,7 +184,7 @@ public class RemoteDriver {
   }
 
   private void submit(JobWrapper<?> job) {
-    synchronized (jobQueue) {
+    synchronized (jcLock) {
       if (jc != null) {
         job.submit();
       } else {
@@ -264,6 +274,35 @@ public class RemoteDriver {
       submit(wrapper);
     }
 
+    private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exception {
+      // In case the job context is not up yet, let's wait, since this is supposed to be a
+      // "synchronous" RPC.
+      if (jc == null) {
+        synchronized (jcLock) {
+          while (jc == null) {
+            jcLock.wait();
+            if (!running) {
+              throw new IllegalStateException("Remote context is shutting down.");
+            }
+          }
+        }
+      }
+
+      jc.setMonitorCb(new MonitorCallback() {
+        @Override
+        public void call(JavaFutureAction<?> future,
+            SparkCounters sparkCounters, Set<Integer> cachedRDDIds) {
+          throw new IllegalStateException(
+            "JobContext.monitor() is not available for synchronous jobs.");
+        }
+      });
+      try {
+        return msg.job.call(jc);
+      } finally {
+        jc.setMonitorCb(null);
+      }
+    }
+
   }
 
   private class JobWrapper<T extends Serializable> implements Callable<Void> {

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Thu Jan 15 14:18:27 2015
@@ -38,6 +38,23 @@ public interface SparkClient extends Ser
   <T extends Serializable> JobHandle<T> submit(Job<T> job);
 
   /**
+   * Asks the remote context to run a job immediately.
+   * <p>
+   * Normally, the remote context will queue jobs and execute them based on how many worker
+   * threads have been configured. This method will run the submitted job in the same thread
+   * processing the RPC message, so that queueing does not apply.
+   * <p>
+   * It's recommended that this method only be used to run code that finishes quickly. This
+   * avoids interfering with the normal operation of the context.
+   * <p>
+   * Note: the {@link JobContext#monitor()} functionality is not available when using this method.
+   *
+   * @param job The job to execute.
+   * @return A future to monitor the result of the job.
+   */
+  <T extends Serializable> Future<T> run(Job<T> job);
+
+  /**
    * Stops the remote context.
    *
    * Any pending jobs will be cancelled, and the remote context will be torn down.

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Thu Jan 15 14:18:27 2015
@@ -116,6 +116,11 @@ class SparkClientImpl implements SparkCl
   }
 
   @Override
+  public <T extends Serializable> Future<T> run(Job<T> job) {
+    return protocol.run(job);
+  }
+
+  @Override
   public void stop() {
     if (isAlive) {
       isAlive = false;
@@ -144,22 +149,22 @@ class SparkClientImpl implements SparkCl
 
   @Override
   public Future<?> addJar(URL url) {
-    return submit(new AddJarJob(url.toString()));
+    return run(new AddJarJob(url.toString()));
   }
 
   @Override
   public Future<?> addFile(URL url) {
-    return submit(new AddFileJob(url.toString()));
+    return run(new AddFileJob(url.toString()));
   }
 
   @Override
   public Future<Integer> getExecutorCount() {
-    return submit(new GetExecutorCountJob());
+    return run(new GetExecutorCountJob());
   }
 
   @Override
   public Future<Integer> getDefaultParallelism() {
-    return submit(new GetDefaultParallelismJob());
+    return run(new GetDefaultParallelismJob());
   }
 
   void cancel(String jobId) {
@@ -379,16 +384,24 @@ class SparkClientImpl implements SparkCl
       promise.addListener(new GenericFutureListener<Promise<T>>() {
         @Override
         public void operationComplete(Promise<T> p) {
-          jobs.remove(jobId);
+          if (jobId != null) {
+            jobs.remove(jobId);
+          }
           if (p.isCancelled() && !rpc.isDone()) {
             rpc.cancel(true);
           }
         }
       });
-
       return handle;
     }
 
+    <T extends Serializable> Future<T> run(Job<T> job) {
+      @SuppressWarnings("unchecked")
+      final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+        driverRpc.call(new SyncJobRequest(job), Serializable.class);
+      return rpc;
+    }
+
     void cancel(String jobId) {
       driverRpc.call(new CancelJob(jobId));
     }

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1652105&r1=1652104&r2=1652105&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Thu Jan 15 14:18:27 2015
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
@@ -111,6 +112,17 @@ public class TestSparkClient {
   }
 
   @Test
+  public void testSyncRpc() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        Future<String> result = client.run(new SyncRpc());
+        assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
+      }
+    });
+  }
+
+  @Test
   public void testRemoteClient() throws Exception {
     runTest(false, new TestFunction() {
       @Override
@@ -332,6 +344,15 @@ public class TestSparkClient {
     }
 
   }
+
+  private static class SyncRpc implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      return "Hello";
+    }
+
+  }
 
   private abstract static class TestFunction {
     abstract void call(SparkClient client) throws Exception;