Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/20 21:08:12 UTC

svn commit: r1559821 - /crunch/site/trunk/content/user-guide.mdtext

Author: jwills
Date: Mon Jan 20 20:08:12 2014
New Revision: 1559821

URL: http://svn.apache.org/r1559821
Log:
Add section on unit testing

Modified:
    crunch/site/trunk/content/user-guide.mdtext

Modified: crunch/site/trunk/content/user-guide.mdtext
URL: http://svn.apache.org/viewvc/crunch/site/trunk/content/user-guide.mdtext?rev=1559821&r1=1559820&r2=1559821&view=diff
==============================================================================
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Jan 20 20:08:12 2014
@@ -61,6 +61,7 @@ Notice:   Licensed to the Apache Softwar
     1. [MRPipeline](#mrpipeline)
     1. [SparkPipeline](#sparkpipeline)
     1. [MemPipeline](#mempipeline)
+1. [Unit Testing Pipelines](#testing)
 
 <a name="intro"></a>
 ## Introduction to Crunch
@@ -1494,3 +1495,105 @@ on the read side. Often the best way to 
 `materialize()` method to get a reference to the contents of the in-memory collection and then verify them directly,
 without writing them out to disk.
 
+<a name="testing"></a>
+## Unit Testing Pipelines
+
+For production data pipelines, unit tests are an absolute must. The [MemPipeline](#mempipeline) implementation of the Pipeline
+interface provides several tools that help developers write effective unit tests; this section describes them.
+
+### Unit Testing DoFns
+
+Many DoFn implementations, such as `MapFn` and `FilterFn`, are easy to test because they accept a single input
+and return a single value that can be checked directly. For general-purpose DoFns, we need an instance of the [Emitter](apidocs/0.9.0/org/apache/crunch/Emitter.html)
+interface that we can pass to the DoFn's `process` method and then use to read back the values that the function emits. Support
+for this pattern is provided by the [InMemoryEmitter](apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html) class, whose
+`List<T> getOutput()` method returns the values that a DoFn passed to the Emitter:
+
+	@Test
+	public void testToUpperCaseFn() {
+	  // ToUpperCaseFn is a DoFn<String, String> that emits the upper-cased form of its input.
+	  InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
+	  new ToUpperCaseFn().process("input", emitter);
+	  assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
+	}
+
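The test above calls a `ToUpperCaseFn` whose definition is not shown. The sketch below assumes it is a plain `MapFn<String, String>`; `NonEmptyFn` is a hypothetical `FilterFn` added only to illustrate that both kinds of function can be tested by calling `map` or `accept` directly, with no Emitter involved. The wrapper class `SimpleFns` is likewise just a container for the example.

```java
import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;

public class SimpleFns {

  // Assumed definition of the ToUpperCaseFn used in the test above.
  // MapFn's own process() emits map(input), so the InMemoryEmitter test also works.
  public static class ToUpperCaseFn extends MapFn<String, String> {
    @Override
    public String map(String input) {
      return input.toUpperCase();
    }
  }

  // Hypothetical FilterFn that keeps only non-empty strings.
  public static class NonEmptyFn extends FilterFn<String> {
    @Override
    public boolean accept(String input) {
      return !input.isEmpty();
    }
  }

  public static void main(String[] args) {
    // Single-input, single-value functions can be checked with a plain method call.
    System.out.println(new ToUpperCaseFn().map("input"));  // INPUT
    System.out.println(new NonEmptyFn().accept(""));       // false
  }
}
```

Calling `map` or `accept` directly keeps these tests as small as ordinary method tests; `InMemoryEmitter` is only needed once a DoFn may emit zero or several values per input.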
+
+### Testing Complex DoFns and Pipelines
+
+Many of the DoFns we write involve more complex processing: they need to be initialized and cleaned up, or they
+define Counters that track the inputs they receive. To ensure that our DoFns work properly across
+their entire lifecycle, it's best to use the [MemPipeline](#mempipeline) implementation to create in-memory instances of
+PCollections and PTables that contain a small amount of test data, and then apply our DoFns to those PCollections to test their
+functionality. We can easily retrieve the contents of any in-memory PCollection by calling its `Iterable<T> materialize()`
+method, which returns immediately because the data is already in memory. We can also read the values of any Counters that were
+incremented while the DoFns executed against the test data by calling the static `MemPipeline.getCounters()` method, and reset
+those Counters between test runs by calling the static `MemPipeline.clearCounters()` method:
+
+	public static class UpperCaseWithCounterFn extends DoFn<String, String> {
+	  @Override
+	  public void process(String input, Emitter<String> emitter) {
+	    String upper = input.toUpperCase();
+	    if (!upper.equals(input)) {
+	      increment("UpperCase", "modified");
+	    }
+	    emitter.emit(upper);
+	  }
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+	  MemPipeline.clearCounters();
+	}
+	
+	@Test
+	public void testToUpperCase_WithPipeline() {
+	  PCollection<String> inputStrings = MemPipeline.collectionOf("a", "B", "c");
+	  PCollection<String> upperCaseStrings = inputStrings.parallelDo(new UpperCaseWithCounterFn(), Writables.strings());
+	  assertEquals(ImmutableList.of("A", "B", "C"), Lists.newArrayList(upperCaseStrings.materialize()));
+	  assertEquals(2L, MemPipeline.getCounters().findCounter("UpperCase", "modified").getValue());
+	}
+
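To exercise the initialization and cleanup parts of the lifecycle as well, the DoFn under test can keep state that is built in `initialize()` and released in `cleanup()`. The following is a minimal sketch, assuming a hypothetical `StopWordFilterFn` with a hard-coded stop-word set and an illustrative `StopWords`/`dropped` counter; none of these names come from Crunch itself. The DoFn is applied to a MemPipeline collection, and the test reads both the materialized output and the counter.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

import com.google.common.collect.Lists;

public class LifecycleExample {

  // Hypothetical DoFn: builds its lookup set in initialize(), uses it in
  // process(), and drops it in cleanup().
  public static class StopWordFilterFn extends DoFn<String, String> {
    private transient Set<String> stopWords;

    @Override
    public void initialize() {
      // Hard-coded for the sketch; a real job might load this from a side file.
      stopWords = new HashSet<String>(Arrays.asList("a", "the"));
    }

    @Override
    public void process(String input, Emitter<String> emitter) {
      if (stopWords.contains(input)) {
        increment("StopWords", "dropped");  // illustrative counter group/name
      } else {
        emitter.emit(input);
      }
    }

    @Override
    public void cleanup(Emitter<String> emitter) {
      stopWords = null;
    }
  }

  // Resets the static counters, then runs the DoFn over in-memory test data.
  public static List<String> run(String... words) {
    MemPipeline.clearCounters();
    PCollection<String> input = MemPipeline.typedCollectionOf(Writables.strings(), words);
    return Lists.newArrayList(
        input.parallelDo(new StopWordFilterFn(), Writables.strings()).materialize());
  }

  public static void main(String[] args) {
    System.out.println(run("the", "cat", "in", "a", "hat"));  // [cat, in, hat]
    System.out.println(
        MemPipeline.getCounters().findCounter("StopWords", "dropped").getValue());  // 2
  }
}
```

Because `run` calls `MemPipeline.clearCounters()` before applying the DoFn, the counter reflects only that run, which is the same isolation the `@Before` method above provides in a JUnit test.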
+### Designing Testable Data Pipelines
+
+In the same way that we try to [write testable code](http://misko.hevery.com/code-reviewers-guide/), we want to ensure that
+our data pipelines are written in a way that makes them easy to test. In general, you should break up complex pipelines
+into a number of methods, each of which performs a small set of operations on its input PCollections and returns one or more PCollections
+as a result. This makes it easy to swap in different PCollection implementations for testing and production runs.
+
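As a minimal sketch of this structure, the hypothetical `countWords` method below contains only pipeline logic and receives its input PCollection as a parameter, so it never refers to a particular Pipeline implementation. It chooses its intermediate PType from `lines.getTypeFamily()`, so it does not hard-code Writables or Avro. The class and method names are illustrative.

```java
import java.util.Map;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class WordCountLogic {

  // Pure pipeline logic: no Pipeline, Source, or Target appears here, so the
  // caller decides whether the input lives in memory or on a cluster.
  public static PTable<String, Long> countWords(PCollection<String> lines) {
    return lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          if (!word.isEmpty()) {
            emitter.emit(word);
          }
        }
      }
    }, lines.getTypeFamily().strings()).count();
  }

  public static void main(String[] args) {
    // In a unit test, the input comes from MemPipeline.
    PCollection<String> lines = MemPipeline.typedCollectionOf(Writables.strings(), "a b", "b");
    Map<String, Long> counts = countWords(lines).materializeToMap();
    System.out.println(counts);  // prints the count for each word
  }
}
```

In a production job, the same method could be handed the PCollection returned by an MRPipeline's `readTextFile(...)`; nothing inside `countWords` has to change.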
+The following example, taken from one of Crunch's integration tests, computes one iteration of the
+[PageRank](http://en.wikipedia.org/wiki/PageRank) algorithm:
+
+	// Each entry in the PTable represents a URL and its associated data for PageRank computations.
+	public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+	  PTypeFamily ptf = input.getTypeFamily();
+	
+	  // Compute the outbound page rank from each of the input pages.
+	  PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+	    @Override
+	    public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+	      PageRankData prd = input.second();
+	      for (String link : prd.urls) {
+	        emitter.emit(Pair.of(link, prd.propagatedScore()));
+	      }
+	    }
+	  }, ptf.tableOf(ptf.strings(), ptf.floats()));
+	
+	  // Update the PageRank for each URL.
+	  return input.cogroup(outbound).mapValues(
+	      new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+	        @Override
+	        public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+	          PageRankData prd = Iterables.getOnlyElement(input.first());
+	          Collection<Float> propagatedScores = input.second();
+	          float sum = 0.0f;
+	          for (Float s : propagatedScores) {
+	            sum += s;
+	          }
+	          return prd.next(d + (1.0f - d) * sum);
+	        }
+	      }, input.getValueType());
+	}
+
+By embedding our business logic inside a static method that operates on PTables, we can easily unit test PageRank
+computations that combine custom DoFns with Crunch's built-in `cogroup` operation: the [MemPipeline](#mempipeline)
+implementation lets us create small test data sets whose expected results we can verify by hand, and the same logic can then
+be executed on a distributed data set using either the [MRPipeline](#mrpipeline) or [SparkPipeline](#sparkpipeline) implementation.
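To make the hand-verification step concrete, here is a dependency-free Java sketch that computes the expected result of one `pageRank` iteration for a three-page graph. It assumes that `PageRankData.propagatedScore()` returns a page's current score divided by its number of outbound links (the `PageRankData` class is not shown in this guide), and it applies the same update rule, `d + (1 - d) * sum`, as the `MapFn` above. The class name, graph, and starting scores are illustrative; the numbers it produces could serve as expected values in a MemPipeline test of `pageRank`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PageRankExpected {

  // One iteration over a closed graph (every link target is also a key in scores).
  public static Map<String, Float> oneIteration(
      Map<String, Float> scores, Map<String, List<String>> links, float d) {
    // Step 1: sum the propagated scores arriving at each page (the "outbound" table).
    Map<String, Float> incoming = new HashMap<String, Float>();
    for (Map.Entry<String, List<String>> e : links.entrySet()) {
      if (e.getValue().isEmpty()) {
        continue;  // a page with no outbound links propagates nothing
      }
      float propagated = scores.get(e.getKey()) / e.getValue().size();  // assumed propagatedScore()
      for (String target : e.getValue()) {
        Float prev = incoming.get(target);
        incoming.put(target, (prev == null ? 0.0f : prev) + propagated);
      }
    }
    // Step 2: apply the same update rule as the MapFn: d + (1 - d) * sum.
    Map<String, Float> next = new LinkedHashMap<String, Float>();
    for (String page : scores.keySet()) {
      Float sum = incoming.get(page);
      next.put(page, d + (1.0f - d) * (sum == null ? 0.0f : sum));
    }
    return next;
  }

  // Example graph: A -> B, A -> C, B -> C, C -> A; every page starts at 1.0.
  public static Map<String, Float> example(float d) {
    Map<String, Float> scores = new LinkedHashMap<String, Float>();
    scores.put("A", 1.0f);
    scores.put("B", 1.0f);
    scores.put("C", 1.0f);
    Map<String, List<String>> links = new HashMap<String, List<String>>();
    links.put("A", Arrays.asList("B", "C"));
    links.put("B", Arrays.asList("C"));
    links.put("C", Arrays.asList("A"));
    return oneIteration(scores, links, d);
  }

  public static void main(String[] args) {
    System.out.println(example(0.5f));  // {A=1.0, B=0.75, C=1.25}
  }
}
```

With `d = 0.5`, page C receives 0.5 from A and 1.0 from B, so its new score is 0.5 + 0.5 × 1.5 = 1.25. Values like these are small enough to check by hand against the output of `pageRank` run on a MemPipeline table built from the same graph.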