Posted to commits@spark.apache.org by an...@apache.org on 2015/01/08 03:33:12 UTC

svn commit: r1650203 - /spark/site/docs/1.2.0/job-scheduling.html

Author: andrewor14
Date: Thu Jan  8 02:33:11 2015
New Revision: 1650203

URL: http://svn.apache.org/r1650203
Log:
Push dynamic allocation docs

Modified:
    spark/site/docs/1.2.0/job-scheduling.html

Modified: spark/site/docs/1.2.0/job-scheduling.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.2.0/job-scheduling.html?rev=1650203&r1=1650202&r2=1650203&view=diff
==============================================================================
--- spark/site/docs/1.2.0/job-scheduling.html (original)
+++ spark/site/docs/1.2.0/job-scheduling.html Thu Jan  8 02:33:11 2015
@@ -6,7 +6,7 @@
     <head>
         <meta charset="utf-8">
         <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
-        <title>Job Scheduling - Spark 1.2.0 Documentation</title>
+        <title>Job Scheduling - Spark 1.3.0 Documentation</title>
         <meta name="description" content="">
 
         
@@ -53,7 +53,7 @@
             <div class="navbar-inner">
                 <div class="container">
                     <div class="brand"><a href="index.html">
-                      <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.2.0</span>
+                      <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.3.0</span>
                     </div>
                     <ul class="nav">
                         <!--TODO(andyk): Add class="active" attribute to li some how.-->
@@ -113,7 +113,7 @@
                             </ul>
                         </li>
                     </ul>
-                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.2.0</span></p>-->
+                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.3.0</span></p>-->
                 </div>
             </div>
         </div>
@@ -125,7 +125,19 @@
 
           <ul id="markdown-toc">
   <li><a href="#overview">Overview</a></li>
-  <li><a href="#scheduling-across-applications">Scheduling Across Applications</a></li>
+  <li><a href="#scheduling-across-applications">Scheduling Across Applications</a>    <ul>
+      <li><a href="#dynamic-resource-allocation">Dynamic Resource Allocation</a>        <ul>
+          <li><a href="#configuration-and-setup">Configuration and Setup</a></li>
+          <li><a href="#resource-allocation-policy">Resource Allocation Policy</a>            <ul>
+              <li><a href="#request-policy">Request Policy</a></li>
+              <li><a href="#remove-policy">Remove Policy</a></li>
+            </ul>
+          </li>
+          <li><a href="#graceful-decommission-of-executors">Graceful Decommission of Executors</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
   <li><a href="#scheduling-within-an-application">Scheduling Within an Application</a>    <ul>
       <li><a href="#fair-scheduler-pools">Fair Scheduler Pools</a></li>
       <li><a href="#default-behavior-of-pools">Default Behavior of Pools</a></li>
@@ -186,6 +198,117 @@ the same RDDs. For example, the <a href=
 queries. In future releases, in-memory storage systems such as <a href="http://tachyon-project.org">Tachyon</a> will
 provide another approach to share RDDs.</p>
 
+<h2 id="dynamic-resource-allocation">Dynamic Resource Allocation</h2>
+
+<p>Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
+your application up and down based on the workload. This means that your application may give
+resources back to the cluster if they are no longer used and request them again later when there
+is demand. This feature is particularly useful if multiple applications share resources in your
+Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
+returned to the cluster&#8217;s pool of resources and acquired by other applications. In Spark, dynamic
+resource allocation is performed at the granularity of the executor and can be enabled through
+<code>spark.dynamicAllocation.enabled</code>.</p>
+
+<p>This feature is currently disabled by default and available only on <a href="running-on-yarn.html">YARN</a>.
+A future release will extend this to <a href="spark-standalone.html">standalone mode</a> and
+<a href="running-on-mesos.html#mesos-run-modes">Mesos coarse-grained mode</a>. Note that although Spark on
+Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
+dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
+scheduling while sharing cluster resources efficiently.</p>
+
+<h3 id="configuration-and-setup">Configuration and Setup</h3>
+
+<p>All configurations used by this feature live under the <code>spark.dynamicAllocation.*</code> namespace.
+To enable this feature, your application must set <code>spark.dynamicAllocation.enabled</code> to <code>true</code> and
+provide lower and upper bounds for the number of executors through
+<code>spark.dynamicAllocation.minExecutors</code> and <code>spark.dynamicAllocation.maxExecutors</code>. Other relevant
+configurations are described on the <a href="configuration.html#dynamic-allocation">configurations page</a>
+and in detail in the following sections.</p>
+
+<p>Additionally, your application must use an external shuffle service. The purpose of the service is
+to preserve the shuffle files written by executors so the executors can be safely removed (described
+in more detail <a href="job-scheduling.html#graceful-decommission-of-executors">below</a>). To enable
+this service, set <code>spark.shuffle.service.enabled</code> to <code>true</code>. In YARN, this external shuffle service
+is implemented in <code>org.apache.spark.network.yarn.YarnShuffleService</code>, which runs in each <code>NodeManager</code>
+in your cluster. To start this service, follow these steps:</p>
+
+<ol>
+  <li>Build Spark with the <a href="building-spark.html">YARN profile</a>. Skip this step if you are using a
+pre-packaged distribution.</li>
+  <li>Locate the <code>spark-&lt;version&gt;-yarn-shuffle.jar</code>. This should be under
+<code>$SPARK_HOME/network/yarn/target/scala-&lt;version&gt;</code> if you are building Spark yourself, and under
+<code>lib</code> if you are using a distribution.</li>
+  <li>Add this jar to the classpath of all <code>NodeManager</code>s in your cluster.</li>
+  <li>In the <code>yarn-site.xml</code> on each node, add <code>spark_shuffle</code> to <code>yarn.nodemanager.aux-services</code>,
+then set <code>yarn.nodemanager.aux-services.spark_shuffle.class</code> to
+<code>org.apache.spark.network.yarn.YarnShuffleService</code>. Additionally, set all relevant
+<code>spark.shuffle.service.*</code> <a href="configuration.html">configurations</a>.</li>
+  <li>Restart all <code>NodeManager</code>s in your cluster.</li>
+</ol>
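+
+<p>As a rough illustration of the application-side settings described above, the following sketch
+sets them on a <code>SparkConf</code>. The application name and the executor bounds (2 and 20) are
+placeholder values chosen for this example, and the master is assumed to be supplied by
+<code>spark-submit</code> on a YARN cluster where the shuffle service has already been set up.</p>
+

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example") // hypothetical application name
      // Turn on dynamic allocation and bound the number of executors.
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")  // placeholder lower bound
      .set("spark.dynamicAllocation.maxExecutors", "20") // placeholder upper bound
      // Dynamic allocation also requires the external shuffle service.
      .set("spark.shuffle.service.enabled", "true")

    // The master (YARN) is assumed to be passed in by spark-submit.
    val sc = new SparkContext(conf)
    try {
      // A trivial job, only to give the scheduler some tasks to place.
      println(sc.parallelize(1 to 1000).count())
    } finally {
      sc.stop()
    }
  }
}
```

+<p>The same properties can presumably be supplied through <code>spark-defaults.conf</code> or
+<code>--conf</code> flags instead of being hard-coded in the application.</p>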
+
+<h3 id="resource-allocation-policy">Resource Allocation Policy</h3>
+
+<p>At a high level, Spark should relinquish executors when they are no longer used and acquire
+executors when they are needed. Since there is no definitive way to predict whether an executor
+that is about to be removed will run a task in the near future, or whether a new executor that is
+about to be added will actually be idle, we need a set of heuristics to determine when to remove
+and request executors.</p>
+
+<h4 id="request-policy">Request Policy</h4>
+
+<p>A Spark application with dynamic allocation enabled requests additional executors when it has
+pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
+of executors is insufficient to simultaneously run all tasks that have been submitted but
+not yet finished.</p>
+
+<p>Spark requests executors in rounds. The actual request is triggered when there have been pending
+tasks for <code>spark.dynamicAllocation.schedulerBacklogTimeout</code> seconds, and then triggered again
+every <code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code> seconds thereafter if the queue
+of pending tasks persists. Additionally, the number of executors requested in each round increases
+exponentially from the previous round. For instance, an application will add 1 executor in the
+first round, and then 2, 4, 8 and so on executors in the subsequent rounds.</p>
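+
+<p>The round structure described above can be modeled with a few lines of plain Scala. This is an
+illustrative sketch only, not Spark's internal implementation: the names
+<code>RequestPolicySketch</code>, <code>Round</code> and <code>schedule</code> are hypothetical, the
+backlog is assumed to persist for every round, and the <code>maxExecutors</code> bound is ignored.</p>
+

```scala
// Illustrative model of the request rounds; not Spark's internal code.
object RequestPolicySketch {
  /** One request round: when it fires (seconds after the backlog began) and how many executors it asks for. */
  final case class Round(atSeconds: Long, executorsRequested: Int)

  /**
   * The first round fires after `backlogTimeout` seconds; each later round fires
   * `sustainedTimeout` seconds after the previous one and doubles the request.
   * Assumes the backlog persists throughout and `rounds` is small (below 31).
   */
  def schedule(backlogTimeout: Long, sustainedTimeout: Long, rounds: Int): Seq[Round] =
    (0 until rounds).map { i =>
      Round(backlogTimeout + i * sustainedTimeout, 1 << i)
    }

  def main(args: Array[String]): Unit =
    schedule(backlogTimeout = 5L, sustainedTimeout = 5L, rounds = 4).foreach(println)
}
```

+<p>With both timeouts set to 5 seconds (values picked for the example), this model fires at 5, 10, 15
+and 20 seconds and requests 1, 2, 4 and 8 executors respectively.</p>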
+
+<p>The motivation for an exponential increase policy is twofold. First, an application should request
+executors cautiously in the beginning in case it turns out that only a few additional executors are
+sufficient. This echoes the justification for TCP slow start. Second, the application should be
+able to ramp up its resource usage in a timely manner in case it turns out that many executors are
+actually needed.</p>
+
+<h4 id="remove-policy">Remove Policy</h4>
+
+<p>The policy for removing executors is much simpler. A Spark application removes an executor when
+it has been idle for more than <code>spark.dynamicAllocation.executorIdleTimeout</code> seconds. Note that,
+under most circumstances, this condition is mutually exclusive with the request condition, in that
+an executor should not be idle if there are still pending tasks to be scheduled.</p>
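+
+<p>The idle check can be sketched in the same spirit. The code below is a simplified model under
+stated assumptions, not Spark's implementation: <code>RemovePolicySketch</code> and
+<code>executorsToRemove</code> are hypothetical names, and the time at which each executor became
+idle is assumed to be tracked elsewhere.</p>
+

```scala
// Illustrative model of the removal rule; not Spark's internal code.
object RemovePolicySketch {
  /**
   * Returns the IDs of executors that have been idle for strictly more than
   * `idleTimeoutSeconds`. `idleSince` maps an executor ID to the time, in
   * seconds, at which that executor last became idle.
   */
  def executorsToRemove(
      idleSince: Map[String, Long],
      nowSeconds: Long,
      idleTimeoutSeconds: Long): Set[String] =
    idleSince.collect {
      case (id, since) if nowSeconds - since > idleTimeoutSeconds => id
    }.toSet
}
```
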
+
+<h3 id="graceful-decommission-of-executors">Graceful Decommission of Executors</h3>
+
+<p>Without dynamic allocation, a Spark executor exits either on failure or when the associated
+application exits. In both scenarios, all state associated with the executor is no
+longer needed and can be safely discarded. With dynamic allocation, however, the application
+is still running when an executor is explicitly removed. If the application attempts to access
+state stored in or written by the executor, it will have to recompute that state. Thus,
+Spark needs a mechanism to decommission an executor gracefully by preserving its state before
+removing it.</p>
+
+<p>This requirement is especially important for shuffles. During a shuffle, the Spark executor first
+writes its own map outputs locally to disk, and then acts as the server for those files when other
+executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
+longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
+in which case the shuffle files written by that executor must be recomputed, wasting work.</p>
+
+<p>The solution for preserving shuffle files is to use an external shuffle service, also introduced
+in Spark 1.2. This service is a long-running process that runs on each node of your cluster
+independently of your Spark applications and their executors. If the service is enabled, Spark
+executors will fetch shuffle files from the service instead of from each other. This means any
+shuffle state written by an executor may continue to be served beyond the executor&#8217;s lifetime.</p>
+
+<p>In addition to writing shuffle files, executors also cache data either on disk or in memory.
+When an executor is removed, however, all cached data will no longer be accessible. Spark 1.2
+does not yet have a solution for this. In future releases, the cached data may be
+preserved through an off-heap storage similar in spirit to how shuffle files are preserved through
+the external shuffle service.</p>
+
 <h1 id="scheduling-within-an-application">Scheduling Within an Application</h1>
 
 <p>Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if
@@ -209,9 +332,10 @@ mode is best for multi-user settings.</p
 <p>To enable the fair scheduler, simply set the <code>spark.scheduler.mode</code> property to <code>FAIR</code> when configuring
 a SparkContext:</p>
 
-<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="n">setMaster</span><span class="o">(...).</span><span class="n">setAppName</span><span class="o">(...)</span>
+<div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="n">setMaster</span><span class="o">(...).</span><span class="n">setAppName</span><span class="o">(...)</span>
 <span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">&quot;spark.scheduler.mode&quot;</span><span class="o">,</span> <span class="s">&quot;FAIR&quot;</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">sc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></div>
+<span class="k">val</span> <span class="n">sc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span>
+</code></pre></div>
 
 <h2 id="fair-scheduler-pools">Fair Scheduler Pools</h2>
 
@@ -225,15 +349,17 @@ many concurrent jobs they have instead o
 adding the <code>spark.scheduler.pool</code> &#8220;local property&#8221; to the SparkContext in the thread that&#8217;s submitting them.
 This is done as follows:</p>
 
-<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Assuming sc is your SparkContext variable</span>
-<span class="n">sc</span><span class="o">.</span><span class="n">setLocalProperty</span><span class="o">(</span><span class="s">&quot;spark.scheduler.pool&quot;</span><span class="o">,</span> <span class="s">&quot;pool1&quot;</span><span class="o">)</span></code></pre></div>
+<div class="highlight"><pre><code class="scala"><span class="c1">// Assuming sc is your SparkContext variable</span>
+<span class="n">sc</span><span class="o">.</span><span class="n">setLocalProperty</span><span class="o">(</span><span class="s">&quot;spark.scheduler.pool&quot;</span><span class="o">,</span> <span class="s">&quot;pool1&quot;</span><span class="o">)</span>
+</code></pre></div>
 
 <p>After setting this local property, <em>all</em> jobs submitted within this thread (by calls in this thread
 to <code>RDD.save</code>, <code>count</code>, <code>collect</code>, etc) will use this pool name. The setting is per-thread to make
 it easy to have a thread run multiple jobs on behalf of the same user. If you&#8217;d like to clear the
 pool that a thread is associated with, simply call:</p>
 
-<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">sc</span><span class="o">.</span><span class="n">setLocalProperty</span><span class="o">(</span><span class="s">&quot;spark.scheduler.pool&quot;</span><span class="o">,</span> <span class="kc">null</span><span class="o">)</span></code></pre></div>
+<div class="highlight"><pre><code class="scala"><span class="n">sc</span><span class="o">.</span><span class="n">setLocalProperty</span><span class="o">(</span><span class="s">&quot;spark.scheduler.pool&quot;</span><span class="o">,</span> <span class="kc">null</span><span class="o">)</span>
+</code></pre></div>
 
 <h2 id="default-behavior-of-pools">Default Behavior of Pools</h2>
 
@@ -267,12 +393,13 @@ of the cluster. By default, each pool&#8
 and setting a <code>spark.scheduler.allocation.file</code> property in your
 <a href="configuration.html#spark-properties">SparkConf</a>.</p>
 
-<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">&quot;spark.scheduler.allocation.file&quot;</span><span class="o">,</span> <span class="s">&quot;/path/to/file&quot;</span><span class="o">)</span></code></pre></div>
+<div class="highlight"><pre><code class="scala"><span class="n">conf</span><span class="o">.</span><span class="n">set</span><span class="o">(</span><span class="s">&quot;spark.scheduler.allocation.file&quot;</span><span class="o">,</span> <span class="s">&quot;/path/to/file&quot;</span><span class="o">)</span>
+</code></pre></div>
 
 <p>The format of the XML file is simply a <code>&lt;pool&gt;</code> element for each pool, with different elements
 within it for the various settings. For example:</p>
 
-<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="cp">&lt;?xml version=&quot;1.0&quot;?&gt;</span>
+<div class="highlight"><pre><code class="xml"><span class="cp">&lt;?xml version=&quot;1.0&quot;?&gt;</span>
 <span class="nt">&lt;allocations&gt;</span>
   <span class="nt">&lt;pool</span> <span class="na">name=</span><span class="s">&quot;production&quot;</span><span class="nt">&gt;</span>
     <span class="nt">&lt;schedulingMode&gt;</span>FAIR<span class="nt">&lt;/schedulingMode&gt;</span>
@@ -284,7 +411,8 @@ within it for the various settings. For
     <span class="nt">&lt;weight&gt;</span>2<span class="nt">&lt;/weight&gt;</span>
     <span class="nt">&lt;minShare&gt;</span>3<span class="nt">&lt;/minShare&gt;</span>
   <span class="nt">&lt;/pool&gt;</span>
-<span class="nt">&lt;/allocations&gt;</span></code></pre></div>
+<span class="nt">&lt;/allocations&gt;</span>
+</code></pre></div>
 
 <p>A full example is also available in <code>conf/fairscheduler.xml.template</code>. Note that any pools not
 configured in the XML file will simply get default values for all settings (scheduling mode FIFO,


