Posted to commits@crunch.apache.org by bu...@apache.org on 2014/01/20 19:17:35 UTC
svn commit: r894871 - in /websites/staging/crunch/trunk/content: ./
user-guide.html
Author: buildbot
Date: Mon Jan 20 18:17:34 2014
New Revision: 894871
Log:
Staging update by buildbot for crunch
Modified:
websites/staging/crunch/trunk/content/ (props changed)
websites/staging/crunch/trunk/content/user-guide.html
Propchange: websites/staging/crunch/trunk/content/
------------------------------------------------------------------------------
--- cms:source-revision (original)
+++ cms:source-revision Mon Jan 20 18:17:34 2014
@@ -1 +1 @@
-1558208
+1559795
Modified: websites/staging/crunch/trunk/content/user-guide.html
==============================================================================
--- websites/staging/crunch/trunk/content/user-guide.html (original)
+++ websites/staging/crunch/trunk/content/user-guide.html Mon Jan 20 18:17:34 2014
@@ -200,6 +200,7 @@
<li><a href="#splits">Splits</a></li>
</ol>
</li>
+<li><a href="#objectreuse">Retaining objects within DoFns</a></li>
</ol>
</li>
<li><a href="#hbase">Crunch for HBase</a></li>
@@ -1257,6 +1258,61 @@ you to split an input PCollection of Pai
split.second().write(badOutputs);
</pre>
+<p><a name="objectreuse"></a></p>
+<h3 id="retaining-objects-within-dofns">Retaining objects within DoFns</h3>
+<p>For reasons of efficiency, Hadoop MapReduce repeatedly passes the <a href="https://issues.apache.org/jira/browse/HADOOP-2399">same references as keys and values to Mappers and Reducers</a> instead of passing in new objects for each call.
+The state of the shared key and value objects is updated between each call
+to <code>Mapper.map()</code> and <code>Reducer.reduce()</code>, and also between each
+call to <code>Iterator.next()</code> while iterating over the values Iterable within a Reducer.</p>
+<p>As a result of this optimization, a reference to an object received within a
+map or reduce call cannot be retained beyond the scope of that single method
+invocation, because the object's value will change on subsequent invocations.
+In some (but not all) situations, this optimization affects DoFns as well,
+meaning that you can't simply retain a reference that is passed in to
+<code>DoFn.process()</code> beyond the lifetime of that method call.</p>
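The reuse behaviour can be reproduced without Hadoop. The following is a plain-Java simulation, not Hadoop or Crunch code: `MutableInt` is a hypothetical stand-in for a Writable such as `IntWritable`, and `ReusingIterator` is a hypothetical iterator that, like the values Iterable in a Reducer, hands back the same instance on every call to `next()`. Keeping the references yields several aliases of one object, while copying the value out before the next call preserves each input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java simulation of framework object reuse; all types here are hypothetical.
public class ObjectReuseDemo {

  /** Hypothetical mutable holder standing in for a Writable. */
  static final class MutableInt {
    int value;
  }

  /** Returns the same MutableInt on every next(), overwriting its state. */
  static final class ReusingIterator implements Iterator<MutableInt> {
    private final int[] data;
    private final MutableInt shared = new MutableInt(); // the single reused object
    private int pos = 0;

    ReusingIterator(int[] data) {
      this.data = data;
    }

    @Override
    public boolean hasNext() {
      return pos < data.length;
    }

    @Override
    public MutableInt next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      shared.value = data[pos++]; // update in place instead of allocating
      return shared;
    }
  }

  /** Keeps the references as-is: every element ends up aliasing one object. */
  static List<Integer> retainNaively(int[] data) {
    List<MutableInt> kept = new ArrayList<>();
    Iterator<MutableInt> it = new ReusingIterator(data);
    while (it.hasNext()) {
      kept.add(it.next());
    }
    List<Integer> result = new ArrayList<>();
    for (MutableInt m : kept) {
      result.add(m.value); // all reads see the last value written
    }
    return result;
  }

  /** Copies each value out of the shared object before the next call. */
  static List<Integer> retainCopies(int[] data) {
    List<Integer> result = new ArrayList<>();
    Iterator<MutableInt> it = new ReusingIterator(data);
    while (it.hasNext()) {
      result.add(it.next().value); // the int is copied by value
    }
    return result;
  }

  public static void main(String[] args) {
    int[] data = {3, 1, 4};
    System.out.println(retainNaively(data)); // prints [4, 4, 4]
    System.out.println(retainCopies(data));  // prints [3, 1, 4]
  }
}
```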
+<p>A convenience method called <code>getDetachedValue</code> is specified in the <code>PType</code>
+interface to get around this limitation. Implementations of this method
+perform a deep copy of values of their configured type if needed, and return
+the value that has been "detached" from the ownership of the MapReduce
+framework.</p>
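Conceptually, detaching means copying only when the type is mutable. The sketch below illustrates that idea and is not Crunch's implementation: `Detacher` is a hypothetical interface, `String` serves as an example of an immutable type that can be returned as-is, and `StringBuilder` as a mutable type that has to be copied.

```java
// Illustration only: Detacher is a hypothetical interface, not Crunch's PType.
public class DetachSketch {

  /** Hypothetical analogue of the detach step on a PType. */
  interface Detacher<T> {
    T getDetachedValue(T value);
  }

  /** Immutable values can be shared safely, so no copy is needed. */
  static final Detacher<String> STRINGS = value -> value;

  /** Mutable values are copied so the framework can keep reusing the original. */
  static final Detacher<StringBuilder> BUILDERS =
      value -> value == null ? null : new StringBuilder(value);

  public static void main(String[] args) {
    StringBuilder reused = new StringBuilder("first");
    StringBuilder detached = BUILDERS.getDetachedValue(reused);
    reused.setLength(0);
    reused.append("second"); // the "framework" overwrites its object
    System.out.println(detached); // prints first

    String s = "immutable";
    System.out.println(STRINGS.getDetachedValue(s) == s); // prints true
  }
}
```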
+<p>To use the <code>getDetachedValue</code> method of a PType, the DoFn needs an
+initialized instance of that PType. Perform this initialization in the DoFn's
+<code>initialize()</code> method, for example by calling
+<code>ptype.initialize(getConfiguration())</code>.</p>
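One way to see why setup is deferred to `initialize()`: the DoFn is constructed on the client and then serialized and shipped to the tasks, and only a running task has a configuration to hand out. The simulation below is hypothetical throughout (`ConfigurableType`, `ExampleFn`, `ship`, and a `Map` standing in for the Hadoop `Configuration`), and it assumes for illustration that the type keeps its runtime state in a transient field, so setup done before serialization does not survive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

// Hypothetical simulation: none of these classes are Crunch or Hadoop APIs.
public class LifecycleSketch {

  /** Stand-in for a PType; its runtime state is transient (an assumption). */
  static final class ConfigurableType implements Serializable {
    private transient Map<String, String> conf;

    void initialize(Map<String, String> conf) {
      this.conf = conf;
    }

    boolean isInitialized() {
      return conf != null;
    }
  }

  /** Stand-in for a DoFn that sets up its type in initialize(). */
  static final class ExampleFn implements Serializable {
    final ConfigurableType ptype = new ConfigurableType();
    private transient Map<String, String> taskConf; // supplied on the task

    void setTaskConfiguration(Map<String, String> conf) {
      this.taskConf = conf;
    }

    void initialize() {
      ptype.initialize(taskConf); // runs after the function reaches the task
    }
  }

  /** Mimics shipping an object to a task via Java serialization. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T ship(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Setup done on the client is lost in transit. */
  static boolean clientSideInitSurvives() {
    ExampleFn onClient = new ExampleFn();
    onClient.ptype.initialize(Collections.singletonMap("k", "v")); // too early
    return ship(onClient).ptype.isInitialized();
  }

  /** Setup done in initialize() on the task sticks. */
  static boolean taskSideInitSurvives() {
    ExampleFn onTask = ship(new ExampleFn());
    onTask.setTaskConfiguration(Collections.singletonMap("k", "v"));
    onTask.initialize();
    return onTask.ptype.isInitialized();
  }

  public static void main(String[] args) {
    System.out.println(clientSideInitSurvives()); // prints false
    System.out.println(taskSideInitSurvives());   // prints true
  }
}
```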
+<p>For example, the following DoFn uses <code>getDetachedValue</code> to correctly
+retain and emit the maximum value encountered by each task that runs it:</p>
+<div class="codehilite"><pre><span class="n">public</span> <span class="n">class</span> <span class="n">FindMax</span><span class="o"><</span><span class="n">T</span> <span class="n">extends</span> <span class="n">Comparable</span><span class="o"><</span><span class="n">T</span><span class="o">>></span> <span class="n">extends</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">T</span><span class="p">,</span> <span class="n">T</span><span class="o">></span> <span class="p">{</span>
+
+ <span class="n">private</span> <span class="n">PType</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">ptype</span><span class="p">;</span>
+ <span class="n">private</span> <span class="n">T</span> <span class="n">maxValue</span><span class="p">;</span>
+
+ <span class="n">public</span> <span class="n">FindMax</span><span class="p">(</span><span class="n">PType</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">ptype</span><span class="p">)</span> <span class="p">{</span>
+ <span class="n">this</span><span class="p">.</span><span class="n">ptype</span> <span class="p">=</span> <span class="n">ptype</span><span class="p">;</span>
+ <span class="p">}</span>
+
+ <span class="n">public</span> <span class="n">void</span> <span class="n">initialize</span><span class="p">()</span> <span class="p">{</span>
+ <span class="n">this</span><span class="p">.</span><span class="n">ptype</span><span class="p">.</span><span class="n">initialize</span><span class="p">(</span><span class="n">getConfiguration</span><span class="p">());</span>
+ <span class="p">}</span>
+
+ <span class="n">public</span> <span class="n">void</span> <span class="n">process</span><span class="p">(</span><span class="n">T</span> <span class="n">input</span><span class="p">,</span> <span class="n">Emitter</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+ <span class="k">if</span> <span class="p">(</span><span class="n">maxValue</span> <span class="o">==</span> <span class="n">null</span> <span class="o">||</span> <span class="n">maxValue</span><span class="p">.</span><span class="n">compareTo</span><span class="p">(</span><span class="n">input</span><span class="p">)</span> <span class="o"><</span> 0<span class="p">)</span> <span class="p">{</span>
+ <span class="o">//</span> <span class="n">We</span> <span class="n">need</span> <span class="n">to</span> <span class="n">call</span> <span class="n">getDetachedValue</span> <span class="n">here</span><span class="p">,</span> <span class="k">otherwise</span> <span class="n">the</span> <span class="n">internal</span>
+ <span class="o">//</span> <span class="n">state</span> <span class="n">of</span> <span class="n">maxValue</span> <span class="n">might</span> <span class="n">change</span> <span class="n">with</span> <span class="n">each</span> <span class="n">call</span> <span class="n">to</span> <span class="n">process</span><span class="p">()</span>
+ <span class="o">//</span> <span class="n">and</span> <span class="n">we</span> <span class="n">won</span><span class="o">'</span><span class="n">t</span> <span class="n">hold</span> <span class="n">on</span> <span class="n">to</span> <span class="n">the</span> <span class="n">max</span> <span class="n">value</span>
+ <span class="n">maxValue</span> <span class="p">=</span> <span class="n">ptype</span><span class="p">.</span><span class="n">getDetachedValue</span><span class="p">(</span><span class="n">input</span><span class="p">);</span>
+ <span class="p">}</span>
+ <span class="p">}</span>
+
+ <span class="n">public</span> <span class="n">void</span> <span class="n">cleanup</span><span class="p">(</span><span class="n">Emitter</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+ <span class="k">if</span> <span class="p">(</span><span class="n">maxValue</span> !<span class="p">=</span> <span class="n">null</span><span class="p">)</span> <span class="p">{</span>
+ <span class="n">emitter</span><span class="p">.</span><span class="n">emit</span><span class="p">(</span><span class="n">maxValue</span><span class="p">);</span>
+ <span class="p">}</span>
+ <span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
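To see why the example calls `getDetachedValue`, the same `process()`/`cleanup()` comparison logic can be replayed in plain Java over a single reused object. This is a simulation, not a Crunch API: `MutableLong` is a hypothetical mutable value standing in for a reused Writable, and the `detach` function (here the hypothetical `copyOf`) plays the role of `ptype.getDetachedValue`.

```java
import java.util.function.UnaryOperator;

// Plain-Java replay of FindMax; MutableLong and copyOf are hypothetical.
public class FindMaxSimulation {

  /** Mutable, comparable value standing in for a reused Writable. */
  static final class MutableLong implements Comparable<MutableLong> {
    long value;

    @Override
    public int compareTo(MutableLong other) {
      return Long.compare(value, other.value);
    }
  }

  /** Plays the role of ptype.getDetachedValue for a mutable type. */
  static MutableLong copyOf(MutableLong m) {
    MutableLong copy = new MutableLong();
    copy.value = m.value;
    return copy;
  }

  /** Feeds inputs through one reused object, as the framework would. */
  static Long findMax(long[] inputs, UnaryOperator<MutableLong> detach) {
    MutableLong reused = new MutableLong();
    MutableLong maxValue = null;
    for (long v : inputs) {
      reused.value = v; // state overwritten before each process() call
      if (maxValue == null || maxValue.compareTo(reused) < 0) {
        maxValue = detach.apply(reused);
      }
    }
    return maxValue == null ? null : maxValue.value; // what cleanup() would emit
  }

  public static void main(String[] args) {
    long[] inputs = {5, 9, 2};
    System.out.println(findMax(inputs, FindMaxSimulation::copyOf)); // prints 9
    System.out.println(findMax(inputs, UnaryOperator.identity()));  // prints 2
  }
}
```

With a copying `detach`, the inputs 5, 9, 2 yield 9. With `UnaryOperator.identity()`, `maxValue` aliases the reused object, so the comparison always sees equal values and the result is simply whatever was seen last (2).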
+
+
<p><a name="hbase"></a></p>
<h2 id="crunch-for-hbase">Crunch for HBase</h2>
<p>Crunch is an excellent platform for creating pipelines that involve processing data from HBase tables. Because of Crunch's