Posted to commits@crunch.apache.org by bu...@apache.org on 2014/01/20 21:08:21 UTC

svn commit: r894879 - in /websites/staging/crunch/trunk/content: ./ user-guide.html

Author: buildbot
Date: Mon Jan 20 20:08:21 2014
New Revision: 894879

Log:
Staging update by buildbot for crunch

Modified:
    websites/staging/crunch/trunk/content/   (props changed)
    websites/staging/crunch/trunk/content/user-guide.html

Propchange: websites/staging/crunch/trunk/content/
------------------------------------------------------------------------------
--- cms:source-revision (original)
+++ cms:source-revision Mon Jan 20 20:08:21 2014
@@ -1 +1 @@
-1559795
+1559821

Modified: websites/staging/crunch/trunk/content/user-guide.html
==============================================================================
--- websites/staging/crunch/trunk/content/user-guide.html (original)
+++ websites/staging/crunch/trunk/content/user-guide.html Mon Jan 20 20:08:21 2014
@@ -211,6 +211,7 @@
 <li><a href="#mempipeline">MemPipeline</a></li>
 </ol>
 </li>
+<li><a href="#testing">Unit Testing Pipelines</a></li>
 </ol>
 <p><a name="intro"></a></p>
 <h2 id="introduction-to-crunch">Introduction to Crunch</h2>
@@ -1471,6 +1472,104 @@ read side because Crunch does not an equ
 on the read side. Often the best way to verify that the contents of your pipeline are correct is by using the
 <code>materialize()</code> method to get a reference to the contents of the in-memory collection and then verify them directly,
 without writing them out to disk.</p>
+<p><a name="testing"></a></p>
+<h2 id="unit-testing-pipelines">Unit Testing Pipelines</h2>
+<p>For production data pipelines, unit tests are an absolute must. The <a href="#mempipeline">MemPipeline</a> implementation of the Pipeline
+interface has several tools to help developers create effective unit tests, which will be detailed in this section.</p>
+<h3 id="unit-testing-dofns">Unit Testing DoFns</h3>
+<p>Many of the DoFn implementations, such as <code>MapFn</code> and <code>FilterFn</code>, are very easy to test, since they accept a single input
+and return a single output. For general purpose DoFns, we need an instance of the <a href="apidocs/0.9.0/org/apache/crunch/Emitter.html">Emitter</a>
+interface that we can pass to the DoFn's <code>process</code> method and then read in the values that are written by the function. Support
+for this pattern is provided by the <a href="apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html">InMemoryEmitter</a> class, which
+has a <code>List&lt;T&gt; getOutput()</code> method that can be used to read the values that were passed to the Emitter instance by a DoFn instance:</p>
+<div class="codehilite"><pre><span class="p">@</span><span class="n">Test</span>
+<span class="n">public</span> <span class="n">void</span> <span class="n">testToUpperCaseFn</span><span class="p">()</span> <span class="p">{</span>
+  <span class="n">InMemoryEmitter</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">emitter</span> <span class="p">=</span> <span class="n">new</span> <span class="n">InMemoryEmitter</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="p">();</span>
+  <span class="n">new</span> <span class="n">ToUpperCaseFn</span><span class="p">().</span><span class="n">process</span><span class="p">(</span>&quot;<span class="n">input</span>&quot;<span class="p">,</span> <span class="n">emitter</span><span class="p">);</span>
+  <span class="n">assertEquals</span><span class="p">(</span><span class="n">ImmutableList</span><span class="p">.</span><span class="n">of</span><span class="p">(</span>&quot;<span class="n">INPUT</span>&quot;<span class="p">),</span> <span class="n">emitter</span><span class="p">.</span><span class="n">getOutput</span><span class="p">());</span>
+<span class="p">}</span>
+</pre></div>
+
+
+<h3 id="testing-complex-dofns-and-pipelines">Testing Complex DoFns and Pipelines</h3>
+<p>Many of the DoFns we write involve more complex processing: they must be initialized and cleaned up, or they
+define Counters that track the inputs they receive. To ensure that our DoFns work properly across
+their entire lifecycle, it's best to use the <a href="#mempipeline">MemPipeline</a> implementation to create in-memory instances of
+PCollections and PTables that contain a small amount of test data and apply our DoFns to those PCollections to test their
+functionality. We can easily retrieve the contents of any in-memory PCollection by calling its <code>Iterable&lt;T&gt; materialize()</code>
+method, which will return immediately. We can also track the values of any Counters that were incremented as the DoFns were
+executed against the test data by calling the static <code>getCounters()</code> method on the MemPipeline class, and reset
+those Counters between test runs by calling the static <code>clearCounters()</code> method:</p>
+<div class="codehilite"><pre><span class="n">public</span> <span class="n">static</span> <span class="n">class</span> <span class="n">UpperCaseWithCounterFn</span> <span class="n">extends</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="p">{</span>
+  <span class="p">@</span><span class="n">Override</span>
+  <span class="n">public</span> <span class="n">void</span> <span class="n">process</span><span class="p">(</span><span class="n">String</span> <span class="n">input</span><span class="p">,</span> <span class="n">Emitter</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+    <span class="n">String</span> <span class="n">upper</span> <span class="p">=</span> <span class="n">input</span><span class="p">.</span><span class="n">toUpperCase</span><span class="p">();</span>
+    <span class="k">if</span> <span class="p">(</span>!<span class="n">upper</span><span class="p">.</span><span class="n">equals</span><span class="p">(</span><span class="n">input</span><span class="p">))</span> <span class="p">{</span>
+      <span class="n">increment</span><span class="p">(</span>&quot;<span class="n">UpperCase</span>&quot;<span class="p">,</span> &quot;<span class="n">modified</span>&quot;<span class="p">);</span>
+    <span class="p">}</span>
+    <span class="n">emitter</span><span class="p">.</span><span class="n">emit</span><span class="p">(</span><span class="n">upper</span><span class="p">);</span>
+  <span class="p">}</span>
+<span class="p">}</span>
+
+<span class="p">@</span><span class="n">Before</span>
+<span class="n">public</span> <span class="n">void</span> <span class="n">setUp</span><span class="p">()</span> <span class="n">throws</span> <span class="n">Exception</span> <span class="p">{</span>
+  <span class="n">MemPipeline</span><span class="p">.</span><span class="n">clearCounters</span><span class="p">();</span>
+<span class="p">}</span>
+
+<span class="p">@</span><span class="n">Test</span>
+<span class="n">public</span> <span class="n">void</span> <span class="n">testToUpperCase_WithPipeline</span><span class="p">()</span> <span class="p">{</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">inputStrings</span> <span class="p">=</span> <span class="n">MemPipeline</span><span class="p">.</span><span class="n">collectionOf</span><span class="p">(</span>&quot;<span class="n">a</span>&quot;<span class="p">,</span> &quot;<span class="n">B</span>&quot;<span class="p">,</span> &quot;<span class="n">c</span>&quot;<span class="p">);</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">upperCaseStrings</span> <span class="p">=</span> <span class="n">inputStrings</span><span class="p">.</span><span class="n">parallelDo</span><span class="p">(</span><span class="n">new</span> <span class="n">UpperCaseWithCounterFn</span><span class="p">(),</span> <span class="n">Writables</span><span class="p">.</span><span class="n">strings</span><span class="p">());</span>
+  <span class="n">assertEquals</span><span class="p">(</span><span class="n">ImmutableList</span><span class="p">.</span><span class="n">of</span><span class="p">(</span>&quot;<span class="n">A</span>&quot;<span class="p">,</span> &quot;<span class="n">B</span>&quot;<span class="p">,</span> &quot;<span class="n">C</span>&quot;<span class="p">),</span> <span class="n">Lists</span><span class="p">.</span><span class="n">newArrayList</span><span class="p">(</span><span class="n">upperCaseStrings</span><span class="p">.</span><span class="n">materialize</span><span class="p">()));</span>
+  <span class="n">assertEquals</span><span class="p">(</span>2<span class="n">L</span><span class="p">,</span> <span class="n">MemPipeline</span><span class="p">.</span><span class="n">getCounters</span><span class="p">().</span><span class="n">findCounter</span><span class="p">(</span>&quot;<span class="n">UpperCase</span>&quot;<span class="p">,</span> &quot;<span class="n">modified</span>&quot;<span class="p">).</span><span class="n">getValue</span><span class="p">());</span>
+<span class="p">}</span>
+</pre></div>
+
+
+<h3 id="designing-testable-data-pipelines">Designing Testable Data Pipelines</h3>
+<p>In the same way that we try to <a href="http://misko.hevery.com/code-reviewers-guide/">write testable code</a>, we want to ensure that
+our data pipelines are written in a way that makes them easy to test. In general, you should try to break up complex pipelines
+into a number of function calls that perform a small set of operations on input PCollections and return one or more PCollections
+as a result. This makes it easy to swap in different PCollection implementations for testing and production runs.</p>
+<p>Let's look at an example, taken from one of Crunch's integration tests, that computes one iteration of the
+<a href="http://en.wikipedia.org/wiki/PageRank">PageRank</a> algorithm:</p>
+<div class="codehilite"><pre><span class="c1">// Each entry in the PTable represents a URL and its associated data for PageRank computations.</span>
+<span class="n">public</span> <span class="k">static</span> <span class="n">PTable</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">PageRankData</span><span class="o">&gt;</span> <span class="n">pageRank</span><span class="p">(</span><span class="n">PTable</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">PageRankData</span><span class="o">&gt;</span> <span class="k">input</span><span class="p">,</span> <span class="k">final</span> <span class="n">float</span> <span class="n">d</span><span class="p">)</span> <span class="p">{</span>
+  <span class="n">PTypeFamily</span> <span class="n">ptf</span> <span class="o">=</span> <span class="k">input</span><span class="p">.</span><span class="n">getTypeFamily</span><span class="p">();</span>
+
+  <span class="c1">// Compute the outbound page rank from each of the input pages.</span>
+  <span class="n">PTable</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">Float</span><span class="o">&gt;</span> <span class="n">outbound</span> <span class="o">=</span> <span class="k">input</span><span class="p">.</span><span class="n">parallelDo</span><span class="p">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">PageRankData</span><span class="o">&gt;</span><span class="p">,</span> <span class="n">Pair</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">Float</span><span class="o">&gt;&gt;</span><span class="p">()</span> <span class="p">{</span>
+    <span class="p">@</span><span class="n">Override</span>
+    <span class="n">public</span> <span class="k">void</span> <span class="n">process</span><span class="p">(</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">PageRankData</span><span class="o">&gt;</span> <span class="k">input</span><span class="p">,</span> <span class="n">Emitter</span><span class="o">&lt;</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span> <span class="n">Float</span><span class="o">&gt;&gt;</span> <span class="n">emitter</span><span class="p">)</span> <span class="p">{</span>
+      <span class="n">PageRankData</span> <span class="n">prd</span> <span class="o">=</span> <span class="k">input</span><span class="p">.</span><span class="n">second</span><span class="p">();</span>
+      <span class="k">for</span> <span class="p">(</span><span class="n">String</span> <span class="n">link</span> <span class="o">:</span> <span class="n">prd</span><span class="p">.</span><span class="n">urls</span><span class="p">)</span> <span class="p">{</span>
+        <span class="n">emitter</span><span class="p">.</span><span class="n">emit</span><span class="p">(</span><span class="n">Pair</span><span class="p">.</span><span class="n">of</span><span class="p">(</span><span class="n">link</span><span class="p">,</span> <span class="n">prd</span><span class="p">.</span><span class="n">propagatedScore</span><span class="p">()));</span>
+      <span class="p">}</span>
+    <span class="p">}</span>
+  <span class="p">},</span> <span class="n">ptf</span><span class="p">.</span><span class="n">tableOf</span><span class="p">(</span><span class="n">ptf</span><span class="p">.</span><span class="n">strings</span><span class="p">(),</span> <span class="n">ptf</span><span class="p">.</span><span class="n">floats</span><span class="p">()));</span>
+
+  <span class="c1">// Update the PageRank for each URL.</span>
+  <span class="k">return</span> <span class="k">input</span><span class="p">.</span><span class="n">cogroup</span><span class="p">(</span><span class="n">outbound</span><span class="p">).</span><span class="n">mapValues</span><span class="p">(</span>
+      <span class="k">new</span> <span class="n">MapFn</span><span class="o">&lt;</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">PageRankData</span><span class="o">&gt;</span><span class="p">,</span> <span class="n">Collection</span><span class="o">&lt;</span><span class="n">Float</span><span class="o">&gt;&gt;</span><span class="p">,</span> <span class="n">PageRankData</span><span class="o">&gt;</span><span class="p">()</span> <span class="p">{</span>
+        <span class="p">@</span><span class="n">Override</span>
+        <span class="n">public</span> <span class="n">PageRankData</span> <span class="n">map</span><span class="p">(</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">Collection</span><span class="o">&lt;</span><span class="n">PageRankData</span><span class="o">&gt;</span><span class="p">,</span> <span class="n">Collection</span><span class="o">&lt;</span><span class="n">Float</span><span class="o">&gt;&gt;</span> <span class="k">input</span><span class="p">)</span> <span class="p">{</span>
+          <span class="n">PageRankData</span> <span class="n">prd</span> <span class="o">=</span> <span class="n">Iterables</span><span class="p">.</span><span class="n">getOnlyElement</span><span class="p">(</span><span class="k">input</span><span class="p">.</span><span class="n">first</span><span class="p">());</span>
+          <span class="n">Collection</span><span class="o">&lt;</span><span class="n">Float</span><span class="o">&gt;</span> <span class="n">propagatedScores</span> <span class="o">=</span> <span class="k">input</span><span class="p">.</span><span class="n">second</span><span class="p">();</span>
+          <span class="n">float</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0f</span><span class="p">;</span>
+          <span class="k">for</span> <span class="p">(</span><span class="n">Float</span> <span class="n">s</span> <span class="o">:</span> <span class="n">propagatedScores</span><span class="p">)</span> <span class="p">{</span>
+            <span class="n">sum</span> <span class="o">+=</span> <span class="n">s</span><span class="p">;</span>
+          <span class="p">}</span>
+          <span class="k">return</span> <span class="n">prd</span><span class="p">.</span><span class="n">next</span><span class="p">(</span><span class="n">d</span> <span class="o">+</span> <span class="p">(</span><span class="mf">1.0f</span> <span class="o">-</span> <span class="n">d</span><span class="p">)</span> <span class="o">*</span> <span class="n">sum</span><span class="p">);</span>
+        <span class="p">}</span>
+      <span class="p">},</span> <span class="k">input</span><span class="p">.</span><span class="n">getValueType</span><span class="p">());</span>
+<span class="p">}</span>
+</pre></div>
+
+
+<p>By embedding our business logic inside of a static method that operates on PTables, we can easily unit test our PageRank
+computations, which combine custom DoFns with Crunch's built-in <code>cogroup</code> operation, by using the <a href="#mempipeline">MemPipeline</a>
+implementation to create test data sets that we can easily verify by hand. The same logic can then be executed on
+a distributed data set using either the <a href="#mrpipeline">MRPipeline</a> or <a href="#sparkpipeline">SparkPipeline</a> implementation.</p>
         </div> <!-- /span -->
 
       </div> <!-- /row-fluid -->