You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/24 05:47:18 UTC
svn commit: r1620105 - in /hive/branches/spark:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/
Author: brock
Date: Sun Aug 24 03:47:17 2014
New Revision: 1620105
URL: http://svn.apache.org/r1620105
Log:
HIVE-7848 - Refresh SparkContext when spark configuration changes [Spark Branch] (Chinna Rao Lalam via Brock)
Modified:
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Aug 24 03:47:17 2014
@@ -75,8 +75,17 @@ public class HiveConf extends Configurat
private boolean isWhiteListRestrictionEnabled = false;
private final List<String> modWhiteList = new ArrayList<String>();
+ private boolean isSparkConfigUpdated = false;
+ public boolean getSparkConfigUpdated() {
+ return isSparkConfigUpdated;
+ }
+
+ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+ this.isSparkConfigUpdated = isSparkConfigUpdated;
+ }
+
static {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
@@ -1947,6 +1956,7 @@ public class HiveConf extends Configurat
throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
+ "of parameters that can't be modified at runtime");
}
+ isSparkConfigUpdated = name.startsWith("spark");
set(name, value);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Sun Aug 24 03:47:17 2014
@@ -225,4 +225,9 @@ public class SparkClient implements Seri
}
}
}
+
+ public void close() {
+ sc.stop();
+ client = null;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Sun Aug 24 03:47:17 2014
@@ -62,9 +62,15 @@ public class SparkTask extends Task<Spar
configureNumberOfReducers();
sparkSessionManager = SparkSessionManagerImpl.getInstance();
sparkSession = SessionState.get().getSparkSession();
+
+ // Spark configurations were updated, so close the existing session
+ if(conf.getSparkConfigUpdated()){
+ sparkSessionManager.closeSession(sparkSession);
+ sparkSession = null;
+ conf.setSparkConfigUpdated(false);
+ }
sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
SessionState.get().setSparkSession(sparkSession);
-
rc = sparkSession.submit(driverContext, getWork());
} catch (Exception e) {
LOG.error("Failed to execute spark task.", e);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Sun Aug 24 03:47:17 2014
@@ -33,6 +33,7 @@ public class SparkSessionImpl implements
private HiveConf conf;
private boolean isOpen;
private final String sessionId;
+ private SparkClient sparkClient;
public SparkSessionImpl() {
sessionId = makeSessionId();
@@ -47,8 +48,8 @@ public class SparkSessionImpl implements
@Override
public int submit(DriverContext driverContext, SparkWork sparkWork) {
Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
- return SparkClient.getInstance(driverContext.getCtx().getConf())
- .execute(driverContext, sparkWork);
+ sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
+ return sparkClient.execute(driverContext, sparkWork);
}
@Override
@@ -69,6 +70,10 @@ public class SparkSessionImpl implements
@Override
public void close() {
isOpen = false;
+ if (sparkClient != null) {
+ sparkClient.close();
+ }
+ sparkClient = null;
}
public static String makeSessionId() {