Posted to commits@hive.apache.org by br...@apache.org on 2014/11/25 21:43:39 UTC
svn commit: r1641691 - in /hive/branches/spark:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/
spark-client/src/main/java/org/apache/hiv...
Author: brock
Date: Tue Nov 25 20:43:39 2014
New Revision: 1641691
URL: http://svn.apache.org/r1641691
Log:
HIVE-8855 - Automatic calculate reduce number for spark job [Spark Branch] (Jimmy Xiang via Brock)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -17,12 +17,13 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.Closeable;
+import java.io.Serializable;
+
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-
-import java.io.Closeable;
-import java.io.Serializable;
+import org.apache.spark.SparkConf;
public interface HiveSparkClient extends Serializable, Closeable {
/**
@@ -34,4 +35,11 @@ public interface HiveSparkClient extends
* @throws Exception
*/
public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+
+ public SparkConf getSparkConf();
+
+ /**
+ * Get the count of executors
+ */
+ public int getExecutorCount() throws Exception;
}
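
The two new accessors let session-level code read cluster sizing without caring whether the underlying client is local or remote. A minimal sketch of a hypothetical caller (variable names are illustrative, not part of the patch; hiveConf is assumed to be the session's HiveConf):

  // Hypothetical caller against the extended interface.
  HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
  SparkConf sparkConf = client.getSparkConf();
  int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
  int executors = client.getExecutorCount();   // may block while a remote client asks the driver
  int totalCores = executors * coresPerExecutor;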
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Tue Nov 25 20:43:39 2014
@@ -18,21 +18,20 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.commons.compress.utils.CharsetNames;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
public class HiveSparkClientFactory {
protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
@@ -112,7 +111,7 @@ public class HiveSparkClientFactory {
return sparkConf;
}
- private static SparkConf generateSparkConf(Map<String, String> conf) {
+ static SparkConf generateSparkConf(Map<String, String> conf) {
SparkConf sparkConf = new SparkConf(false);
for (Map.Entry<String, String> entry : conf.entrySet()) {
sparkConf.set(entry.getKey(), entry.getValue());
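
Dropping the private modifier makes generateSparkConf visible to RemoteHiveSparkClient (below), which builds a local SparkConf view from the same string map it hands to SparkClientFactory. A rough usage sketch, with an illustrative map in place of the one the factory normally assembles:

  // Illustrative only: the real map is assembled by the factory from Hive/Spark settings.
  Map<String, String> conf = new HashMap<String, String>();
  conf.put("spark.master", "local");
  conf.put("spark.executor.memory", "512");
  SparkConf sparkConf = HiveSparkClientFactory.generateSparkConf(conf);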
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -18,8 +18,10 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -39,15 +41,12 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
/**
* LocalSparkClient submit Spark job in local driver, it's responsible for build spark client
@@ -71,30 +70,6 @@ public class LocalHiveSparkClient implem
return client;
}
- /**
- * Get Spark shuffle memory per task, and total number of cores. This
- * information can be used to estimate how many reducers a task can have.
- *
- * @return a tuple, the first element is the shuffle memory per task in bytes,
- * the second element is the number of total cores usable by the client
- */
- public Tuple2<Long, Integer> getMemoryAndCores() {
- SparkContext sparkContext = sc.sc();
- SparkConf sparkConf = sparkContext.conf();
- int cores = sparkConf.getInt("spark.executor.cores", 1);
- double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
- // sc.executorMemory() is in MB, need to convert to bytes
- long memoryPerTask =
- (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores);
- int executors = sparkContext.getExecutorMemoryStatus().size();
- int totalCores = executors * cores;
- LOG.info("Spark cluster current has executors: " + executors
- + ", cores per executor: " + cores + ", memory per executor: "
- + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
- return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
- Integer.valueOf(totalCores));
- }
-
private JavaSparkContext sc;
private List<String> localJars = new ArrayList<String>();
@@ -110,6 +85,16 @@ public class LocalHiveSparkClient implem
}
@Override
+ public SparkConf getSparkConf() {
+ return sc.sc().conf();
+ }
+
+ @Override
+ public int getExecutorCount() {
+ return sc.sc().getExecutorMemoryStatus().size();
+ }
+
+ @Override
public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
Context ctx = driverContext.getCtx();
HiveConf hiveConf = (HiveConf) ctx.getConf();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 25 20:43:39 2014
@@ -17,8 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -41,16 +47,13 @@ import org.apache.hive.spark.client.JobC
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
/**
* RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
@@ -66,17 +69,31 @@ public class RemoteHiveSparkClient imple
private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
private transient SparkClient remoteClient;
+ private transient SparkConf sparkConf;
private transient List<String> localJars = new ArrayList<String>();
private transient List<String> localFiles = new ArrayList<String>();
- RemoteHiveSparkClient(Map<String, String> sparkConf) throws IOException, SparkException {
- SparkClientFactory.initialize(sparkConf);
- remoteClient = SparkClientFactory.createClient(sparkConf);
+ RemoteHiveSparkClient(Map<String, String> conf) throws IOException, SparkException {
+ SparkClientFactory.initialize(conf);
+ sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
+ remoteClient = SparkClientFactory.createClient(conf);
+ }
+
+ @Override
+ public SparkConf getSparkConf() {
+ return sparkConf;
+ }
+
+ @Override
+ public int getExecutorCount() throws Exception {
+ Future<Integer> handler = remoteClient.getExecutorCount();
+ return handler.get().intValue();
}
@Override
+ @SuppressWarnings("serial")
public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
final Context ctx = driverContext.getCtx();
final HiveConf hiveConf = (HiveConf) ctx.getConf();
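
For the remote client the executor count has to come from the driver running in the remote context, so getExecutorCount() submits a job (GetExecutorCountJob, added to SparkClientImpl further down) and blocks on the returned Future. A hedged illustration of the call path:

  // Illustration only; see GetExecutorCountJob in SparkClientImpl below.
  Future<Integer> pending = remoteClient.getExecutorCount();  // ships the job to the remote driver
  int executors = pending.get();                              // blocks; throws if the remote job failed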
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Nov 25 20:43:39 2014
@@ -28,8 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,11 +68,12 @@ import org.apache.hadoop.hive.ql.plan.Sp
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.collect.Lists;
+
public class SparkTask extends Task<SparkWork> {
private static final long serialVersionUID = 1L;
private transient JobConf job;
@@ -96,18 +95,9 @@ public class SparkTask extends Task<Spar
try {
printConfigInfo();
sparkSessionManager = SparkSessionManagerImpl.getInstance();
- sparkSession = SessionState.get().getSparkSession();
+ sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
- // Spark configurations are updated close the existing session
- if(conf.getSparkConfigUpdated()){
- sparkSessionManager.closeSession(sparkSession);
- sparkSession = null;
- conf.setSparkConfigUpdated(false);
- }
- sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
- SessionState.get().setSparkSession(sparkSession);
SparkWork sparkWork = getWork();
-
sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Nov 25 20:43:39 2014
@@ -17,15 +17,20 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BytesWritable;
-
import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+
/**
* Contains utilities methods used as part of Spark tasks
*/
@@ -68,4 +73,19 @@ public class SparkUtilities {
return url;
}
+
+ public static SparkSession getSparkSession(HiveConf conf,
+ SparkSessionManager sparkSessionManager) throws HiveException {
+ SparkSession sparkSession = SessionState.get().getSparkSession();
+
+ // Spark configurations are updated close the existing session
+ if(conf.getSparkConfigUpdated()){
+ sparkSessionManager.closeSession(sparkSession);
+ sparkSession = null;
+ conf.setSparkConfigUpdated(false);
+ }
+ sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
+ SessionState.get().setSparkSession(sparkSession);
+ return sparkSession;
+ }
}
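
The session lookup that SparkTask used to do inline (see the SparkTask hunk above) now lives in this shared helper, so SparkTask and SetSparkReducerParallelism (below) obtain, and refresh when Spark settings change, the per-SessionState SparkSession the same way. A minimal sketch of the shared calling pattern:

  // Sketch of the shared calling pattern; conf is the session's HiveConf.
  SparkSessionManager manager = SparkSessionManagerImpl.getInstance();
  SparkSession sparkSession = SparkUtilities.getSparkSession(conf, manager);
  try {
    Tuple2<Long, Integer> memoryAndCores = sparkSession.getMemoryAndCores();
    // ... use the session (SparkTask instead calls sparkSession.submit(...)) ...
  } finally {
    manager.returnSession(sparkSession);  // as SetSparkReducerParallelism does below
  }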
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java Tue Nov 25 20:43:39 2014
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.exec.sp
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import scala.Tuple2;
+
public interface SparkSession {
/**
* Initializes a Spark session for DAG execution.
@@ -37,6 +39,15 @@ public interface SparkSession {
public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
/**
+ * Get Spark shuffle memory per task, and total number of cores. This
+ * information can be used to estimate how many reducers a task can have.
+ *
+ * @return a tuple, the first element is the shuffle memory per task in bytes,
+ * the second element is the number of total cores usable by the client
+ */
+ public Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
+
+ /**
* Is the session open and ready to submit jobs?
*/
public boolean isOpen();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Tue Nov 25 20:43:39 2014
@@ -17,21 +17,23 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.session;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.UUID;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.spark.SparkException;
+import org.apache.spark.SparkConf;
-import java.io.IOException;
-import java.util.UUID;
+import scala.Tuple2;
+
+import com.google.common.base.Preconditions;
public class SparkSessionImpl implements SparkSession {
private static final Log LOG = LogFactory.getLog(SparkSession.class);
@@ -63,6 +65,23 @@ public class SparkSessionImpl implements
}
@Override
+ public Tuple2<Long, Integer> getMemoryAndCores() throws Exception {
+ SparkConf sparkConf = hiveSparkClient.getSparkConf();
+ int cores = sparkConf.getInt("spark.executor.cores", 1);
+ double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+ int executorMemoryInMB = sparkConf.getInt("spark.executor.memory", 512);
+ long memoryPerTaskInBytes =
+ (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / cores);
+ int executors = hiveSparkClient.getExecutorCount();
+ int totalCores = executors * cores;
+ LOG.info("Spark cluster current has executors: " + executors
+ + ", cores per executor: " + cores + ", memory per executor: "
+ + executorMemoryInMB + "M, shuffle memoryFraction: " + memoryFraction);
+ return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+ Integer.valueOf(totalCores));
+ }
+
+ @Override
public boolean isOpen() {
return isOpen;
}
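
Compared with the method removed from LocalHiveSparkClient above, the per-task shuffle memory is now derived entirely from the SparkConf, so the same code serves local and remote clients. A rough worked example with illustrative numbers (not from the patch):

  // Illustrative inputs:
  //   spark.executor.memory        = 4096   (MB)
  //   spark.shuffle.memoryFraction = 0.2    (the default used above)
  //   spark.executor.cores         = 4
  //   getExecutorCount()           = 10
  // memoryPerTaskInBytes = 4096 * 0.2 * 1024 * 1024 / 4 ≈ 214,748,364 bytes (~205 MiB)
  // totalCores           = 10 * 4 = 40
  // returned tuple: (214748364L, 40)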
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Tue Nov 25 20:43:39 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
-import java.io.IOException;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -27,18 +26,19 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
-import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
-import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.spark.SparkException;
import scala.Tuple2;
/**
@@ -73,51 +73,55 @@ public class SetSparkReducerParallelism
context.getVisitedReduceSinks().add(sink);
-
if (desc.getNumReducers() <= 0) {
if (constantReducers > 0) {
LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
desc.setNumReducers(constantReducers);
} else {
try {
- // TODO try to make this still work after integration with remote spark context, so that we
- // don't break test, we should implement automatic calculate reduce number for remote spark
- // client and refactor code later, track it with HIVE-8855.
- HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf());
- if (sparkClient instanceof LocalHiveSparkClient) {
- LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient;
- long numberOfBytes = 0;
-
- // we need to add up all the estimates from the siblings of this reduce sink
- for (Operator<? extends OperatorDesc> sibling:
- sink.getChildOperators().get(0).getParentOperators()) {
- if (sibling.getStatistics() != null) {
- numberOfBytes += sibling.getStatistics().getDataSize();
- } else {
- LOG.warn("No stats available from: " + sibling);
- }
- }
+ long numberOfBytes = 0;
- if (sparkMemoryAndCores == null) {
- sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores();
+ // we need to add up all the estimates from the siblings of this reduce sink
+ for (Operator<? extends OperatorDesc> sibling:
+ sink.getChildOperators().get(0).getParentOperators()) {
+ if (sibling.getStatistics() != null) {
+ numberOfBytes += sibling.getStatistics().getDataSize();
+ } else {
+ LOG.warn("No stats available from: " + sibling);
}
+ }
- // Divide it by 2 so that we can have more reducers
- long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
- int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
- maxReducers, false);
-
- // If there are more cores, use the number of cores
- int cores = sparkMemoryAndCores._2.intValue();
- if (numReducers < cores) {
- numReducers = cores;
+ if (sparkMemoryAndCores == null) {
+ SparkSessionManager sparkSessionManager = null;
+ SparkSession sparkSession = null;
+ try {
+ sparkSessionManager = SparkSessionManagerImpl.getInstance();
+ sparkSession = SparkUtilities.getSparkSession(
+ context.getConf(), sparkSessionManager);
+ sparkMemoryAndCores = sparkSession.getMemoryAndCores();
+ } finally {
+ if (sparkSession != null && sparkSessionManager != null) {
+ try {
+ sparkSessionManager.returnSession(sparkSession);
+ } catch(HiveException ex) {
+ LOG.error("Failed to return the session to SessionManager", ex);
+ }
+ }
}
- LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
- desc.setNumReducers(numReducers);
+ }
- } else {
- sparkClient.close();
+ // Divide it by 2 so that we can have more reducers
+ long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+ int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+ maxReducers, false);
+
+ // If there are more cores, use the number of cores
+ int cores = sparkMemoryAndCores._2.intValue();
+ if (numReducers < cores) {
+ numReducers = cores;
}
+ LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+ desc.setNumReducers(numReducers);
} catch (Exception e) {
LOG.warn("Failed to create spark client.", e);
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Tue Nov 25 20:43:39 2014
@@ -68,4 +68,8 @@ public interface SparkClient extends Ser
*/
Future<?> addFile(URL url);
+ /**
+ * Get the count of executors
+ */
+ Future<Integer> getExecutorCount();
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1641691&r1=1641690&r2=1641691&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 25 20:43:39 2014
@@ -125,6 +125,11 @@ class SparkClientImpl implements SparkCl
return submit(new AddFileJob(url.toString()));
}
+ @Override
+ public Future<Integer> getExecutorCount() {
+ return submit(new GetExecutorCountJob());
+ }
+
void cancel(String jobId) {
remoteRef.tell(new Protocol.CancelJob(jobId), clientRef);
}
@@ -366,6 +371,7 @@ class SparkClientImpl implements SparkCl
}
private static class AddJarJob implements Job<Serializable> {
+ private static final long serialVersionUID = 1L;
private final String path;
@@ -382,6 +388,7 @@ class SparkClientImpl implements SparkCl
}
private static class AddFileJob implements Job<Serializable> {
+ private static final long serialVersionUID = 1L;
private final String path;
@@ -396,5 +403,16 @@ class SparkClientImpl implements SparkCl
}
}
+
+ private static class GetExecutorCountJob implements Job<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer call(JobContext jc) throws Exception {
+ int count = jc.sc().sc().getExecutorMemoryStatus().size();
+ return Integer.valueOf(count);
+ }
+
+ }
}
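
On the remote side the count comes from the driver's SparkContext via getExecutorMemoryStatus(), one entry per registered block manager (so the driver itself is typically included in the count). A hedged sketch of the whole round trip, with names taken from the hunks above:

  // 1. RemoteHiveSparkClient.getExecutorCount() calls remoteClient.getExecutorCount().
  // 2. SparkClientImpl wraps that as submit(new GetExecutorCountJob()).
  // 3. GetExecutorCountJob.call(jc) runs in the remote driver and evaluates
  //      jc.sc().sc().getExecutorMemoryStatus().size()
  // 4. The Integer travels back through the Future, and handler.get() in
  //    RemoteHiveSparkClient unblocks with the result.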