Posted to commits@hive.apache.org by br...@apache.org on 2014/08/24 05:47:18 UTC

svn commit: r1620105 - in /hive/branches/spark: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/

Author: brock
Date: Sun Aug 24 03:47:17 2014
New Revision: 1620105

URL: http://svn.apache.org/r1620105
Log:
HIVE-7848 - Refresh SparkContext when spark configuration changes [Spark Branch] (Chinna Rao Lalam via Brock)

Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Aug 24 03:47:17 2014
@@ -75,8 +75,17 @@ public class HiveConf extends Configurat
 
   private boolean isWhiteListRestrictionEnabled = false;
   private final List<String> modWhiteList = new ArrayList<String>();
+  private boolean isSparkConfigUpdated = false;
 
 
+  public boolean getSparkConfigUpdated() {
+    return isSparkConfigUpdated;
+  }
+
+  public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+    this.isSparkConfigUpdated = isSparkConfigUpdated;
+  }
+
   static {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -1947,6 +1956,7 @@ public class HiveConf extends Configurat
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
+    isSparkConfigUpdated = name.startsWith("spark");
     set(name, value);
   }
 

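A minimal sketch of how the new flag behaves from a caller's point of view, assuming the patched method is HiveConf.verifyAndSet (the hunk header does not show the method name) and using example property names:

    HiveConf conf = new HiveConf();

    conf.verifyAndSet("spark.executor.memory", "2g");
    boolean updated = conf.getSparkConfigUpdated();   // true: a spark.* property was just modified

    conf.verifyAndSet("hive.exec.parallel", "true");
    updated = conf.getSparkConfigUpdated();           // false: the flag mirrors only the most recent call

Note that the flag is a plain assignment rather than an OR, so it reflects only the last verifyAndSet call; SparkTask (below) consumes and resets it at the start of the next query.
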
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Sun Aug 24 03:47:17 2014
@@ -225,4 +225,9 @@ public class SparkClient implements Seri
       }
     }
   }
+  
+  public void close() {
+    sc.stop();
+    client = null;
+  }
 }
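
The effect of the new close() method, sketched under the assumption that SparkClient is the per-process singleton whose getInstance(conf) lazily builds the JavaSparkContext (which is how SparkSessionImpl.submit uses it below); hiveConf and updatedConf are placeholder variables:

    SparkClient client = SparkClient.getInstance(hiveConf);   // builds the JavaSparkContext
    // ... run some work ...
    client.close();                                           // sc.stop() and drop the cached instance

    // The next getInstance(...) starts from scratch, so it picks up
    // any spark.* values changed in the meantime.
    SparkClient refreshed = SparkClient.getInstance(updatedConf);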

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Sun Aug 24 03:47:17 2014
@@ -62,9 +62,15 @@ public class SparkTask extends Task<Spar
       configureNumberOfReducers();
       sparkSessionManager = SparkSessionManagerImpl.getInstance();
       sparkSession = SessionState.get().getSparkSession();
+      
+      // Spark configuration has been updated; close the existing session so the new settings take effect
+      if (conf.getSparkConfigUpdated()) {
+        sparkSessionManager.closeSession(sparkSession);
+        sparkSession = null;
+        conf.setSparkConfigUpdated(false);
+      }
       sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
       SessionState.get().setSparkSession(sparkSession);
-
       rc = sparkSession.submit(driverContext, getWork());
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);

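Putting the two pieces together, the intended runtime behavior is roughly the following; this is an illustrative sketch rather than the literal SparkTask code, and the property name is only an example:

    // A user reconfigures Spark mid-session, e.g. via "set spark.executor.instances=4;"
    conf.verifyAndSet("spark.executor.instances", "4");       // HiveConf marks the conf as spark-updated

    // On the next SparkTask.execute(), the stale session is discarded ...
    if (conf.getSparkConfigUpdated()) {
      sparkSessionManager.closeSession(sparkSession);         // eventually stops the old SparkContext
      sparkSession = null;
      conf.setSparkConfigUpdated(false);                      // flag consumed
    }
    // ... and a fresh one is obtained, whose first submit() builds a new
    // SparkClient (and SparkContext) from the updated configuration.
    sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
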
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1620105&r1=1620104&r2=1620105&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Sun Aug 24 03:47:17 2014
@@ -33,6 +33,7 @@ public class SparkSessionImpl implements
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
+  private SparkClient sparkClient;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -47,8 +48,8 @@ public class SparkSessionImpl implements
   @Override
   public int submit(DriverContext driverContext, SparkWork sparkWork) {
     Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
-    return SparkClient.getInstance(driverContext.getCtx().getConf())
-        .execute(driverContext, sparkWork);
+    sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
+    return sparkClient.execute(driverContext, sparkWork);
   }
 
   @Override
@@ -69,6 +70,10 @@ public class SparkSessionImpl implements
   @Override
   public void close() {
     isOpen = false;
+    if (sparkClient != null) {
+      sparkClient.close();
+    }
+    sparkClient = null;
   }
 
   public static String makeSessionId() {