Posted to commits@crunch.apache.org by bu...@apache.org on 2014/01/20 19:17:35 UTC

svn commit: r894871 - in /websites/staging/crunch/trunk/content: ./ user-guide.html

Author: buildbot
Date: Mon Jan 20 18:17:34 2014
New Revision: 894871

Log:
Staging update by buildbot for crunch

Modified:
    websites/staging/crunch/trunk/content/   (props changed)
    websites/staging/crunch/trunk/content/user-guide.html

Propchange: websites/staging/crunch/trunk/content/
------------------------------------------------------------------------------
--- cms:source-revision (original)
+++ cms:source-revision Mon Jan 20 18:17:34 2014
@@ -1 +1 @@
-1558208
+1559795

Modified: websites/staging/crunch/trunk/content/user-guide.html
==============================================================================
--- websites/staging/crunch/trunk/content/user-guide.html (original)
+++ websites/staging/crunch/trunk/content/user-guide.html Mon Jan 20 18:17:34 2014
@@ -200,6 +200,7 @@
 <li><a href="#splits">Splits</a></li>
 </ol>
 </li>
+<li><a href="#objectreuse">Retaining objects within DoFns</a></li>
 </ol>
 </li>
 <li><a href="#hbase">Crunch for HBase</a></li>
@@ -1257,6 +1258,61 @@ you to split an input PCollection of Pai
   split.second().write(badOutputs);
 </pre>
 
+<p><a name="objectreuse"></a></p>
+<h3 id="retaining-objects-within-dofns">Retaining objects within DoFns</h3>
+<p>For reasons of efficiency, Hadoop MapReduce repeatedly passes the <a href="https://issues.apache.org/jira/browse/HADOOP-2399">same references as keys and values to Mappers and Reducers</a> instead of passing in new objects for each call.
+The state of the singleton key and value objects is updated between calls
+to <code>Mapper.map()</code> and <code>Reducer.reduce()</code>, and also between calls
+to <code>Iterator.next()</code> while iterating over the Iterable of values within a Reducer.</p>
+<p>As a result of this optimization, a reference to an object received within a
+map or reduce call cannot be retained beyond that single invocation, because the
+object's contents will change on subsequent invocations. In some (but not all)
+situations, this optimization affects DoFns as well, meaning that you can't
+simply retain a reference that is passed in to <code>DoFn.process</code> beyond a
+single call.</p>
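+<p>The problem can be reproduced without Hadoop. The plain-Java sketch below does not use the Crunch or Hadoop APIs; <code>ObjectReuseDemo</code>, <code>MutableText</code> (a stand-in for a mutable Writable such as <code>Text</code>) and <code>runWithReuse</code> are hypothetical names. The simulated framework writes every input into one shared object before each call, so a handler that keeps the references it receives ends up holding the last value several times over.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simulates a framework that reuses one mutable record object across calls,
 * the way Hadoop MapReduce reuses key/value instances.
 * All names here are hypothetical; this is not Crunch or Hadoop API.
 */
public class ObjectReuseDemo {

  /** Minimal mutable record, standing in for a Hadoop Writable. */
  static final class MutableText {
    private String value = "";
    void set(String v) { this.value = v; }
    String get() { return value; }
  }

  /** Receives each record; plays the role of a DoFn.process() call. */
  interface RecordHandler {
    void handle(MutableText record);
  }

  /** Feeds every input through ONE shared MutableText instance. */
  static void runWithReuse(List<String> inputs, RecordHandler handler) {
    MutableText shared = new MutableText(); // a single instance for all calls
    for (String in : inputs) {
      shared.set(in);          // contents overwritten before every call
      handler.handle(shared);  // the handler always sees the same object
    }
  }

  /** Naive handler: keeps the references it was handed, then reads them. */
  public static List<String> retainReferences(List<String> inputs) {
    List<MutableText> kept = new ArrayList<>();
    runWithReuse(inputs, kept::add); // stores the shared reference each time
    List<String> seen = new ArrayList<>();
    for (MutableText t : kept) {
      seen.add(t.get());
    }
    return seen;
  }

  public static void main(String[] args) {
    // Every retained reference points at the same object, which now
    // holds only the last value the framework wrote into it.
    System.out.println(retainReferences(Arrays.asList("a", "b", "c")));
  }
}
```

+<p>Running <code>main</code> prints <code>[c, c, c]</code> rather than <code>[a, b, c]</code>: all three list entries are the same object.</p>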
+<p>The <code>PType</code> interface specifies a convenience method called
+<code>getDetachedValue</code> to work around this limitation. Implementations of
+this method perform a deep copy of a value of their configured type when needed,
+and return a value that has been "detached" from the ownership of the MapReduce
+framework.</p>
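+<p>Conceptually, detaching a value means copying the contents of the framework-owned object into a new instance that you own. The sketch below is an illustration of that idea only, not Crunch's implementation: it assumes a hypothetical mutable <code>MutableText</code> type with a copy constructor, and a hypothetical <code>detach</code> helper that plays the role that <code>getDetachedValue</code> plays for a PType.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Illustrates "detaching": copy a reused, framework-owned object into a
 * fresh instance before retaining it. DetachDemo, MutableText and detach()
 * are hypothetical stand-ins, not Crunch API.
 */
public class DetachDemo {

  /** Mutable record that the simulated framework reuses between calls. */
  static final class MutableText {
    private String value = "";
    MutableText() {}
    // Copy constructor: String is immutable, so copying the field is enough
    // to make the new instance independent of the original.
    MutableText(MutableText other) { this.value = other.value; }
    void set(String v) { this.value = v; }
    String get() { return value; }
  }

  /** Returns a copy that later set() calls on the original cannot affect. */
  static MutableText detach(MutableText owned) {
    return new MutableText(owned);
  }

  /** Reuses one object per input, as the framework would, but keeps copies. */
  public static List<String> retainDetached(List<String> inputs) {
    MutableText shared = new MutableText();
    List<MutableText> kept = new ArrayList<>();
    for (String in : inputs) {
      shared.set(in);
      kept.add(detach(shared)); // retain a detached copy, not the shared reference
    }
    List<String> seen = new ArrayList<>();
    for (MutableText t : kept) {
      seen.add(t.get());
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(retainDetached(Arrays.asList("a", "b", "c")));
  }
}
```

+<p>Here <code>main</code> prints <code>[a, b, c]</code>, because each retained entry is a separate object. A type with mutable fields of its own would need those fields copied as well for the copy to be truly deep.</p>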
+<p>To use the <code>getDetachedValue</code> method of a PType, you need an
+initialized instance of the PType within the DoFn. The PType should be
+initialized in the <code>initialize()</code> method of the DoFn.</p>
+<p>For example, a DoFn that uses <code>getDetachedValue</code> to correctly emit
+the maximum value it encounters could be implemented as follows:</p>
+<div class="codehilite"><pre><span class="n">public</span> <span class="n">class</span> <span class="n">FindMax</span><span class="o">&lt;</span><span class="n">T</span> <span class="n">extends</span> <span class="n">Comparable</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;&gt;</span> <span class="n">extends</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">T</span><span class="p">,</span> <span class="n">T</span><span class="o">&gt;</span> <span class="p">{</span>
+
+  <span class="n">private</span> <span class="n">PType</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">ptype</span><span class="p">;</span>
+  <span class="n">private</span> <span class="n">T</span> <span class="n">maxValue</span><span class="p">;</span>
+
+  <span class="n">public</span> <span class="n">FindMax</span><span class="p">(</span><span class="n">PType</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">ptype</span><span class="p">)</span> <span class="p">{</span>
+    <span class="n">this</span><span class="p">.</span><span class="n">ptype</span> <span class="p">=</span> <span class="n">ptype</span><span class="p">;</span>
+  <span class="p">}</span>
+
+  <span class="n">public</span> <span class="n">void</span> <span class="n">initialize</span><span class="p">()</span> <span class="p">{</span>
+    <span class="n">this</span><span class="p">.</span><span class="n">ptype</span><span class="p">.</span><span class="n">initialize</span><span class="p">(</span><span class="n">getConfiguration</span><span class="p">());</span>
+  <span class="p">}</span>
+
+  <span class="n">public</span> <span class="n">void</span> <span class="n">process</span><span class="p">(</span><span class="n">T</span> <span class="n">input</span><span class="p">,</span> <span class="n">Emitter</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+    <span class="k">if</span> <span class="p">(</span><span class="n">maxValue</span> <span class="o">==</span> <span class="n">null</span> <span class="o">||</span> <span class="n">maxValue</span><span class="p">.</span><span class="n">compareTo</span><span class="p">(</span><span class="n">input</span><span class="p">)</span> <span class="o">&lt;</span> 0<span class="p">)</span> <span class="p">{</span>
+      <span class="o">//</span> <span class="n">We</span> <span class="n">need</span> <span class="n">to</span> <span class="n">call</span> <span class="n">getDetachedValue</span> <span class="n">here</span><span class="p">,</span> <span class="k">otherwise</span> <span class="n">the</span> <span class="n">internal</span>
+      <span class="o">//</span> <span class="n">state</span> <span class="n">of</span> <span class="n">maxValue</span> <span class="n">might</span> <span class="n">change</span> <span class="n">with</span> <span class="n">each</span> <span class="n">call</span> <span class="n">to</span> <span class="n">process</span><span class="p">()</span>
+      <span class="o">//</span> <span class="n">and</span> <span class="n">we</span> <span class="n">won</span><span class="o">&#39;</span><span class="n">t</span> <span class="n">hold</span> <span class="n">on</span> <span class="n">to</span> <span class="n">the</span> <span class="n">max</span> <span class="n">value</span>
+      <span class="n">maxValue</span> <span class="p">=</span> <span class="n">ptype</span><span class="p">.</span><span class="n">getDetachedValue</span><span class="p">(</span><span class="n">input</span><span class="p">);</span>
+    <span class="p">}</span>
+  <span class="p">}</span>
+
+  <span class="n">public</span> <span class="n">void</span> <span class="n">cleanup</span><span class="p">(</span><span class="n">Emitter</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+    <span class="k">if</span> <span class="p">(</span><span class="n">maxValue</span> !<span class="p">=</span> <span class="n">null</span><span class="p">)</span> <span class="p">{</span>
+      <span class="n">emitter</span><span class="p">.</span><span class="n">emit</span><span class="p">(</span><span class="n">maxValue</span><span class="p">);</span>
+    <span class="p">}</span>
+  <span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+
+
 <p><a name="hbase"></a></p>
 <h2 id="crunch-for-hbase">Crunch for HBase</h2>
 <p>Crunch is an excellent platform for creating pipelines that involve processing data from HBase tables. Because of Crunch's