Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/20 21:08:12 UTC
svn commit: r1559821 - /crunch/site/trunk/content/user-guide.mdtext
Author: jwills
Date: Mon Jan 20 20:08:12 2014
New Revision: 1559821
URL: http://svn.apache.org/r1559821
Log:
Add section on unit testing
Modified:
crunch/site/trunk/content/user-guide.mdtext
Modified: crunch/site/trunk/content/user-guide.mdtext
URL: http://svn.apache.org/viewvc/crunch/site/trunk/content/user-guide.mdtext?rev=1559821&r1=1559820&r2=1559821&view=diff
==============================================================================
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Jan 20 20:08:12 2014
@@ -61,6 +61,7 @@ Notice: Licensed to the Apache Softwar
1. [MRPipeline](#mrpipeline)
1. [SparkPipeline](#sparkpipeline)
1. [MemPipeline](#mempipeline)
+1. [Unit Testing Pipelines](#testing)
<a name="intro"></a>
## Introduction to Crunch
@@ -1494,3 +1495,105 @@ on the read side. Often the best way to
`materialize()` method to get a reference to the contents of the in-memory collection and then verify them directly,
without writing them out to disk.
+<a name="testing"></a>
+## Unit Testing Pipelines
+
+For production data pipelines, unit tests are an absolute must. The [MemPipeline](#mempipeline) implementation of the Pipeline
+interface provides several tools that help developers write effective unit tests; this section describes them.
+
+### Unit Testing DoFns
+
+Many DoFn implementations, such as `MapFn` and `FilterFn`, are easy to test because they accept a single input
+and return a single output. For general-purpose DoFns, we need an instance of the [Emitter](apidocs/0.9.0/org/apache/crunch/Emitter.html)
+interface that we can pass to the DoFn's `process` method and then read back the values that the function wrote. Support
+for this pattern is provided by the [InMemoryEmitter](apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html) class, whose
+`List<T> getOutput()` method returns the values that a DoFn instance passed to the Emitter instance:
+
+    @Test
+    public void testToUpperCaseFn() {
+      InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
+      new ToUpperCaseFn().process("input", emitter);
+      assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
+    }
+
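The capturing-emitter pattern itself does not depend on anything specific to Crunch. As a minimal sketch, the following uses hypothetical stand-in types (`SimpleEmitter` and `CapturingEmitter` are illustrative names, not Crunch classes) so that it runs with only the JDK. It shows how a function that emits zero or more values per input can be checked by reading back what the emitter recorded:

```java
import java.util.ArrayList;
import java.util.List;

class EmitterSketch {
  // Hypothetical stand-in for an emitter interface; NOT the Crunch Emitter API.
  interface SimpleEmitter<T> {
    void emit(T value);
  }

  // Hypothetical stand-in for an in-memory emitter: records emitted values in order.
  static class CapturingEmitter<T> implements SimpleEmitter<T> {
    private final List<T> output = new ArrayList<>();

    @Override
    public void emit(T value) {
      output.add(value);
    }

    List<T> getOutput() {
      return output;
    }
  }

  // Emits one upper-cased value per whitespace-separated word, so a single
  // input may produce zero, one, or many outputs.
  public static void splitWords(String line, SimpleEmitter<String> emitter) {
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        emitter.emit(word.toUpperCase());
      }
    }
  }

  // Runs splitWords against a fresh capturing emitter and returns what it recorded.
  public static List<String> run(String line) {
    CapturingEmitter<String> emitter = new CapturingEmitter<>();
    splitWords(line, emitter);
    return emitter.getOutput();
  }
}
```

Because the emitter is just a recording list, a test can assert on both the values and their order, including the case where nothing is emitted at all.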
+
+### Testing Complex DoFns and Pipelines
+
+Many of the DoFns we write involve more complex processing that requires the DoFn to be initialized and cleaned up, or that
+defines Counters to track the inputs it receives. To ensure that our DoFns work properly across
+their entire lifecycle, it's best to use the [MemPipeline](#mempipeline) implementation to create in-memory instances of
+PCollections and PTables that contain a small amount of test data, and then apply our DoFns to those PCollections to test their
+functionality. We can retrieve the contents of any in-memory PCollection by calling its `Iterable<T> materialize()`
+method, which returns immediately. We can also check the values of any Counters that were incremented as the DoFns
+executed against the test data by calling the static `getCounters()` method on the MemPipeline class, and reset
+those Counters between test runs by calling the static `clearCounters()` method:
+
+    public static class UpperCaseWithCounterFn extends DoFn<String, String> {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        String upper = input.toUpperCase();
+        // Count only the inputs that were actually changed by upper-casing.
+        if (!upper.equals(input)) {
+          increment("UpperCase", "modified");
+        }
+        emitter.emit(upper);
+      }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+      // Counters are shared static state, so reset them before each test.
+      MemPipeline.clearCounters();
+    }
+
+    @Test
+    public void testToUpperCase_WithPipeline() {
+      PCollection<String> inputStrings = MemPipeline.collectionOf("a", "B", "c");
+      PCollection<String> upperCaseStrings = inputStrings.parallelDo(new UpperCaseWithCounterFn(), Writables.strings());
+      assertEquals(ImmutableList.of("A", "B", "C"), Lists.newArrayList(upperCaseStrings.materialize()));
+      // "a" and "c" were modified; "B" was already upper case.
+      assertEquals(2L, MemPipeline.getCounters().findCounter("UpperCase", "modified").getValue());
+    }
+
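To see why resetting counters between tests matters, here is a minimal sketch that uses a hypothetical static counter map (`CounterSketch` is an illustrative stand-in, not the MemPipeline API). It assumes only that counters are shared, process-wide state, as the static `getCounters()` and `clearCounters()` methods suggest:

```java
import java.util.HashMap;
import java.util.Map;

class CounterSketch {
  // Hypothetical stand-in for process-wide counter state; NOT the MemPipeline API.
  private static final Map<String, Long> COUNTERS = new HashMap<>();

  public static void increment(String group, String name) {
    COUNTERS.merge(group + ":" + name, 1L, Long::sum);
  }

  public static long get(String group, String name) {
    return COUNTERS.getOrDefault(group + ":" + name, 0L);
  }

  public static void clear() {
    COUNTERS.clear();
  }

  // Mirrors the counting rule of the upper-casing function: count every
  // input that changes when it is converted to upper case.
  public static void upperCaseAll(String... inputs) {
    for (String input : inputs) {
      if (!input.toUpperCase().equals(input)) {
        increment("UpperCase", "modified");
      }
    }
  }
}
```

Running `upperCaseAll("a", "B", "c")` twice without calling `clear()` leaves the counter at 4 rather than 2, which is the kind of cross-test leakage that clearing the counters in a `@Before` method prevents.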
+### Designing Testable Data Pipelines
+
+In the same way that we try to [write testable code](http://misko.hevery.com/code-reviewers-guide/), we want to ensure that
+our data pipelines are written in a way that makes them easy to test. In general, you should try to break up complex pipelines
+into a number of function calls that perform a small set of operations on input PCollections and return one or more PCollections
+as a result. This makes it easy to swap in different PCollection implementations for testing and production runs.
+
+Let's look at an example that computes one iteration of the [PageRank](http://en.wikipedia.org/wiki/PageRank) algorithm that
+is taken from one of Crunch's integration tests:
+
+    // Each entry in the PTable represents a URL and its associated data for PageRank computations.
+    public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+      PTypeFamily ptf = input.getTypeFamily();
+
+      // Compute the outbound page rank from each of the input pages.
+      PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+        @Override
+        public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+          PageRankData prd = input.second();
+          for (String link : prd.urls) {
+            emitter.emit(Pair.of(link, prd.propagatedScore()));
+          }
+        }
+      }, ptf.tableOf(ptf.strings(), ptf.floats()));
+
+      // Update the PageRank for each URL.
+      return input.cogroup(outbound).mapValues(
+          new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+            @Override
+            public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+              PageRankData prd = Iterables.getOnlyElement(input.first());
+              Collection<Float> propagatedScores = input.second();
+              float sum = 0.0f;
+              for (Float s : propagatedScores) {
+                sum += s;
+              }
+              return prd.next(d + (1.0f - d) * sum);
+            }
+          }, input.getValueType());
+    }
+
+By embedding our business logic inside a static method that operates on PTables, we can easily unit test our PageRank
+computations, which combine custom DoFns with Crunch's built-in `cogroup` operation. We use the [MemPipeline](#mempipeline)
+implementation to create test data sets whose results we can verify by hand, and then run the same logic on
+a distributed data set using either the [MRPipeline](#mrpipeline) or [SparkPipeline](#sparkpipeline) implementation.
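When building a test data set to verify by hand, it helps to work out the expected values with a plain reference computation first. The sketch below does this with ordinary Java maps instead of PTables. It assumes that each page propagates its score divided by its number of outbound links (the definition of `PageRankData.propagatedScore()` is not shown above, so this is an assumption), and `PageRankSketch` is an illustrative name:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PageRankSketch {
  // Computes one PageRank iteration over an in-memory graph.
  // ASSUMPTION: each page sends score / outDegree to every page it links to.
  public static Map<String, Float> iterate(Map<String, List<String>> links,
                                           Map<String, Float> scores, float d) {
    // Sum the score shares arriving at each target page.
    Map<String, Float> inbound = new HashMap<>();
    for (Map.Entry<String, List<String>> e : links.entrySet()) {
      float share = scores.get(e.getKey()) / e.getValue().size();
      for (String target : e.getValue()) {
        inbound.merge(target, share, Float::sum);
      }
    }
    // Apply the same update rule as the pipeline: d + (1 - d) * sum.
    Map<String, Float> next = new HashMap<>();
    for (String url : links.keySet()) {
      next.put(url, d + (1.0f - d) * inbound.getOrDefault(url, 0.0f));
    }
    return next;
  }
}
```

For a three-page graph where `a` links to `b` and `c`, `b` links to `c`, and `c` links to `a`, with every score starting at 1.0 and `d = 0.5`, the inbound sums are 1.0, 0.5, and 1.5, so the updated scores are 1.0 for `a`, 0.75 for `b`, and 1.25 for `c`. Values like these are small enough to check by hand and are exactly representable as floats, which makes them convenient expected results for a MemPipeline-based test.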