Posted to commits@pig.apache.org by da...@apache.org on 2011/04/27 22:58:40 UTC

svn commit: r1097239 - in /pig/trunk/src/docs/src/documentation/content/xdocs: basic.xml cont.xml images/penny-archt.jpg test.xml udf.xml

Author: daijy
Date: Wed Apr 27 20:58:39 2011
New Revision: 1097239

URL: http://svn.apache.org/viewvc?rev=1097239&view=rev
Log:
PIG-1772: Pig 090 Documentation (pig-1772-beta2-2.patch)

Added:
    pig/trunk/src/docs/src/documentation/content/xdocs/images/penny-archt.jpg   (with props)
Modified:
    pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/test.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1097239&r1=1097238&r2=1097239&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Wed Apr 27 20:58:39 2011
@@ -7357,7 +7357,7 @@ STORE Y INTO 'byuser';
    <table>
       <tr> 
             <td>
-               <p>DEFINE alias {function | [`command` [input] [output] [ship] [cache]] };</p>
+               <p>DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] };</p>
             </td>
          </tr> 
    </table>
@@ -7390,6 +7390,12 @@ STORE Y INTO 'byuser';
             <td>
             <p>For use with streaming.</p>
                <p>A command, including the arguments, enclosed in backticks (where a command is anything that can be executed).</p>
+               <p>The clauses (input, output, ship, cache, stderr) are described below. Note the following:</p>
+               <ul>
+                  <li>All clauses are optional.</li>
+                  <li>The clauses can be specified in any order (for example, stderr can appear before input).</li>
+                  <li>Each clause can be specified at most once (for example, multiple input clauses are not allowed).</li>
+               </ul>
             </td>
          </tr>
          <tr>
@@ -7476,6 +7482,18 @@ STORE Y INTO 'byuser';
                </ul>
             </td>
          </tr> 
+         <tr>
+            <td>
+               <p>stderr</p>
+            </td>
+           <td>
+            <p>For use with streaming.</p>
+            <p>STDERR( '/dir') or STDERR( '/dir' LIMIT n)</p>
+             <p>Where:</p>
+             <ul>
+                <li>'/dir' is the log directory, enclosed in single quotes.</li>
+                <li>(optional) LIMIT n is the error threshold, where n is an integer value. If not specified, the default error threshold is unlimited.</li>
+             </ul>
+             <p>An example that uses this clause appears after this table.</p>
+            </td>
+         </tr>
    </table></section>
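+   <p>For example, a DEFINE statement that uses several of these clauses might look like the following sketch (the command, ship path, and log directory are illustrative only; the clauses could appear in any order):</p>
+<source>
+DEFINE mycmd `perl stream.pl` ship('/local/path/stream.pl') stderr('/logs/mycmd' LIMIT 100);
+A = LOAD 'data';
+B = STREAM A THROUGH mycmd;
+STORE B INTO 'results';
+</source>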
    
    <section>

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml?rev=1097239&r1=1097238&r2=1097239&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/cont.xml Wed Apr 27 20:58:39 2011
@@ -335,35 +335,42 @@ else:
 
 <section>
 <title>Convergence</title>
-<p>There is a whole class of problems that involve iterating over a pipeline an indeterminate number of times. Examples include running until values or error converge, graph traversal algorithms, etc. while and for loops in the host language along with the ability to test the output of a pipeline and either run it again or move to the next step will enable users to satisfy this use case. </p>
+<p>There is a class of problems that involve iterating over a data pipeline an indeterminate number of times, until a certain value is reached. Examples arise in machine learning, graph traversal, and a host of numerical analysis problems that involve finding interpolations, extrapolations, or regressions. The Python example below shows one way to iterate a Pig script until such a condition is met.</p>
 
 <source>
-P = Pig.compile("""A = load '$in’;
-                   C = group A by user;
-                   D = foreach C generate group, myUDF(A);
-                   store D into '$out’;
-                   F = group D all;
-                   G = foreach F generate MAX(D);
-                   store G into 'tmp’;
+#!/usr/bin/python
+
+# explicitly import Pig class
+from org.apache.pig.scripting import Pig
+
+P = Pig.compile("""A = load '$input' as (user, age, gpa);
+                   B = group A all;
+                   C = foreach B generate AVG(A.gpa);
+                   store C into '$output';
                 """)
-error = 100.0
-input = "original”
-output = "output-0”
-final = "final-output”
-
-for i in range(1, 100):
-    p.bind({'in':input, 'out':output}) # attaches $in, $out in Pig Latin to input, output Python variables
-    results = p.runSingle()
-    
-    if results.isSuccessful() = "FAILED":
+# initial input and output
+input = "studenttab5"
+output = "output-5"
+final = "final-output"
+
+for i in range(1, 4):
+    Q = P.bind({'input':input, 'output':output}) # attaches $input, $output in Pig Latin to input, output Python variables
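+    # run the bound pipeline once; 'results' gives access to job status and to the stored output of each alias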
+    results = Q.runSingle()
+
+    if not results.isSuccessful():
         raise "Pig job failed"
-    iter = results.getResults("G")
-    if iter.next() > 1:
-        input = output
-        output = "output-" + i.to_s
-    else:
-        H = Pig.fs("mv " + output + " " + final)
-        break
+    iter = results.result("C").iterator()
+    if iter.hasNext():
+        tuple = iter.next()
+        value = tuple.get(0)
+        if float(str(value)) &lt; 3:
+            print "value: " + str(value)
+            input = "studenttab" + str(i+5)
+            output = "output-" + str(i+5)
+            print "output: " + output
+        else:
+            Pig.fs("mv " + output + " " + final)
+            break
 </source>
 </section>
 

Added: pig/trunk/src/docs/src/documentation/content/xdocs/images/penny-archt.jpg
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/images/penny-archt.jpg?rev=1097239&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/src/docs/src/documentation/content/xdocs/images/penny-archt.jpg
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/test.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/test.xml?rev=1097239&r1=1097238&r2=1097239&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/test.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/test.xml Wed Apr 27 20:58:39 2011
@@ -123,7 +123,7 @@ C = FOREACH B { 
 DESCRIBE C::D; 
 D: {age: bytearray} 
 </source>
-   </section></section>
+</section></section>
    
  <!-- +++++++++++++++++++++++++++++++++++++++ -->   
  <section>
@@ -209,7 +209,7 @@ DUMP B;
                <p>–script</p>
             </td>
             <td>
-               <p>Use to specify a pig script.</p>
+               <p>Use to specify a Pig script.</p>
             </td>
          </tr>      
 
@@ -271,8 +271,6 @@ DUMP B;
                <p>The name of a relation.</p>
             </td>
          </tr>
-         
-    
    </table></section>
    
    <section>
@@ -501,7 +499,6 @@ grunt> illustrate -script visits.pig
 </source>
 
 </section>
-
 </section>
 
 <!-- =========================================================================== -->
@@ -515,7 +512,7 @@ JobId Maps Reduces MaxMapTime MinMapTIme
 job_201004271216_12712 1 1 3 3 3 12 12 12 B,C GROUP_BY,COMBINER
 job_201004271216_12713 1 1 3 3 3 12 12 12 D SAMPLER
 job_201004271216_12714 1 1 3 3 3 12 12 12 D ORDER_BY,COMBINER 
-    hdfs://wilbur20.labs.corp.sp1.yahoo.com:9020/tmp/temp743703298/tmp-2019944040,
+    hdfs://mymachine.com:9020/tmp/temp743703298/tmp-2019944040,
 </source>
 
 </section>
@@ -529,6 +526,7 @@ job_201004271216_12714 1 1 3 3 3 12 12 1
 
 <p>The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file). Piggybank has a HadoopJobHistoryLoader which acts as an example of using Pig itself to query these statistics (the loader can be used as a reference implementation but is NOT supported for production use).</p>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
 <title>Java API</title>
 
@@ -573,10 +571,9 @@ public interface PigProgressNotification
     public void launchCompletedNotification(int numJobsSucceeded);
 }
 </source>
-
-
 </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
 <title>Job XML</title>
 <p>The following entries are included in job conf: </p>
@@ -689,6 +686,8 @@ public interface PigProgressNotification
 </table>
 </section>
 
+
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
 <title>Hadoop Job History Loader</title>
 <p>The HadoopJobHistoryLoader in Piggybank loads Hadoop job history files and job xml files from file system. For each MapReduce job, the loader produces a tuple with schema (j:map[], m:map[], r:map[]). The first map in the schema contains job-related entries. Here are some of important key names in the map: </p>
@@ -724,7 +723,7 @@ public interface PigProgressNotification
 <p>Examples that use the loader to query Pig statistics are shown below.</p>
 </section>
 
-
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
 <title>Examples</title>
 <p>Find scripts that generate more than three MapReduce jobs:</p>
@@ -786,6 +785,7 @@ dump e;
         No cluster set up is required if you run Pig in local mode.
       </p>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>Build PigUnit</title>
       <p>To compile PigUnit run the command shown below from the Pig trunk. The compile will create the pigunit.jar file.</p>
@@ -793,7 +793,8 @@ dump e;
 $pig_trunk ant pigunit-jar   
 </source>
     </section>
-    
+ 
+ <!-- +++++++++++++++++++++++++++++++++++++++ -->   
       <section>
       <title>Run PigUnit</title>
       <p>You can run PigUnit using Pig's local mode or mapreduce mode.</p>
@@ -806,15 +807,16 @@ $pig_trunk ant pigunit-jar   
       </p>
     </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>Mapreduce Mode</title>
       <p>PigUnit also runs in Pig's mapreduce mode. Mapreduce mode requires you to use a Hadoop cluster and HDFS installation.
         It is enabled when the Java system property pigunit.exectype.cluster is set to any value: e.g. -Dpigunit.exectype.cluster=true or System.getProperties().setProperty("pigunit.exectype.cluster", "true"). The cluster you select must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable). 
       </p>
     </section>
-
     </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>PigUnit Example</title>
       
@@ -834,6 +836,7 @@ $pig_trunk ant pigunit-jar   
         script that will be compared to the actual result of the execution of the Pig script. 
       </p>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>Java Test</title>
         <source>
@@ -863,8 +866,9 @@ $pig_trunk ant pigunit-jar   
     test.assertOutput("data", input, "queries_limit", output);
   }
 </source>
-      </section>
+ </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>top_queries.pig</title>
         <source>
@@ -891,8 +895,9 @@ queries_limit =
 
 STORE queries_limit INTO 'output';
 </source>
-      </section>
+</section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>Run</title>
 
@@ -919,7 +924,7 @@ junit.framework.ComparisonFailure: null 
       </section>
     </section>
 
-
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>Troubleshooting Tips</title>
       <p>Common problems you may encounter are discussed below.</p>
@@ -938,6 +943,7 @@ If you plan to use local mode, please pu
 </source>
       </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>UDF jars Not Found</title>
         <p>This error means that you are missing some jars in your test environment.</p>
@@ -947,6 +953,7 @@ org.apache.pig.piggybank.evaluation.stri
 </source>
       </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>Storing Data</title>
         <p>Pig currently drops all STORE and DUMP commands. You can tell PigUnit to keep the
@@ -958,6 +965,7 @@ test.runScript();
 </source>
       </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
       <section>
         <title>Cache Archive</title>
         <p>For cache archive to work, your test environment needs to have the cache archive options
@@ -968,6 +976,7 @@ test.runScript();
       </section>
     </section>
 
+<!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>Future Enhancements</title>
       <p>Improvements and other components based on PigUnit that could be built later.</p>
@@ -980,6 +989,245 @@ test.runScript();
       </ol>
     </section>
     </section>
+    
+    
+    
+<!-- =========================================================================== -->
+<!-- PENNY -->    
+
+  <section>
+      <title>Penny</title>
+      <p><strong>Note:</strong> <em>Penny is an experimental feature.</em></p>
+      <p></p>
+      <p>Penny is a framework for creating Pig monitoring and debugging tools. Penny comes with a library of tools (see <a href="http://wiki.apache.org/pig/PennyToolLibrary">Penny Tool Library</a>). However, the real power of Penny is in creating your own custom monitoring and debugging tools using Penny's simple API.</p>
+
+<!-- +++++++++++++++++++++++++++++++++++++++++++++++++-->    
+<section>
+<title>How it Works</title>
+<p>Before you can create a tool, you need to understand how Penny instruments Pig scripts (called "dataflow programs" in the following diagram).</p>
 
+<figure src="images/penny-archt.jpg" align="left" alt="Penny Architecture"/>
+
+<p>As shown in the diagram, Penny inserts one or more monitor agents (called "Penny agent" in the diagram) between the steps of the Pig script; the agents observe the data flowing between those steps. Monitor agents run arbitrary Java code supplied by your tool and have access to primitives for tagging records and for communicating with other agents and with a central coordinator process (called "Penny coordinator" in the diagram). The coordinator also runs arbitrary code defined by your tool.</p>
+
+<p>The whole thing is kicked off by the tool's Main program (called "application" in the diagram), which receives instructions from the user (e.g. "please figure out why this Pig script keeps crashing"), launches one or more runs of the Pig script instrumented with monitor agents, and reports the outcome back to the user (e.g. "the crash appears to be caused by one of these records: ..."). </p>
+</section>
+
+<!-- +++++++++++++++++++++++++++++++++++++++++++++++++-->          
+<section>
+<title>API</title>
+<p>You need to write three Java classes: a Main class, a Coordinator class, and a MonitorAgent class (for certain fancy tools, you may need multiple MonitorAgent classes). You can find many examples of Main/Coordinator/MonitorAgent classes that define Penny tools in the Penny source code (<a href="http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/">/pig/trunk/contrib/penny/java/src/main/java/</a>) under org.apache.pig.penny.apps. All of the tools described in <a href="http://wiki.apache.org/pig/PennyToolLibrary">Penny Tool Library</a> are written using this API, so you've got plenty of examples to work with. We'll paste a few code fragments below to get you going -- in fact the entire code for the "data samples" tool (all 97 lines of Java) is included below.</p>
+   
+      <!-- +++++++++++++++++++++++++++++++++++++++++++++++++-->       
+      <section>
+      <title>Main Class</title>
+      <p>Your Main class is the "shell" of your application. It receives instructions from the user, and configures and launches one or more Penny-instrumented runs of the user's Pig script. </p>
+      
+      <p>You talk to Penny via the PennyServer class. You can do two things: (1) parse a user's Pig script and (2) launch a Penny-instrumented run of the Pig script. Here is the Main class for the data samples tool, described in <a href="http://wiki.apache.org/pig/PennyToolLibrary">Penny Tool Library</a>:</p>
+      
+      <source>
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+
+/**
+ * Data samples app.
+ */
+public class Main {
+    public static void main(String[] args) throws Exception {
+        PennyServer pennyServer = new PennyServer();
+        String pigScriptFilename = args[0];
+        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+        Map&lt;String, ClassWithArgs&gt; monitorClasses = new HashMap&lt;String, ClassWithArgs&gt;();
+        for (String alias : parsedPigScript.aliases()) {
+            monitorClasses.put(alias, new ClassWithArgs(DSMonitorAgent.class));
+        }
+        parsedPigScript.trace(DSCoordinator.class, monitorClasses);
+    }
+}      
+</source>
+    
+<p>The "monitorClasses" map dictates which monitor agent (if any) to place after each dataflow step (steps are identified by Pig script aliases). You can also pass arguments to each monitor agent, and/or to the coordinator, as shown in this example for the "data histograms" tool: </p>
+    
+<source>
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+
+/**
+ * Data summaries app. that computes a histogram of one of the fields of one of the intermediate data sets.
+ */
+
+public class Main {
+    public static void main(String[] args) throws Exception {
+        PennyServer pennyServer = new PennyServer();
+        String pigScriptFilename = args[0];
+        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+        String alias = args[1]; // which alias to create histogram for
+        int fieldNo = Integer.parseInt(args[2]); // which field to create histogram for
+        int min = Integer.parseInt(args[3]); // min field value
+        int max = Integer.parseInt(args[4]); // max field value
+        int bucketSize = Integer.parseInt(args[5]); // histogram bucket size
+        if (!parsedPigScript.aliases().contains(alias)) throw new IllegalArgumentException("No such alias.");
+        Map&lt;String, ClassWithArgs&gt; monitorClasses = new HashMap&lt;String, ClassWithArgs&gt;();
+        monitorClasses.put(alias, new ClassWithArgs(DHMonitorAgent.class, fieldNo, min, max, bucketSize));
+        TreeMap&lt;Integer, Integer&gt; histogram = (TreeMap&lt;Integer, Integer&gt;) parsedPigScript.trace(DHCoordinator.class, monitorClasses);
+        System.out.println("Histogram: " + histogram);
+    }
+}    
+</source>
+</section>
+
+      <!-- +++++++++++++++++++++++++++++++++++++++++++++++++-->     
+      <section>
+      <title>MonitorAgent Class</title>
+      <p>Monitor agents implement the following API: </p>
+      <source>
+ /**
+  * Furnish set of fields to monitor. (Null means monitor all fields ('*').)
+  */
+ public abstract Set&lt;Integer&gt; furnishFieldsToMonitor();
+
+ /**
+  * Initialize, using any arguments passed from higher layer.
+  */
+ public abstract void init(Serializable[] args);
+ /**
+  * Process a tuple that passes through the monitoring point.
+  *
+  * @param t   the tuple
+  * @param tag t's tags
+  * @return FILTER_OUT to remove the tuple from the data stream; 
+  *    NO_TAGS to let it pass through and not give it any tags; 
+  *    a set of tags to let it pass through and assign those tags
+  */
+ public abstract Set&lt;String&gt; observeTuple(Tuple t, Set&lt;String&gt; tags) throws ExecException;
+ /**
+  * Process an incoming (synchronous or asynchronous) message.
+  */
+ public abstract void receiveMessage(Location source, Tuple message);
+ /**
+  * No more tuples are going to pass through the monitoring point. Finish any ongoing processing.
+  */
+ public abstract void finish();      
+      </source>
+      
+      <p>Here's an example from the "data samples" tool:</p>
+      <source>
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+public class DSMonitorAgent extends MonitorAgent {
+
+    private final static int NUM_SAMPLES = 5;
+    private int tupleCount = 0;
+    public void finish() { }
+    public Set&lt;Integer&gt; furnishFieldsToMonitor() {
+        return null;
+    }
+    public void init(Serializable[] args) { }
+    public Set&lt;String&gt; observeTuple(Tuple t, Set&lt;String&gt; tags) throws ExecException {
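+        // Forward the first NUM_SAMPLES tuples seen at this monitoring point to the
+        // coordinator; every tuple passes through unchanged with its existing tags.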
+        if (tupleCount++ &lt; NUM_SAMPLES) {
+            communicator().sendToCoordinator(t);
+        }
+        return tags;
+    }
+    public void receiveMessage(Location source, Tuple message) { }
+}      
+      </source>
+      
+      <p>Monitor agents have access to a "communicator" object, which is the gateway for sending messages to other agents or to the coordinator. The communicator API is: </p>
+      <source>
+ /**
+  * Find out my (physical) location.
+  */
+ public abstract Location myLocation();
+
+ /**
+  * Send a message to the coordinator, asynchronously.
+  */
+ public abstract void sendToCoordinator(Tuple message);
+
+ /**
+  * Send a message to immediate downstream neighbor(s), synchronously.
+  * If downstream neighbor(s) span a task boundary, all instances will receive it; otherwise only same-task instances will receive it.
+  * If there is no downstream neighbor, an exception will be thrown.
+  */
+ public abstract void sendDownstream(Tuple message) throws NoSuchLocationException;
+
+ /**
+  * Send a message to immediate upstream neighbor(s), synchronously.
+  * If upstream neighbor(s) are non-existent or span a task boundary, an exception will be thrown.
+  */
+ public abstract void sendUpstream(Tuple message) throws NoSuchLocationException;
+
+ /**
+  * Send a message to current/future instances of a given logical location.
+  * Instances that have already terminated will not receive the message (obviously).
+  * Instances that are currently executing will receive it asynchronously (or perhaps not at all, if they terminate before the message arrives).
+  * Instances that have not yet started will receive the message prior to beginning processing of tuples.
+  */
+ public abstract void sendToAgents(LogicalLocation destination, Tuple message) throws NoSuchLocationException;
+
+ // The following methods mirror the ones above, but take care of packaging a list of objects into a tuple (you're welcome!) ...
+ public void sendToCoordinator(Object ... message) {
+     sendToCoordinator(makeTuple(message));
+ }
+ public void sendDownstream(Object ... message) throws NoSuchLocationException {
+     sendDownstream(makeTuple(message));
+ }
+ public void sendUpstream(Object ... message) throws NoSuchLocationException {
+     sendUpstream(makeTuple(message));
+ }
+ public void sendToAgents(LogicalLocation destination, Object ... message) throws NoSuchLocationException {
+     sendToAgents(destination, makeTuple(message));
+ }
+</source>
+</section>
+
+      <!-- +++++++++++++++++++++++++++++++++++++++++++++++++-->  
+      <section>
+      <title>Coordinator Class</title>
+      <p>Your tool's coordinator implements the following API: </p>
+      <source>
+ /**
+  * Initialize, using any arguments passed from higher layer.
+  */
+ public abstract void init(Serializable[] args);
+
+ /**
+  * Process an incoming (synchronous or asynchronous) message.
+  */
+ public abstract void receiveMessage(Location source, Tuple message);
+
+ /**
+  * The data flow has completed and all messages have been delivered. Finish processing.
+  *
+  * @return final output to pass back to application
+  */
+ public abstract Object finish();
+      </source>
+      
+      <p>The coordinator for the "data samples" tool is: </p>
+<source>
+import java.io.Serializable;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Coordinator;
+import org.apache.pig.penny.Location;
+public class DSCoordinator extends Coordinator {
+    public void init(Serializable[] args) { }
+    public Object finish() {
+        return null;
+    }
+    public void receiveMessage(Location source, Tuple message) {
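+        // Each message is a sample tuple forwarded by a monitor agent; print it along
+        // with the alias it was observed at, truncated to 100 characters.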
+        System.out.println("*** SAMPLE RECORD AT ALIAS " + source.logId() + ": " + truncate(message));
+    }
+    private String truncate(Tuple t) {
+        String s = t.toString();
+        return s.substring(0, Math.min(s.length(), 100));
+    }
+}      
+</source>
+</section>
+</section>
+</section>
+   
 </body>
 </document>

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=1097239&r1=1097238&r2=1097239&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Wed Apr 27 20:58:39 2011
@@ -199,6 +199,7 @@ public interface Algebraic{
 <section>
 <title> Filter Functions</title>
 <p>Filter functions are eval functions that return a <code>boolean</code> value. Filter functions can be used anywhere a Boolean expression is appropriate, including the <code>FILTER</code> operator or <code>bincond</code> expression. </p>
+
 <p>The example below uses the <code>IsEmpty</code> builtin filter function to implement joins. </p>
 
 <source>
@@ -220,7 +221,7 @@ A = LOAD 'student_data' AS (name: charar
 B = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);
 C = COGROUP A BY name, B BY name;
 D = FOREACH C GENERATE group, flatten((IsEmpty(A) ? null : A)), flatten((IsEmpty(B) ? null : B));
-dump D
+dump D;
 </source>
 
 <p>The implementation of the <code>IsEmpty</code> function looks like this: </p>
@@ -228,32 +229,38 @@ dump D
 <source>
 import java.io.IOException;
 import java.util.Map;
+
 import org.apache.pig.FilterFunc;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.DataType;
 
+/**
+ * Determine whether a bag or map is empty.
+ */
 public class IsEmpty extends FilterFunc {
+
+    @Override
     public Boolean exec(Tuple input) throws IOException {
-        if (input == null || input.size() == 0)
-            return null;
         try {
             Object values = input.get(0);
             if (values instanceof DataBag)
                 return ((DataBag)values).size() == 0;
             else if (values instanceof Map)
                 return ((Map)values).size() == 0;
-            else{
-                throw new IOException("Cannot test a " +
-                    DataType.findTypeName(values) + " for emptiness.");
+            else {
+                int errCode = 2102;
+                String msg = "Cannot test a " +
+                DataType.findTypeName(values) + " for emptiness.";
+                throw new ExecException(msg, errCode, PigException.BUG);
             }
         } catch (ExecException ee) {
-            throw new IOException("Caught exception processing input row ", ee);   
+            throw ee;
         }
     }
-}
-
+} 
 </source>
 </section>
 
@@ -710,6 +717,31 @@ public class UPPER extends EvalFunc&lt;S
 </section>
 
 <section>
+<title>Using Distributed Cache</title>
+<p>Use getCacheFiles, an EvalFunc method, to return a list of HDFS files that need to be shipped to the distributed cache. Inside the EvalFunc, you can assume that these files already exist in the distributed cache. For example:</p>
+<source>
+import java.io.BufferedReader; 
+import java.io.FileReader; 
+import java.io.IOException; 
+import java.util.ArrayList; 
+import java.util.List; 
+
+import org.apache.pig.EvalFunc; 
+import org.apache.pig.data.Tuple; 
+
+public class Udfcachetest extends EvalFunc&lt;String&gt; { 
+
+    public String exec(Tuple input) throws IOException { 
+        FileReader fr = new FileReader("./smallfile"); 
+        BufferedReader d = new BufferedReader(fr); 
+        return d.readLine(); 
+    } 
+
+    public List&lt;String&gt; getCacheFiles() { 
+        List&lt;String&gt; list = new ArrayList&lt;String&gt;(1); 
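+        // The path before '#' is the file in HDFS; the name after '#' is the symlink
+        // created in the task's working directory (read above as "./smallfile").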
+        list.add("/user/pig/tests/data/small#smallfile"); 
+        return list; 
+    } 
+} 
+
+a = load '1.txt'; 
+b = foreach a generate Udfcachetest(*); 
+dump b;
+</source>
+</section>
+
+<section>
 <title>Import Lists</title>
 <p>An import list allows you to specify the package to which a UDF or a group of UDFs belong,
  eliminating the need to qualify the UDF on every call. An import list can be specified via the udf.import.list Java