Posted to user@spark.apache.org by Sarath Chandra <sa...@algofusiontech.com> on 2014/09/30 16:37:39 UTC

Parallel spark jobs on mesos cluster

Hi All,

I have a requirement to process a set of files in parallel, so I'm
submitting Spark jobs using Java's ExecutorService. But when I do it this
way, one or more jobs fail with status "EXITED". Earlier I tried a
standalone Spark cluster with job scheduling set to "Fair Scheduling",
roughly as sketched below; I faced the same issue there, and hence moved
to a Mesos cluster.
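
For reference, the standalone fair-scheduling attempt was configured
roughly like this (a minimal sketch; the standalone master URL and the
pool name are placeholders, not the exact values I used):

// Minimal sketch of the earlier standalone + fair-scheduling setup
// (master URL and pool name are placeholders).
val conf = new SparkConf()
  .setMaster("spark://master:7077")        // standalone master (placeholder)
  .setAppName("ParallelTest")
  .set("spark.scheduler.mode", "FAIR")     // fair scheduling between jobs
val sc = new SparkContext(conf)
// Jobs submitted from a given thread can be assigned to a named scheduler pool:
sc.setLocalProperty("spark.scheduler.pool", "loadPool")  // placeholder pool name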

Please let me know the reasons for the job failures and guide me if I'm
missing something. Is this the right way to do it? What is the best way to
achieve my objective?

I have a Mesos cluster with 1 master and 2 slaves (10 cores and 12GB of
memory available in total).
My Spark job program is as below -

Test9.scala

import java.util.concurrent.Callable
import java.util.logging.{Level, Logger}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

class Test9 extends Callable[Boolean] {
  val logger = Logger.getLogger("Test9")
  val su = new SparkHadoopUtil
  var filePath: String = _

  def this(filePath: String) {
    this()                        // call the no-arg primary constructor first
    this.filePath = filePath
    logger.setLevel(Level.FINER)
  }

  def call(): Boolean = {
    logger.log(Level.INFO, "Getting spark configuration...")
    val conf = new SparkConf()
      .setMaster("mesos://slave1:5050")
      .setAppName("ParallelTest")
      .setSparkHome("/usr/local/spark-1.0.1-bin-hadoop1")
      .set("spark.executor.memory", "3g")
      .set("spark.cores.max", "4")
      .set("spark.task.cpus", "4")
      .set("spark.executor.uri",
        "hdfs://sarath:54310/user/hduser/spark-1.0.1-bin-hadoop1.tgz")
    logger.log(Level.INFO, "Getting spark context...")
    val sc = new SparkContext(conf)
    sc.addJar("/home/sarath/Projects/Workspaces/SparkScala/test1/target/test1-0.1.jar")
    var lines: RDD[(Long, String)] = null
    su.runAsSparkUser(() => {
      logger.log(Level.INFO, "Loading HDFS file...")
      // Read the file as (byte offset, line text) pairs
      lines = sc.newAPIHadoopFile("hdfs://sarath:54310/user/hduser/" + filePath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        new Configuration).map(l => (l._1.get, l._2.toString))
    })
    logger.log(Level.INFO, "Got " + lines.count + " lines...")
    logger.log(Level.FINE, "Processing lines...")
    val processed = lines.map(l => l._1 + "," + l._2)
    logger.log(Level.INFO, "Processed " + processed.count + " lines...")
    sc.stop()
    true
  }
}

My job submitter program is as below -

Test10.scala

import java.util.concurrent.{Executors, Future, TimeUnit}
import scala.collection.mutable.HashMap

object Test10 {
  def main(args: Array[String]): Unit = {
    val files = Array("load_test/17786_1.csv",
      "load_test/17786_2.csv",
      "load_test/17802_1.csv",
      "load_test/17802_2.csv")
    // One thread per file; each thread runs a Test9 job
    val es = Executors.newFixedThreadPool(4)
    val jobs = new HashMap[Int, Future[Boolean]]
    var index = 1
    files.foreach(s => {
      jobs.put(index, es.submit(new Test9(s)))
      index = index + 1
    })
    es.shutdown()
    es.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
    jobs.keySet.foreach(key => {
      println("Result of job" + key + " is " + jobs(key).get)
    })
  }
}

~Sarath