Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/18 07:46:43 UTC

svn commit: r1640277 - in /hive/branches/spark: ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/ ql/src/java/org/apache/hadoop/hive/...

Author: szehon
Date: Tue Nov 18 06:46:42 2014
New Revision: 1640277

URL: http://svn.apache.org/r1640277
Log:
HIVE-8833 : Unify spark client API and implement remote spark client.[Spark Branch] (Chengxiang Li via Szehon)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Modified:
    hive/branches/spark/ql/pom.xml
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Tue Nov 18 06:46:42 2014
@@ -61,6 +61,11 @@
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>spark-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+public interface HiveSparkClient extends Serializable, Closeable {
+  /**
+   * Generates the Spark RDD graph from the given sparkWork and driverContext,
+   * and submits it to the Spark cluster.
+   * @param driverContext
+   * @param sparkWork
+   * @return a SparkJobRef that can be used to track Spark job progress and metrics.
+   * @throws Exception
+   */
+  public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+}
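
For orientation only, not part of this patch: callers obtain an implementation of this interface through HiveSparkClientFactory and submit work through this single entry point, which is what SparkSessionImpl does further down in this commit. A minimal sketch (the method name is hypothetical; driverContext and sparkWork are assumed to be supplied by the caller):

  // Sketch, mirroring SparkSessionImpl#submit as modified below.
  public SparkJobRef submitThroughUnifiedApi(DriverContext driverContext, SparkWork sparkWork)
      throws Exception {
    Configuration hiveConf = driverContext.getCtx().getConf();
    HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
    return client.execute(driverContext, sparkWork);
  }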

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveSparkClientFactory {
+  protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
+
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+  private static final String SPARK_DEFAULT_MASTER = "local";
+  private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+
+  public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+    throws IOException, SparkException {
+
+    Map<String, String> conf = initiateSparkConf(configuration);
+    // Submit the Spark job through a local SparkContext when the Spark master is in local mode;
+    // otherwise submit it through a remote SparkContext.
+    String master = conf.get("spark.master");
+    if (master.equals("local") || master.startsWith("local[")) {
+      // With local spark context, all user sessions share the same spark context.
+      return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+    } else {
+      return new RemoteHiveSparkClient(conf);
+    }
+  }
+
+  private static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+    Map<String, String> sparkConf = new HashMap<String, String>();
+
+    // set default spark configurations.
+    sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
+    sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
+    sparkConf.put("spark.serializer",
+      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.default.parallelism", "1");
+
+    // load properties from spark-defaults.conf.
+    InputStream inputStream = null;
+    try {
+      inputStream = HiveSparkClientFactory.class.getClassLoader()
+        .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      if (inputStream != null) {
+        LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+        Properties properties = new Properties();
+        properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
+        for (String propertyName : properties.stringPropertyNames()) {
+          if (propertyName.startsWith("spark")) {
+            String value = properties.getProperty(propertyName);
+            sparkConf.put(propertyName, value);
+            LOG.info(String.format(
+              "load spark configuration from %s (%s -> %s).",
+              SPARK_DEFAULT_CONF_FILE, propertyName, value));
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to open spark configuration file:"
+        + SPARK_DEFAULT_CONF_FILE, e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close inputstream.", e);
+        }
+      }
+    }
+
+    // load properties from hive configurations.
+    for (Map.Entry<String, String> entry : hiveConf) {
+      String propertyName = entry.getKey();
+      if (propertyName.startsWith("spark")) {
+        String value = entry.getValue();
+        sparkConf.put(propertyName, value);
+        LOG.info(String.format(
+          "load spark configuration from hive configuration (%s -> %s).",
+          propertyName, value));
+      }
+    }
+
+    return sparkConf;
+  }
+
+  private static SparkConf generateSparkConf(Map<String, String> conf) {
+    SparkConf sparkConf = new SparkConf(false);
+    for (Map.Entry<String, String> entry : conf.entrySet()) {
+      sparkConf.set(entry.getKey(), entry.getValue());
+    }
+    return sparkConf;
+  }
+}
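
For reference, not part of the patch: the routing above is driven entirely by spark.master, which can come from spark-defaults.conf on the classpath or from any spark.* key in the Hive configuration. A sketch with hypothetical values:

  Configuration hiveConf = new Configuration();
  hiveConf.set("spark.master", "local[2]");             // matches "local[" -> LocalHiveSparkClient
  // hiveConf.set("spark.master", "spark://host:7077"); // anything else -> RemoteHiveSparkClient
  HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(hiveConf);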

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * LocalHiveSparkClient submits Spark jobs through a local Spark driver; it is responsible for
+ * building the Spark client environment and executing the SparkWork.
+ */
+public class LocalHiveSparkClient implements HiveSparkClient {
+  private static final long serialVersionUID = 1L;
+
+  private static final String MR_JAR_PROPERTY = "tmpjars";
+  protected static transient final Log LOG = LogFactory
+      .getLog(LocalHiveSparkClient.class);
+
+  private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+  private static LocalHiveSparkClient client;
+
+  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
+    if (client == null) {
+      client = new LocalHiveSparkClient(sparkConf);
+    }
+    return client;
+  }
+
+  /**
+   * Get Spark shuffle memory per task, and total number of cores. This
+   * information can be used to estimate how many reducers a task can have.
+   *
+   * @return a tuple, the first element is the shuffle memory per task in bytes,
+   *  the second element is the number of total cores usable by the client
+   */
+  public Tuple2<Long, Integer> getMemoryAndCores() {
+    SparkContext sparkContext = sc.sc();
+    SparkConf sparkConf = sparkContext.conf();
+    int cores = sparkConf.getInt("spark.executor.cores", 1);
+    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+    // sc.executorMemory() is in MB, need to convert to bytes
+    long memoryPerTask =
+      (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores);
+    int executors = sparkContext.getExecutorMemoryStatus().size();
+    int totalCores = executors * cores;
+    LOG.info("Spark cluster currently has executors: " + executors
+      + ", cores per executor: " + cores + ", memory per executor: "
+      + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
+    return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
+      Integer.valueOf(totalCores));
+  }
+
+  private JavaSparkContext sc;
+
+  private List<String> localJars = new ArrayList<String>();
+
+  private List<String> localFiles = new ArrayList<String>();
+
+  private JobMetricsListener jobMetricsListener;
+
+  private LocalHiveSparkClient(SparkConf sparkConf) {
+    sc = new JavaSparkContext(sparkConf);
+    jobMetricsListener = new JobMetricsListener();
+    sc.sc().listenerBus().addListener(jobMetricsListener);
+  }
+
+  @Override
+  public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+    Context ctx = driverContext.getCtx();
+    HiveConf hiveConf = (HiveConf) ctx.getConf();
+    refreshLocalResources(sparkWork, hiveConf);
+    JobConf jobConf = new JobConf(hiveConf);
+
+    // Create temporary scratch dir
+    Path emptyScratchDir;
+    emptyScratchDir = ctx.getMRTmpPath();
+    FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+    fs.mkdirs(emptyScratchDir);
+
+    SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
+    Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
+    if (prefixes != null) {
+      for (String group : prefixes.keySet()) {
+        for (String counterName : prefixes.get(group)) {
+          sparkCounters.createCounter(group, counterName);
+        }
+      }
+    }
+    SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+    // Generate Spark plan
+    SparkPlanGenerator gen =
+      new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
+    SparkPlan plan = gen.generate(sparkWork);
+
+    // Execute generated plan.
+    JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+    // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+    JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+    // As we always use a foreach action to submit the RDD graph, it triggers only one job.
+    int jobId = future.jobIds().get(0);
+    SimpleSparkJobStatus sparkJobStatus =
+      new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
+    return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
+  }
+
+  /**
+   * At this point a single SparkContext is shared by more than one thread, so this
+   * method is synchronized.
+   *
+   * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+   * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+   */
+  private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+    // add hive-exec jar
+    addJars((new JobConf(this.getClass())).getJar());
+
+    // add aux jars
+    addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+    // add added jars
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+    addJars(addedJars);
+
+    // add plugin module jars on demand
+    // jobConf will hold all the configuration for hadoop, tez, and hive
+    JobConf jobConf = new JobConf(conf);
+    jobConf.set(MR_JAR_PROPERTY, "");
+    for (BaseWork work : sparkWork.getAllWork()) {
+      work.configureJobConf(jobConf);
+    }
+    addJars(conf.get(MR_JAR_PROPERTY));
+
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+    addResources(addedFiles);
+
+    // add added archives
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+    addResources(addedArchives);
+  }
+
+  private void addResources(String addedFiles) {
+    for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+      if (!localFiles.contains(addedFile)) {
+        localFiles.add(addedFile);
+        sc.addFile(addedFile);
+      }
+    }
+  }
+
+  private void addJars(String addedJars) {
+    for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+      if (!localJars.contains(addedJar)) {
+        localJars.add(addedJar);
+        sc.addJar(addedJar);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    sc.stop();
+    client = null;
+  }
+}
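
A worked example for getMemoryAndCores(), with hypothetical numbers: for 1024 MB of executor memory, spark.shuffle.memoryFraction=0.2 and spark.executor.cores=2, memoryPerTask = (long) (1024 * 0.2 * 1024 * 1024 / 2) = 107374182 bytes (roughly 102 MB of shuffle memory per task); if getExecutorMemoryStatus() reports 4 executors, totalCores = 4 * 2 = 8.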

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RemoteHiveSparkClient is a wrapper around {@link org.apache.hive.spark.client.SparkClient};
+ * it wraps a Spark job request and sends it to a remote SparkContext.
+ */
+public class RemoteHiveSparkClient implements HiveSparkClient {
+  private static final long serialVersionUID = 1L;
+
+  private static final String MR_JAR_PROPERTY = "tmpjars";
+  protected static transient final Log LOG = LogFactory
+    .getLog(RemoteHiveSparkClient.class);
+
+  private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+  private transient SparkClient remoteClient;
+
+  private transient List<String> localJars = new ArrayList<String>();
+
+  private transient List<String> localFiles = new ArrayList<String>();
+
+  RemoteHiveSparkClient(Map<String, String> sparkConf) throws IOException, SparkException {
+    SparkClientFactory.initialize(sparkConf);
+    remoteClient = SparkClientFactory.createClient(sparkConf);
+  }
+
+  @Override
+  public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
+    final Context ctx = driverContext.getCtx();
+    final HiveConf hiveConf = (HiveConf) ctx.getConf();
+    refreshLocalResources(sparkWork, hiveConf);
+    final JobConf jobConf = new JobConf(hiveConf);
+
+    // Create temporary scratch dir
+    final Path emptyScratchDir = ctx.getMRTmpPath();
+    FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+    fs.mkdirs(emptyScratchDir);
+
+    final byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+    final byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
+    final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
+
+    JobHandle<Serializable> jobHandle = remoteClient.submit(new Job<Serializable>() {
+      @Override
+      public Serializable call(JobContext jc) throws Exception {
+        JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+        Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
+        SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
+
+        SparkCounters sparkCounters = new SparkCounters(jc.sc(), localJobConf);
+        Map<String, List<String>> prefixes = localSparkWork.getRequiredCounterPrefix();
+        if (prefixes != null) {
+          for (String group : prefixes.keySet()) {
+            for (String counterName : prefixes.get(group)) {
+              sparkCounters.createCounter(group, counterName);
+            }
+          }
+        }
+        SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+        // Generate Spark plan
+        SparkPlanGenerator gen =
+          new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
+        SparkPlan plan = gen.generate(localSparkWork);
+
+        // Execute generated plan.
+        JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+        // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+        JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+        jc.monitor(future);
+        return null;
+      }
+    });
+    jobHandle.get();
+    return new SparkJobRef(jobHandle.getClientJobId());
+  }
+
+  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+    // add hive-exec jar
+    addJars((new JobConf(this.getClass())).getJar());
+
+    // add aux jars
+    addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+    // add added jars
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+    addJars(addedJars);
+
+    // add plugin module jars on demand
+    // jobConf will hold all the configuration for hadoop, tez, and hive
+    JobConf jobConf = new JobConf(conf);
+    jobConf.set(MR_JAR_PROPERTY, "");
+    for (BaseWork work : sparkWork.getAllWork()) {
+      work.configureJobConf(jobConf);
+    }
+    addJars(conf.get(MR_JAR_PROPERTY));
+
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+    addResources(addedFiles);
+
+    // add added archives
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+    addResources(addedArchives);
+  }
+
+  private void addResources(String addedFiles) {
+    for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+      if (!localFiles.contains(addedFile)) {
+        localFiles.add(addedFile);
+        try {
+          remoteClient.addFile(SparkUtilities.getURL(addedFile));
+        } catch (MalformedURLException e) {
+          LOG.warn("Failed to add file:" + addedFile);
+        }
+      }
+    }
+  }
+
+  private void addJars(String addedJars) {
+    for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+      if (!localJars.contains(addedJar)) {
+        localJars.add(addedJar);
+        try {
+          remoteClient.addJar(SparkUtilities.getURL(addedJar));
+        } catch (MalformedURLException e) {
+          LOG.warn("Failed to add jar:" + addedJar);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    remoteClient.stop();
+  }
+}
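
The remote path above builds on the spark-client API that this patch also touches: the closure passed to SparkClient#submit executes inside the remote driver, and JobContext hands it the remote SparkContext. A minimal sketch, not part of the patch, assuming a SparkClient instance (remoteClient) created as in the constructor above and using a hypothetical job body:

  JobHandle<Serializable> handle = remoteClient.submit(new Job<Serializable>() {
    @Override
    public Serializable call(JobContext jc) throws Exception {
      // Runs in the remote driver; jc.sc() is the remote JavaSparkContext.
      return Integer.valueOf(jc.sc().sc().getExecutorMemoryStatus().size());
    }
  });
  handle.get();                                 // block until the remote job completes
  String clientJobId = handle.getClientJobId();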

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Nov 18 06:46:42 2014
@@ -110,15 +110,17 @@ public class SparkTask extends Task<Spar
 
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
-      sparkCounters = sparkJobStatus.getCounter();
-      SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
-      monitor.startMonitor();
-      SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
-      if (LOG.isInfoEnabled() && sparkStatistics != null) {
-        LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
-        logSparkStatistic(sparkStatistics);
+      if (sparkJobStatus != null) {
+        sparkCounters = sparkJobStatus.getCounter();
+        SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+        monitor.startMonitor();
+        SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+        if (LOG.isInfoEnabled() && sparkStatistics != null) {
+          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+          logSparkStatistic(sparkStatistics);
+        }
+        sparkJobStatus.cleanup();
       }
-      sparkJobStatus.cleanup();
       rc = 0;
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Nov 18 06:46:42 2014
@@ -20,6 +20,12 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
 /**
  * Contains utilities methods used as part of Spark tasks
  */
@@ -41,4 +47,25 @@ public class SparkUtilities {
     copy.set(bw);
     return copy;
   }
+
+  public static URL getURL(String path) throws MalformedURLException {
+    if (path == null) {
+      return null;
+    }
+
+    URL url = null;
+    try {
+      URI uri = new URI(path);
+      if (uri.getScheme() != null) {
+        url = uri.toURL();
+      } else {
+        // if there is no scheme in the path, we assume it's a file on the local fs.
+        url = new File(path).toURI().toURL();
+      }
+    } catch (URISyntaxException e) {
+      // do nothing here, just return null if input path is not a valid URI.
+    }
+
+    return url;
+  }
 }
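
A quick sketch of the new helper's behaviour, with hypothetical paths: an explicit scheme is preserved, a scheme-less path is treated as a local file, and null or a syntactically invalid URI comes back as null.

  URL u1 = SparkUtilities.getURL("file:///tmp/my-udf.jar"); // scheme present, converted as-is
  URL u2 = SparkUtilities.getURL("/tmp/my-udf.jar");        // no scheme -> file:/tmp/my-udf.jar
  URL u3 = SparkUtilities.getURL(null);                     // null in, null out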

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Tue Nov 18 06:46:42 2014
@@ -18,12 +18,17 @@
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -31,10 +36,12 @@ import java.util.UUID;
  * SparkClient which is shared by all SparkSession instances.
  */
 public class SparkSessionImpl implements SparkSession {
+  private static final Log LOG = LogFactory.getLog(SparkSession.class);
+
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
-  private SparkClient sparkClient;
+  private HiveSparkClient hiveSparkClient;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -49,8 +56,9 @@ public class SparkSessionImpl implements
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
     Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
-    sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
-    return sparkClient.execute(driverContext, sparkWork);
+    Configuration hiveConf = driverContext.getCtx().getConf();
+    hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
+    return hiveSparkClient.execute(driverContext, sparkWork);
   }
 
   @Override
@@ -71,10 +79,14 @@ public class SparkSessionImpl implements
   @Override
   public void close() {
     isOpen = false;
-    if (sparkClient != null) {
-      sparkClient.close();
+    if (hiveSparkClient != null) {
+      try {
+        hiveSparkClient.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close spark session (" + sessionId + ").", e);
+      }
     }
-    sparkClient = null;
+    hiveSparkClient = null;
   }
 
   public static String makeSessionId() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Tue Nov 18 06:46:42 2014
@@ -19,26 +19,26 @@ package org.apache.hadoop.hive.ql.exec.s
 
 public class SparkJobRef {
 
-  private int jobId;
+  private String jobId;
 
   private SparkJobStatus sparkJobStatus;
 
   public SparkJobRef() {}
 
-  public SparkJobRef(int jobId) {
+  public SparkJobRef(String jobId) {
     this.jobId = jobId;
   }
 
-  public SparkJobRef(int jobId, SparkJobStatus sparkJobStatus) {
+  public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) {
     this.jobId = jobId;
     this.sparkJobStatus = sparkJobStatus;
   }
 
-  public int getJobId() {
+  public String getJobId() {
     return jobId;
   }
 
-  public void setJobId(int jobId) {
+  public void setJobId(String jobId) {
     this.jobId = jobId;
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Tue Nov 18 06:46:42 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.io.IOException;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -26,7 +27,9 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.s
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 
+import org.apache.spark.SparkException;
 import scala.Tuple2;
 
 /**
@@ -69,39 +73,54 @@ public class SetSparkReducerParallelism 
 
     context.getVisitedReduceSinks().add(sink);
 
+
     if (desc.getNumReducers() <= 0) {
       if (constantReducers > 0) {
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
       } else {
-        long numberOfBytes = 0;
+        try {
+          // TODO: keep this working after the integration with the remote spark context so that
+          // tests don't break; automatic reducer-count calculation for the remote spark client
+          // should be implemented and this code refactored later, tracked by HIVE-8855.
+          HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf());
+          if (sparkClient instanceof LocalHiveSparkClient) {
+            LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient;
+            long numberOfBytes = 0;
+
+            // we need to add up all the estimates from the siblings of this reduce sink
+            for (Operator<? extends OperatorDesc> sibling:
+              sink.getChildOperators().get(0).getParentOperators()) {
+              if (sibling.getStatistics() != null) {
+                numberOfBytes += sibling.getStatistics().getDataSize();
+              } else {
+                LOG.warn("No stats available from: " + sibling);
+              }
+            }
+
+            if (sparkMemoryAndCores == null) {
+              sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores();
+            }
+
+            // Divide it by 2 so that we can have more reducers
+            long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+            int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+              maxReducers, false);
+
+            // If there are more cores, use the number of cores
+            int cores = sparkMemoryAndCores._2.intValue();
+            if (numReducers < cores) {
+              numReducers = cores;
+            }
+            LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+            desc.setNumReducers(numReducers);
 
-        // we need to add up all the estimates from the siblings of this reduce sink
-        for (Operator<? extends OperatorDesc> sibling:
-          sink.getChildOperators().get(0).getParentOperators()) {
-          if (sibling.getStatistics() != null) {
-            numberOfBytes += sibling.getStatistics().getDataSize();
           } else {
-            LOG.warn("No stats available from: " + sibling);
+            sparkClient.close();
           }
+        } catch (Exception e) {
+          LOG.warn("Failed to create spark client.", e);
         }
-
-        if (sparkMemoryAndCores == null) {
-          sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf());
-        }
-
-        // Divide it by 2 so that we can have more reducers
-        long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
-        int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
-            maxReducers, false);
-
-        // If there are more cores, use the number of cores
-        int cores = sparkMemoryAndCores._2.intValue();
-        if (numReducers < cores) {
-          numReducers = cores;
-        }
-        LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
-        desc.setNumReducers(numReducers);
       }
     } else {
       LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
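
For orientation, with hypothetical numbers: if the siblings of a reduce sink report 1 GB of statistics and the local client reports roughly 102 MB of shuffle memory per task, bytesPerReducer is about 51 MB, so Utilities.estimateReducers aims for roughly 1 GB / 51 MB, about 20 reducers (capped at maxReducers); if the cluster exposes more total cores than that, say 32, the core count wins and the reduce sink's parallelism is set to 32.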

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Tue Nov 18 06:46:42 2014
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.common.cla
  * Defines the API for the Spark remote client.
  */
 @InterfaceAudience.Private
-public interface SparkClient {
+public interface SparkClient extends Serializable {
 
   /**
    * Submits a job for asynchronous execution.

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 18 06:46:42 2014
@@ -190,7 +190,7 @@ class SparkClientImpl implements SparkCl
         LOG.info("No spark.home provided, calling SparkSubmit directly.");
         argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
 
-        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
           String mem = conf.get("spark.driver.memory");
           if (mem != null) {
             argv.add("-Xms" + mem);