You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/25 21:43:39 UTC

svn commit: r1641691 - in /hive/branches/spark: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/ spark-client/src/main/java/org/apache/hiv...

Author: brock
Date: Tue Nov 25 20:43:39 2014
New Revision: 1641691

URL: http://svn.apache.org/r1641691
Log:
HIVE-8855 - Automatic calculate reduce number for spark job [Spark Branch] (Jimmy Xiang via Brock)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -17,12 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.Closeable;
+import java.io.Serializable;
+
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-
-import java.io.Closeable;
-import java.io.Serializable;
+import org.apache.spark.SparkConf;
 
 public interface HiveSparkClient extends Serializable, Closeable {
   /**
@@ -34,4 +35,11 @@ public interface HiveSparkClient extends
    * @throws Exception
    */
   public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+
+  public SparkConf getSparkConf();
+
+  /**
+   * Get the count of executors
+   */
+  public int getExecutorCount() throws Exception;
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Tue Nov 25 20:43:39 2014
@@ -18,21 +18,20 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.commons.compress.utils.CharsetNames;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
 public class HiveSparkClientFactory {
   protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
 
@@ -112,7 +111,7 @@ public class HiveSparkClientFactory {
     return sparkConf;
   }
 
-  private static SparkConf generateSparkConf(Map<String, String> conf) {
+  static SparkConf generateSparkConf(Map<String, String> conf) {
     SparkConf sparkConf = new SparkConf(false);
     for (Map.Entry<String, String> entry : conf.entrySet()) {
       sparkConf.set(entry.getKey(), entry.getValue());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,15 +41,12 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 
 /**
  * LocalSparkClient submit Spark job in local driver, it's responsible for build spark client
@@ -71,30 +70,6 @@ public class LocalHiveSparkClient implem
     return client;
   }
 
-  /**
-   * Get Spark shuffle memory per task, and total number of cores. This
-   * information can be used to estimate how many reducers a task can have.
-   *
-   * @return a tuple, the first element is the shuffle memory per task in bytes,
-   *  the second element is the number of total cores usable by the client
-   */
-  public Tuple2<Long, Integer> getMemoryAndCores() {
-    SparkContext sparkContext = sc.sc();
-    SparkConf sparkConf = sparkContext.conf();
-    int cores = sparkConf.getInt("spark.executor.cores", 1);
-    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
-    // sc.executorMemory() is in MB, need to convert to bytes
-    long memoryPerTask =
-      (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores);
-    int executors = sparkContext.getExecutorMemoryStatus().size();
-    int totalCores = executors * cores;
-    LOG.info("Spark cluster current has executors: " + executors
-      + ", cores per executor: " + cores + ", memory per executor: "
-      + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
-    return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
-      Integer.valueOf(totalCores));
-  }
-
   private JavaSparkContext sc;
 
   private List<String> localJars = new ArrayList<String>();
@@ -110,6 +85,16 @@ public class LocalHiveSparkClient implem
   }
 
   @Override
+  public SparkConf getSparkConf() {
+    return sc.sc().conf();
+  }
+
+  @Override
+  public int getExecutorCount() {
+    return sc.sc().getExecutorMemoryStatus().size();
+  }
+
+  @Override
   public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
     Context ctx = driverContext.getCtx();
     HiveConf hiveConf = (HiveConf) ctx.getConf();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -17,8 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,16 +47,13 @@ import org.apache.hive.spark.client.JobC
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
@@ -66,17 +69,31 @@ public class RemoteHiveSparkClient imple
   private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private transient SparkClient remoteClient;
+  private transient SparkConf sparkConf;
 
   private transient List<String> localJars = new ArrayList<String>();
 
   private transient List<String> localFiles = new ArrayList<String>();
 
-  RemoteHiveSparkClient(Map<String, String> sparkConf) throws IOException, SparkException {
-    SparkClientFactory.initialize(sparkConf);
-    remoteClient = SparkClientFactory.createClient(sparkConf);
+  RemoteHiveSparkClient(Map<String, String> conf) throws IOException, SparkException {
+    SparkClientFactory.initialize(conf);
+    sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
+    remoteClient = SparkClientFactory.createClient(conf);
+  }
+
+  @Override
+  public SparkConf getSparkConf() {
+    return sparkConf;
+  }
+
+  @Override
+  public int getExecutorCount() throws Exception {
+    Future<Integer> handler = remoteClient.getExecutorCount();
+    return handler.get().intValue();
   }
 
   @Override
+  @SuppressWarnings("serial")
   public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
     final Context ctx = driverContext.getCtx();
     final HiveConf hiveConf = (HiveConf) ctx.getConf();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Nov 25 20:43:39 2014
@@ -28,8 +28,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,11 +68,12 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Lists;
+
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
   private transient JobConf job;
@@ -96,18 +95,9 @@ public class SparkTask extends Task<Spar
     try {
       printConfigInfo();
       sparkSessionManager = SparkSessionManagerImpl.getInstance();
-      sparkSession = SessionState.get().getSparkSession();
+      sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
 
-      // Spark configurations are updated close the existing session
-      if(conf.getSparkConfigUpdated()){
-        sparkSessionManager.closeSession(sparkSession);
-        sparkSession =  null;
-        conf.setSparkConfigUpdated(false);
-      }
-      sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
-      SessionState.get().setSparkSession(sparkSession);
       SparkWork sparkWork = getWork();
-
       sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
 
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Nov 25 20:43:39 2014
@@ -17,15 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BytesWritable;
-
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+
 /**
  * Contains utilities methods used as part of Spark tasks
  */
@@ -68,4 +73,19 @@ public class SparkUtilities {
 
     return url;
   }
+
+  public static SparkSession getSparkSession(HiveConf conf,
+      SparkSessionManager sparkSessionManager) throws HiveException {
+    SparkSession sparkSession = SessionState.get().getSparkSession();
+
+    // Spark configurations are updated close the existing session
+    if(conf.getSparkConfigUpdated()){
+      sparkSessionManager.closeSession(sparkSession);
+      sparkSession =  null;
+      conf.setSparkConfigUpdated(false);
+    }
+    sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
+    SessionState.get().setSparkSession(sparkSession);
+    return sparkSession;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java Tue Nov 25 20:43:39 2014
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import scala.Tuple2;
+
 public interface SparkSession {
   /**
    * Initializes a Spark session for DAG execution.
@@ -37,6 +39,15 @@ public interface SparkSession {
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
 
   /**
+   * Get Spark shuffle memory per task, and total number of cores. This
+   * information can be used to estimate how many reducers a task can have.
+   *
+   * @return a tuple, the first element is the shuffle memory per task in bytes,
+   *  the second element is the number of total cores usable by the client
+   */
+  public Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
+
+  /**
    * Is the session open and ready to submit jobs?
    */
   public boolean isOpen();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Tue Nov 25 20:43:39 2014
@@ -17,21 +17,23 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.spark.SparkException;
+import org.apache.spark.SparkConf;
 
-import java.io.IOException;
-import java.util.UUID;
+import scala.Tuple2;
+
+import com.google.common.base.Preconditions;
 
 public class SparkSessionImpl implements SparkSession {
   private static final Log LOG = LogFactory.getLog(SparkSession.class);
@@ -63,6 +65,23 @@ public class SparkSessionImpl implements
   }
 
   @Override
+  public Tuple2<Long, Integer> getMemoryAndCores() throws Exception {
+    SparkConf sparkConf = hiveSparkClient.getSparkConf();
+    int cores = sparkConf.getInt("spark.executor.cores", 1);
+    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+    int executorMemoryInMB = sparkConf.getInt("spark.executor.memory", 512);
+    long memoryPerTaskInBytes =
+      (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / cores);
+    int executors = hiveSparkClient.getExecutorCount();
+    int totalCores = executors * cores;
+    LOG.info("Spark cluster current has executors: " + executors
+      + ", cores per executor: " + cores + ", memory per executor: "
+      + executorMemoryInMB + "M, shuffle memoryFraction: " + memoryFraction);
+    return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+      Integer.valueOf(totalCores));
+  }
+
+  @Override
   public boolean isOpen() {
     return isOpen;
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Tue Nov 25 20:43:39 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import java.io.IOException;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -27,18 +26,19 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
-import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 
-import org.apache.spark.SparkException;
 import scala.Tuple2;
 
 /**
@@ -73,51 +73,55 @@ public class SetSparkReducerParallelism 
 
     context.getVisitedReduceSinks().add(sink);
 
-
     if (desc.getNumReducers() <= 0) {
       if (constantReducers > 0) {
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
       } else {
         try {
-          // TODO try to make this still work after integration with remote spark context, so that we
-          // don't break test, we should implement automatic calculate reduce number for remote spark
-          // client and refactor code later, track it with HIVE-8855.
-          HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf());
-          if (sparkClient instanceof LocalHiveSparkClient) {
-            LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient;
-            long numberOfBytes = 0;
-
-            // we need to add up all the estimates from the siblings of this reduce sink
-            for (Operator<? extends OperatorDesc> sibling:
-              sink.getChildOperators().get(0).getParentOperators()) {
-              if (sibling.getStatistics() != null) {
-                numberOfBytes += sibling.getStatistics().getDataSize();
-              } else {
-                LOG.warn("No stats available from: " + sibling);
-              }
-            }
+          long numberOfBytes = 0;
 
-            if (sparkMemoryAndCores == null) {
-              sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores();
+          // we need to add up all the estimates from the siblings of this reduce sink
+          for (Operator<? extends OperatorDesc> sibling:
+            sink.getChildOperators().get(0).getParentOperators()) {
+            if (sibling.getStatistics() != null) {
+              numberOfBytes += sibling.getStatistics().getDataSize();
+            } else {
+              LOG.warn("No stats available from: " + sibling);
             }
+          }
 
-            // Divide it by 2 so that we can have more reducers
-            long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
-            int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
-              maxReducers, false);
-
-            // If there are more cores, use the number of cores
-            int cores = sparkMemoryAndCores._2.intValue();
-            if (numReducers < cores) {
-              numReducers = cores;
+          if (sparkMemoryAndCores == null) {
+            SparkSessionManager sparkSessionManager = null;
+            SparkSession sparkSession = null;
+            try {
+              sparkSessionManager = SparkSessionManagerImpl.getInstance();
+              sparkSession = SparkUtilities.getSparkSession(
+                context.getConf(), sparkSessionManager);
+              sparkMemoryAndCores = sparkSession.getMemoryAndCores();
+            } finally {
+              if (sparkSession != null && sparkSessionManager != null) {
+                try {
+                  sparkSessionManager.returnSession(sparkSession);
+                } catch(HiveException ex) {
+                  LOG.error("Failed to return the session to SessionManager", ex);
+                }
+              }
             }
-            LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
-            desc.setNumReducers(numReducers);
+          }
 
-          } else {
-            sparkClient.close();
+          // Divide it by 2 so that we can have more reducers
+          long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+          int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+            maxReducers, false);
+
+          // If there are more cores, use the number of cores
+          int cores = sparkMemoryAndCores._2.intValue();
+          if (numReducers < cores) {
+            numReducers = cores;
           }
+          LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+          desc.setNumReducers(numReducers);
         } catch (Exception e) {
           LOG.warn("Failed to create spark client.", e);
         }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Tue Nov 25 20:43:39 2014
@@ -68,4 +68,8 @@ public interface SparkClient extends Ser
    */
   Future<?> addFile(URL url);
 
+  /**
+   * Get the count of executors
+   */
+  Future<Integer> getExecutorCount();
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 25 20:43:39 2014
@@ -125,6 +125,11 @@ class SparkClientImpl implements SparkCl
     return submit(new AddFileJob(url.toString()));
   }
 
+  @Override
+  public Future<Integer> getExecutorCount() {
+    return submit(new GetExecutorCountJob());
+  }
+
   void cancel(String jobId) {
     remoteRef.tell(new Protocol.CancelJob(jobId), clientRef);
   }
@@ -366,6 +371,7 @@ class SparkClientImpl implements SparkCl
   }
 
   private static class AddJarJob implements Job<Serializable> {
+    private static final long serialVersionUID = 1L;
 
     private final String path;
 
@@ -382,6 +388,7 @@ class SparkClientImpl implements SparkCl
   }
 
   private static class AddFileJob implements Job<Serializable> {
+    private static final long serialVersionUID = 1L;
 
     private final String path;
 
@@ -396,5 +403,16 @@ class SparkClientImpl implements SparkCl
     }
 
   }
+  
+  private static class GetExecutorCountJob implements Job<Integer> {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Integer call(JobContext jc) throws Exception {
+        int count = jc.sc().sc().getExecutorMemoryStatus().size();
+        return Integer.valueOf(count);
+      }
+
+  }
 
 }