You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:42 UTC

[04/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
new file mode 100644
index 0000000..5178133
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A derivation of {@link Job} that stages its output in another location and only
+ * moves it to the final destination if the job completes successfully.
+ * It also outputs a counters file to the file system that contains counters fetched from Hadoop
+ * and other task statistics.
+ */
+public class StagedOutputJob extends Job implements Callable<Boolean>
+{
+  private final String _stagingPrefix;
+  private final Logger _log;
+  private Path _countersPath;
+  private Path _countersParentPath;
+  private boolean _writeCounters = false;
+  
+  /**
+   * Creates a job that writes its output to a temporary staging location.
+   * The output is only moved to the final output directory once the job has
+   * completed successfully, so existing output data is never overwritten by
+   * a job that fails.
+   *
+   * <p>
+   * The job jar is derived from the class that called this method, and input
+   * files whose names start with an underscore or a period are filtered out.
+   * </p>
+   *
+   * @param conf configuration
+   * @param jobName job name
+   * @param inputPaths input paths, or null to leave the input paths unset
+   * @param stagingLocation where to stage output temporarily
+   * @param outputPath final output path
+   * @param log logger
+   * @return the newly created job
+   */
+  public static StagedOutputJob createStagedJob(
+    Configuration conf,
+    String jobName,
+    List<String> inputPaths,
+    String stagingLocation,
+    String outputPath,
+    final Logger log)
+  {
+    final StagedOutputJob job;
+    try
+    {
+      job = new StagedOutputJob(conf, stagingLocation, log);
+      job.setJobName(jobName);
+      // attribute the job jar to whoever invoked this factory rather than to this class
+      job.setJarByClass(getCallersClass());
+      FileInputFormat.setInputPathFilter(job, HiddenFilePathFilter.class);
+    }
+    catch (IOException e)
+    {
+      log.error("IOException when making a job", e);
+      throw new RuntimeException(e);
+    }
+
+    final boolean hasInputs = (inputPaths != null);
+    if (hasInputs)
+    {
+      try
+      {
+        // the input paths are handed over as a single comma-separated string
+        final String commaSeparatedInputs = StringUtils.join(inputPaths.iterator(),",");
+        FileInputFormat.setInputPaths(job, commaSeparatedInputs);
+      }
+      catch (IOException e)
+      {
+        log.error("Unable to set up input paths.", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    final Path finalOutput = new Path(outputPath);
+    FileOutputFormat.setOutputPath(job, finalOutput);
+
+    return job;
+  }
+
+  /**
+   * Initializes the job, remembering the staging prefix and logger for later use.
+   *
+   * @param conf configuration passed on to the {@link Job} constructor
+   * @param stagingPrefix parent location under which the output is staged temporarily
+   * @param log logger used for the messages emitted by this job
+   * @throws IOException propagated from the {@link Job} constructor
+   */
+  public StagedOutputJob(Configuration conf, String stagingPrefix, Logger log) throws IOException
+  {
+    super(conf);
+    this._stagingPrefix = stagingPrefix;
+    this._log = log;
+  }
+  
+  /**
+   * Gets the parent path under which the counters file is stored.  If this is not set
+   * then the counters file is placed under the job's configured output path instead.
+   *
+   * @return parent path for counters, or null if none has been set
+   */
+  public Path getCountersParentPath()
+  {
+    return _countersParentPath;
+  }
+   
+  /**
+   * Sets the parent path under which the counters file is stored.  If this is not set
+   * then the counters file is placed under the job's configured output path instead.
+   *
+   * @param path parent path for counters
+   */
+  public void setCountersParentPath(Path path)
+  {
+    _countersParentPath = path;
+  }
+  
+  /**
+   * Gets the path of the counters file written by this job.
+   * The path is only assigned when the counters are written, so this is null before then.
+   *
+   * @return counters path, or null if no counters have been written
+   */
+  public Path getCountersPath()
+  {
+    return _countersPath;
+  }
+  
+  /**
+   * Gets whether counters should be written after the job succeeds.
+   * Counters are not written unless this has been enabled.
+   *
+   * @return true if counters should be written
+   */
+  public boolean getWriteCounters()
+  {
+    return _writeCounters;
+  }
+  
+  /**
+   * Sets whether counters should be written after the job succeeds.
+   *
+   * @param writeCounters true if counters should be written
+   */
+  public void setWriteCounters(boolean writeCounters)
+  {
+    this._writeCounters = writeCounters;
+  }
+
+  /**
+   * Runs the job by calling {@link #waitForCompletion(boolean)} with verbose output disabled,
+   * then logs whether it succeeded together with the job ID and tracking URL.
+   *
+   * @return true if the job succeeded and its output was moved to the final location
+   * @throws Exception wrapping any exception raised while running the job
+   */
+  @Override
+  public Boolean call() throws Exception
+  {
+    try
+    {
+      final boolean success = waitForCompletion(false);
+      String jobId = "?";
+
+      if (getJobID() != null)
+      {
+        jobId = String.format("job_%s_%d",getJobID().getJtIdentifier(), getJobID().getId());
+      }
+
+      if (success)
+      {
+        _log.info(String.format("Job %s with ID %s succeeded! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
+      }
+      else
+      {
+        _log.error(String.format("Job %s with ID %s failed! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
+      }
+
+      return success;
+    }
+    catch (InterruptedException e)
+    {
+      // restore the interrupt status so that the calling thread or executor can still observe it
+      Thread.currentThread().interrupt();
+      _log.error("Exception: " + e.toString(), e);
+      throw new Exception(e);
+    }
+    catch (Exception e)
+    {
+      // pass the exception to the logger as well so that the stack trace is not lost
+      _log.error("Exception: " + e.toString(), e);
+      throw new Exception(e);
+    }
+  }
+    
+  /**
+   * Run the job and wait for it to complete.  Output will be temporarily stored under the staging path
+   * (<em>&lt;staging prefix&gt;/&lt;current time in ms&gt;/staged</em>).
+   * If the job is successful the existing data at the final location is deleted and the staged output is
+   * moved there; counters are then written if that was requested.  If the job fails the staged output is deleted.
+   *
+   * <p>
+   * While the job is running a JVM shutdown hook is registered that kills the job.  Once the job has finished
+   * the hook is removed and the job's configured output path is pointed back at the final location.
+   * </p>
+   *
+   * @param verbose passed on to {@link Job#waitForCompletion(boolean)}
+   * @return true if the job succeeded and the staged output was moved to the final location
+   * @throws IOException if the job or a file system operation fails
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws ClassNotFoundException propagated from {@link Job#waitForCompletion(boolean)}
+   */
+  @Override
+  public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException
+  {
+    final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
+    final Path stagedPath = new Path(String.format("%s/%s/staged", _stagingPrefix, System.currentTimeMillis()));
+
+    FileOutputFormat.setOutputPath(
+      this,
+      stagedPath
+    );
+
+    final Thread hook = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try
+        {
+          killJob();
+        }
+        catch (IOException e)
+        {
+          _log.error("Failed to kill job from shutdown hook", e);
+        }
+      }
+    });
+
+    Runtime.getRuntime().addShutdownHook(hook);
+
+    final boolean retVal;
+    try
+    {
+      retVal = super.waitForCompletion(verbose);
+    }
+    finally
+    {
+      // always unregister the hook, even when the job throws, so that hooks do not pile up
+      try
+      {
+        Runtime.getRuntime().removeShutdownHook(hook);
+      }
+      catch (IllegalStateException ignored)
+      {
+        // the JVM is already shutting down, so the hook can no longer be removed
+      }
+
+      // Point the job back at its real output location.  Otherwise anything that asks the job
+      // for its output path afterwards (such as the counters writer) would see the staged path.
+      if (actualOutputPath != null)
+      {
+        FileOutputFormat.setOutputPath(this, actualOutputPath);
+      }
+    }
+
+    if (retVal)
+    {
+      FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
+
+      // creating and then deleting the output path ensures that its parent directories exist for the rename
+      fs.mkdirs(actualOutputPath);
+
+      _log.info(String.format("Deleting data at old path[%s]", actualOutputPath));
+      fs.delete(actualOutputPath, true);
+
+      _log.info(String.format("Moving from staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
+      boolean renamed = fs.rename(stagedPath, actualOutputPath);
+
+      if (!renamed)
+      {
+        _log.error(String.format("Failed to move staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
+      }
+
+      if (renamed && _writeCounters)
+      {
+        writeCounters(fs);
+      }
+
+      return renamed;
+    }
+    else
+    {
+      FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
+      _log.info(String.format("Job failed, deleting staged path[%s]", stagedPath));
+      try
+      {
+        fs.delete(stagedPath, true);
+      }
+      catch (IOException e)
+      {
+        // cleanup is best effort; the job has already failed, so only report the problem
+        _log.warn(String.format("Unable to delete staged path[%s]", stagedPath), e);
+      }
+    }
+
+    _log.warn("retVal was false for some reason...");
+    return retVal;
+  }
+    
+  /**
+   * Gets the class for the caller.
+   * 
+   * @return caller class
+   */
+  private static Class<?> getCallersClass()
+  {
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    boolean foundSelf = false;
+    for (StackTraceElement element : stack) 
+    {
+      if (foundSelf &&
+          !StagedOutputJob.class.getName().equals(element.getClassName())) 
+      {
+        try 
+        {
+          return Class.forName(element.getClassName());
+        }
+        catch (ClassNotFoundException e) 
+        {
+          throw new RuntimeException(e);
+        }
+      }
+      else if (StagedOutputJob.class.getName().equals(element.getClassName()) 
+               && "getCallersClass".equals(element.getMethodName())) 
+      {
+        foundSelf = true;
+      }
+    }
+    return StagedOutputJob.class;
+  }
+    
+  /**
+   * Writes Hadoop counters and other task statistics to a file in the file system.
+   *
+   * <p>
+   * The file is named <em>.counters.&lt;yyyyMMddHHmmss&gt;</em>.  It is created under the counters parent path
+   * if one has been set (the directory is created if it does not exist), otherwise under the job's configured
+   * output path.  Each line has the form <em>NAME=value</em> and is also written to the log.
+   * </p>
+   *
+   * <p>
+   * The Hadoop counters are always written.  The timing statistics and task attempt counts are only written
+   * when the setup start, map start, reduce finish and cleanup finish times could all be determined from the
+   * task reports.
+   * </p>
+   *
+   * @param fs file system to write the counters file to
+   * @throws IOException if the file cannot be written or the job information cannot be fetched
+   */
+  private void writeCounters(final FileSystem fs) throws IOException
+  {
+    final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
+
+    SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+
+    String suffix = timestampFormat.format(new Date());
+
+    if (_countersParentPath != null)
+    {
+      if (!fs.exists(_countersParentPath))
+      {
+        _log.info("Creating counter parent path " + _countersParentPath);
+        fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
+      }
+      // make the name as unique as possible in this case because this may be a directory
+      // where other counter files will be dropped
+      _countersPath = new Path(_countersParentPath,".counters." + suffix);
+    }
+    else
+    {
+      _countersPath = new Path(actualOutputPath,".counters." + suffix);
+    }
+
+    _log.info(String.format("Writing counters to %s", _countersPath));
+    FSDataOutputStream counterStream = fs.create(_countersPath);
+    OutputStreamWriter writer = null;
+    JobClient jobClient = null;
+    try
+    {
+      // use an explicit charset so that the file contents do not depend on the platform default
+      writer = new OutputStreamWriter(new BufferedOutputStream(counterStream,256*1024),"UTF-8");
+
+      // fetch the counters once rather than once per group
+      org.apache.hadoop.mapreduce.Counters counters = getCounters();
+      for (String groupName : counters.getGroupNames())
+      {
+        for (Counter counter : counters.getGroup(groupName))
+        {
+          writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
+        }
+      }
+
+      JobID jobID = this.getJobID();
+
+      org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());
+
+      long minStart = Long.MAX_VALUE;
+      long maxFinish = 0;
+      long setupStart = Long.MAX_VALUE;
+      long cleanupFinish = 0;
+      DescriptiveStatistics mapStats = new DescriptiveStatistics();
+      DescriptiveStatistics reduceStats = new DescriptiveStatistics();
+      boolean success = true;
+
+      jobClient = new JobClient(this.conf);
+
+      // remembers which kind of task each task ID belongs to, for categorizing the attempts below
+      Map<String,String> taskIdToType = new HashMap<String,String>();
+
+      TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
+      if (setupReports.length > 0)
+      {
+        _log.info("Processing setup reports");
+        for (TaskReport report : setupReports)
+        {
+          taskIdToType.put(report.getTaskID().toString(),"SETUP");
+          if (report.getStartTime() == 0)
+          {
+            _log.warn("Skipping report with zero start time");
+            continue;
+          }
+          setupStart = Math.min(setupStart, report.getStartTime());
+        }
+      }
+      else
+      {
+        _log.error("No setup reports");
+      }
+
+      TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
+      if (mapReports.length > 0)
+      {
+        _log.info("Processing map reports");
+        for (TaskReport report : mapReports)
+        {
+          taskIdToType.put(report.getTaskID().toString(),"MAP");
+          if (report.getFinishTime() == 0 || report.getStartTime() == 0)
+          {
+            _log.warn("Skipping report with zero start or finish time");
+            continue;
+          }
+          minStart = Math.min(minStart, report.getStartTime());
+          mapStats.addValue(report.getFinishTime() - report.getStartTime());
+        }
+      }
+      else
+      {
+        _log.error("No map reports");
+      }
+
+      TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
+      if (reduceReports.length > 0)
+      {
+        _log.info("Processing reduce reports");
+        for (TaskReport report : reduceReports)
+        {
+          taskIdToType.put(report.getTaskID().toString(),"REDUCE");
+          if (report.getFinishTime() == 0 || report.getStartTime() == 0)
+          {
+            _log.warn("Skipping report with zero start or finish time");
+            continue;
+          }
+          maxFinish = Math.max(maxFinish, report.getFinishTime());
+          reduceStats.addValue(report.getFinishTime() - report.getStartTime());
+        }
+      }
+      else
+      {
+        _log.error("No reduce reports");
+      }
+
+      TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
+      if (cleanupReports.length > 0)
+      {
+        _log.info("Processing cleanup reports");
+        for (TaskReport report : cleanupReports)
+        {
+          taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
+          if (report.getFinishTime() == 0)
+          {
+            _log.warn("Skipping report with finish time of zero");
+            continue;
+          }
+          cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
+        }
+      }
+      else
+      {
+        _log.error("No cleanup reports");
+      }
+
+      if (minStart == Long.MAX_VALUE)
+      {
+        _log.error("Could not determine map-reduce start time");
+        success = false;
+      }
+      if (maxFinish == 0)
+      {
+        _log.error("Could not determine map-reduce finish time");
+        success = false;
+      }
+
+      if (setupStart == Long.MAX_VALUE)
+      {
+        _log.error("Could not determine setup start time");
+        success = false;
+      }
+      if (cleanupFinish == 0)
+      {
+        _log.error("Could not determine cleanup finish time");
+        success = false;
+      }
+
+      // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
+      // Unfortunately the job client doesn't have an easier way to get these statistics.
+      Map<String,Integer> attemptStats = new HashMap<String,Integer>();
+      _log.info("Processing task attempts");
+      for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
+      {
+        String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
+        String status = event.getTaskStatus().toString();
+
+        String key = String.format("%s_%s_ATTEMPTS",status,type);
+        Integer previousCount = attemptStats.get(key);
+        attemptStats.put(key, previousCount == null ? 1 : previousCount + 1);
+      }
+
+      if (success)
+      {
+        writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
+        writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
+        writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
+
+        writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
+        writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
+        writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
+
+        writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
+        writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
+        writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
+        writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
+        writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
+        writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
+
+        writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
+        writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
+        writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
+        writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
+        writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
+        writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
+
+        writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
+
+        for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
+        {
+          writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
+        }
+      }
+    }
+    finally
+    {
+      if (jobClient != null)
+      {
+        try
+        {
+          jobClient.close();
+        }
+        catch (IOException e)
+        {
+          // a failure here must not prevent the counters file from being closed below
+          _log.warn("Failed to close job client", e);
+        }
+      }
+
+      if (writer != null)
+      {
+        // closing the writer flushes the buffer and closes the underlying streams as well
+        writer.close();
+      }
+      else
+      {
+        counterStream.close();
+      }
+    }
+  }
+    
+  /**
+   * Get all task completion events for a particular job.
+   *
+   * <p>
+   * Events are fetched in batches until an empty batch is returned.  Where possible the batches are fetched
+   * by calling <em>getTaskCompletionEvents</em> reflectively on the job client's private <em>jobSubmitClient</em>
+   * field, which allows a larger batch size.  If that is not possible, or a reflective call fails, the
+   * remaining events are fetched through {@link Job#getTaskCompletionEvents(int)}.
+   * </p>
+   *
+   * @param jobClient job client
+   * @param jobId job ID
+   * @return task completion events
+   * @throws IOException if the events cannot be fetched
+   */
+  private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
+  {
+    List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();
+
+    // Tries to use reflection to get access to the getTaskCompletionEvents method from the private jobSubmitClient field.
+    // This method has a parameter for the size, which defaults to 10 for the top level methods and can therefore be extremely slow
+    // if the goal is to get all events.
+
+    Method getTaskCompletionEventsMethod = null;
+    Object jobSubmitClient = null;
+
+    // Any failure below simply means the fast path is unavailable, so it is only reported at debug level.
+    try
+    {
+      Field f = JobClient.class.getDeclaredField("jobSubmitClient");
+      f.setAccessible(true);
+      jobSubmitClient = f.get(jobClient);
+
+      if (jobSubmitClient != null)
+      {
+        getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
+        getTaskCompletionEventsMethod.setAccessible(true);
+      }
+    }
+    catch (NoSuchMethodException e)
+    {
+      _log.debug("Reflective access to getTaskCompletionEvents is not available", e);
+    }
+    catch (SecurityException e)
+    {
+      _log.debug("Reflective access to getTaskCompletionEvents is not available", e);
+    }
+    catch (NoSuchFieldException e)
+    {
+      _log.debug("Reflective access to getTaskCompletionEvents is not available", e);
+    }
+    catch (IllegalArgumentException e)
+    {
+      _log.debug("Reflective access to getTaskCompletionEvents is not available", e);
+    }
+    catch (IllegalAccessException e)
+    {
+      _log.debug("Reflective access to getTaskCompletionEvents is not available", e);
+    }
+
+    if (getTaskCompletionEventsMethod != null)
+    {
+      _log.info("Will call getTaskCompletionEvents via reflection since it's faster");
+    }
+    else
+    {
+      _log.info("Will call getTaskCompletionEvents via the slow method");
+    }
+
+    int index = 0;
+    while(true)
+    {
+      TaskCompletionEvent[] currentEvents;
+      if (getTaskCompletionEventsMethod != null)
+      {
+        try
+        {
+          // grab events, 250 at a time, which is faster than the other method which defaults to 10 at a time (with no override ability)
+          currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, 250);
+        }
+        catch (IllegalArgumentException e)
+        {
+          // index is left untouched, so the next iteration retries this batch with the direct method
+          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+          getTaskCompletionEventsMethod = null;
+          continue;
+        }
+        catch (IllegalAccessException e)
+        {
+          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+          getTaskCompletionEventsMethod = null;
+          continue;
+        }
+        catch (InvocationTargetException e)
+        {
+          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+          getTaskCompletionEventsMethod = null;
+          continue;
+        }
+      }
+      else
+      {
+        currentEvents = this.getTaskCompletionEvents(index);
+      }
+      // a missing or empty batch means there are no more events to fetch
+      if (currentEvents == null || currentEvents.length == 0) break;
+      for (TaskCompletionEvent event : currentEvents)
+      {
+        events.add(event);
+      }
+      index += currentEvents.length;
+    }
+
+    return events;
+  }
+  
+  /**
+   * Appends a line followed by a newline to the writer and also logs the line at info level.
+   *
+   * @param writer writer to append to
+   * @param line line to write and log
+   * @throws IOException if the line cannot be written
+   */
+  private void writeAndLog(OutputStreamWriter writer, String line) throws IOException
+  {
+    writer.append(line).append("\n");
+    _log.info(line);
+  }
+  
+  /**
+   * Path filter that rejects any path whose name starts with an underscore or a period.
+   */
+  static class HiddenFilePathFilter implements PathFilter
+  {
+    @Override
+    public boolean accept(Path path)
+    {
+      final String fileName = path.getName();
+      final boolean hidden = fileName.startsWith("_") || fileName.startsWith(".");
+      return !hidden;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
new file mode 100644
index 0000000..81112da
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.text.ParseException;
+import java.util.Date;
+import java.util.Properties;
+
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Base class for Hadoop jobs that consume time-partitioned data.
+ *
+ * <p>
+ * This class has the same configuration and methods as {@link AbstractJob}.
+ * On top of that it recognizes the following properties:
+ * </p>
+ *
+ * <ul>
+ *   <li><em>num.days</em> - Number of consecutive days of input data to consume</li>
+ *   <li><em>days.ago</em> - Number of days to subtract off the end of the consumption window</li>
+ *   <li><em>start.date</em> - Start date for window in yyyyMMdd format</li>
+ *   <li><em>end.date</em> - End date for window in yyyyMMdd format</li>
+ * </ul>
+ *
+ * <p>
+ * Each of these configuration parameters can also be set through the corresponding setter method.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class TimeBasedJob extends AbstractJob
+{
+  private Integer _numDays;
+  private Integer _daysAgo;
+  private Date _startDate;
+  private Date _endDate;
+
+  /**
+   * Initializes the job.
+   */
+  public TimeBasedJob()
+  {
+  }
+
+  /**
+   * Initializes the job with a job name and properties.
+   *
+   * @param name Job name
+   * @param props Configuration properties
+   */
+  public TimeBasedJob(String name, Properties props)
+  {
+    super(name,props);
+  }
+
+  /**
+   * Applies the properties and then picks up any of <em>num.days</em>, <em>days.ago</em>,
+   * <em>start.date</em> and <em>end.date</em> that are present, in that order.
+   *
+   * @param props Configuration properties
+   * @throws IllegalArgumentException if a date property cannot be parsed
+   */
+  @Override
+  public void setProperties(Properties props)
+  {
+    super.setProperties(props);
+
+    final Object numDays = getProperties().get("num.days");
+    if (numDays != null)
+    {
+      setNumDays(Integer.parseInt((String)numDays));
+    }
+
+    final Object daysAgo = getProperties().get("days.ago");
+    if (daysAgo != null)
+    {
+      setDaysAgo(Integer.parseInt((String)daysAgo));
+    }
+
+    final Object startDate = getProperties().get("start.date");
+    if (startDate != null)
+    {
+      // start date treated as inclusive lower bound
+      setStartDate(parseDatedPath((String)startDate));
+    }
+
+    final Object endDate = getProperties().get("end.date");
+    if (endDate != null)
+    {
+      setEndDate(parseDatedPath((String)endDate));
+    }
+  }
+
+  /**
+   * Parses a date using the dated path format.
+   *
+   * @param text date text
+   * @return parsed date
+   * @throws IllegalArgumentException if the text cannot be parsed
+   */
+  private static Date parseDatedPath(String text)
+  {
+    try
+    {
+      return PathUtils.datedPathFormat.parse(text);
+    }
+    catch (ParseException e)
+    {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Gets the number of consecutive days to process.
+   *
+   * @return number of days to process
+   */
+  public Integer getNumDays()
+  {
+    return _numDays;
+  }
+
+  /**
+   * Sets the number of consecutive days to process.
+   *
+   * @param numDays number of days to process
+   */
+  public void setNumDays(Integer numDays)
+  {
+    _numDays = numDays;
+  }
+
+  /**
+   * Gets the number of days to subtract off the end of the consumption window.
+   *
+   * @return Days ago
+   */
+  public Integer getDaysAgo()
+  {
+    return _daysAgo;
+  }
+
+  /**
+   * Sets the number of days to subtract off the end of the consumption window.
+   *
+   * @param daysAgo Days ago
+   */
+  public void setDaysAgo(Integer daysAgo)
+  {
+    _daysAgo = daysAgo;
+  }
+
+  /**
+   * Gets the start date.
+   *
+   * @return Start date
+   */
+  public Date getStartDate()
+  {
+    return _startDate;
+  }
+
+  /**
+   * Sets the start date.
+   *
+   * @param startDate start date
+   */
+  public void setStartDate(Date startDate)
+  {
+    _startDate = startDate;
+  }
+
+  /**
+   * Gets the end date.
+   *
+   * @return end date
+   */
+  public Date getEndDate()
+  {
+    return _endDate;
+  }
+
+  /**
+   * Sets the end date.
+   *
+   * @param endDate end date
+   */
+  public void setEndDate(Date endDate)
+  {
+    _endDate = endDate;
+  }
+
+  /**
+   * Checks that the time window settings do not contradict each other.
+   *
+   * @throws IllegalArgumentException if both end date and days ago are set, or if
+   *         num days is set together with both start date and end date
+   */
+  @Override
+  protected void validate()
+  {
+    super.validate();
+
+    final boolean hasEndDate = (_endDate != null);
+
+    if (hasEndDate && _daysAgo != null)
+    {
+      throw new IllegalArgumentException("Cannot specify both end date and days ago");
+    }
+
+    if (hasEndDate && _startDate != null && _numDays != null)
+    {
+      throw new IllegalArgumentException("Cannot specify num days when both start date and end date are set");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
new file mode 100644
index 0000000..17c460d
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partitioner used by {@link AbstractPartitionPreservingIncrementalJob} to limit the number of named outputs
+ * used by each reducer.
+ * 
+ * <p>
+ * The purpose of this partitioner is to prevent a proliferation of small files created by {@link AbstractPartitionPreservingIncrementalJob}.
+ * This job writes multiple outputs.  Each output corresponds to a day of input data.  By default records will be distributed across all
+ * the reducers.  This means that if many input days are consumed, then each reducer will write many outputs.  These outputs will typically
+ * be small.  The problem gets worse as more input data is consumed, as this will cause more reducers to be required. 
+ * </p>
+ * 
+ * <p>
+ * This partitioner solves the problem by limiting how many days of input data will be mapped to each reducer.  At the extreme each day of
+ * input data could be mapped to only one reducer.  This is controlled through the configuration setting <em>incremental.reducers.per.input</em>,
+ * which should be set in the Hadoop configuration.  Input days are assigned to reducers in a round-robin fashion.
+ * </p>
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class TimePartitioner extends Partitioner<AvroKey<GenericRecord>,AvroValue<GenericRecord>> implements Configurable
+{
+  public static String INPUT_TIMES = "incremental.input.times";  
+  public static String REDUCERS_PER_INPUT = "incremental.reducers.per.input";
+  private static String REDUCE_TASKS = "mapred.reduce.tasks";
+    
+  private int numReducers;
+  private Map<Long,List<Integer>> partitionMapping;  
+  private Configuration conf;
+  
+  @Override
+  public Configuration getConf()
+  {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf)
+  {
+    this.conf = conf;
+    
+    if (conf.get(REDUCE_TASKS) == null)
+    {
+      throw new RuntimeException(REDUCE_TASKS + " is required");
+    }
+    
+    this.numReducers = Integer.parseInt(conf.get(REDUCE_TASKS));
+    
+    if (conf.get(REDUCERS_PER_INPUT) == null)
+    {
+      throw new RuntimeException(REDUCERS_PER_INPUT + " is required");
+    }
+    
+    int reducersPerInput = Integer.parseInt(conf.get(REDUCERS_PER_INPUT));
+    
+    this.partitionMapping = new HashMap<Long,List<Integer>>();
+    int partition = 0;
+    for (String part : conf.get(INPUT_TIMES).split(","))
+    {      
+      Long day = Long.parseLong(part);
+      
+      List<Integer> partitions = new ArrayList<Integer>();
+      for (int r=0; r<reducersPerInput; r++)
+      {
+        partitions.add(partition);
+        partition = (partition + 1) % this.numReducers;
+      }
+      
+      partitionMapping.put(day,partitions);
+    }  
+  }
+  
+  @Override
+  public int getPartition(AvroKey<GenericRecord> key, AvroValue<GenericRecord> value, int numReduceTasks)
+  {
+    if (numReduceTasks != this.numReducers)
+    {
+      throw new RuntimeException("numReduceTasks " + numReduceTasks + " does not match expected " + this.numReducers);
+    }
+    
+    Long time = (Long)key.datum().get("time");
+    if (time == null)
+    {
+      throw new RuntimeException("time is null");
+    }
+    
+    List<Integer> partitions = this.partitionMapping.get(time);
+  
+    if (partitions == null)
+    {
+      throw new RuntimeException("Couldn't find partition for " + time);
+    }
+    
+    GenericRecord extractedKey = (GenericRecord)key.datum().get("value");
+    
+    if (extractedKey == null)
+    {
+      throw new RuntimeException("extracted key is null");
+    }
+    
+    int partitionIndex = (extractedKey.hashCode() & Integer.MAX_VALUE) % partitions.size();
+    
+    return partitions.get(partitionIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
new file mode 100644
index 0000000..c0cac13
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Incremental Hadoop jobs and some supporting classes.  
+ * 
+ * <p>
+ * Jobs within this package form the core of the incremental framework implementation.
+ * There are two types of incremental jobs: <em>partition-preserving</em> and 
+ * <em>partition-collapsing</em>.
+ * </p>
+ * 
+ * <p>
+ * A partition-preserving job consumes input data partitioned by day and produces output data partitioned by day.
+ * This is equivalent to running a MapReduce job for each individual day of input data,
+ * but much more efficient.  It compares the input data against the existing output data and only processes
+ * input data with no corresponding output.
+ * </p>
+ * 
+ * <p>
+ * A partition-collapsing job consumes input data partitioned by day and produces a single output.
+ * What distinguishes this job from a standard MapReduce job is that it can reuse the previous output.
+ * This enables it to process data much more efficiently.  Rather than consuming all input data on each
+ * run, it can consume only the new data since the previous run and merge it with the previous output.
+ * </p>
+ * 
+ * <p>
+ * Partition-preserving and partition-collapsing jobs can be created by extending {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}, respectively, and implementing the necessary methods.
+ * Alternatively, there are concrete versions of these classes, {@link datafu.hourglass.jobs.PartitionPreservingIncrementalJob} and 
+ * {@link datafu.hourglass.jobs.PartitionCollapsingIncrementalJob}, which can be used instead.  With these classes, the implementations are provided
+ * through setters.  
+ * </p>
+ * 
+ * <p>
+ * Incremental jobs use Avro for input, intermediate, and output data.  To implement an incremental job, one must define their schemas.
+ * A <em>key schema</em> and <em>intermediate value schema</em> specify the output of the mapper and combiner, which output key-value pairs.
+ * The <em>key schema</em> and an <em>output value schema</em> specify the output of the reducer, which outputs a record having key and value
+ * fields.
+ * </p>
+ * 
+ * <p>
+ * An incremental job also requires that implementations of map and reduce be defined, and optionally combine.  The map implementation must 
+ * implement a {@link datafu.hourglass.model.Mapper} interface, which is very similar to the standard map interface in Hadoop.
+ * The combine and reduce operations are implemented through an {@link datafu.hourglass.model.Accumulator} interface.
+ * This is similar to the standard reduce in Hadoop; however, values are provided one at a time rather than as an iterable list.
+ * Also, an accumulator returns either a single value or, by returning null, no value at all.  That is, the accumulator may not return an arbitrary number of values
+ * for the output.  This restriction precludes the implementation of certain operations, like flatten, which do not fit well within the 
+ * incremental programming model.
+ * </p>
+ */
+package datafu.hourglass.jobs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
new file mode 100644
index 0000000..ff317ca
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A mapper that splits a combined Avro record back into a separate key and value.
+ *
+ * <p>
+ * Each input key is expected to wrap an Avro record having "key" and "value" fields.
+ * Those two fields are written out unchanged as the output key and output value.
+ * The input value is not used.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroKeyValueIdentityMapper extends Mapper<Object, Object, Object, Object>
+{
+  @Override
+  protected void map(Object keyObj, Object valueObj, Context context) throws java.io.IOException, java.lang.InterruptedException
+  {
+    @SuppressWarnings("unchecked")
+    AvroKey<GenericRecord> wrappedInput = (AvroKey<GenericRecord>)keyObj;
+    GenericRecord record = wrappedInput.datum();
+
+    // both fields are extracted (and cast) before anything is written
+    AvroKey<GenericRecord> outputKey = new AvroKey<GenericRecord>((GenericRecord)record.get("key"));
+    AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>((GenericRecord)record.get("value"));
+
+    context.write(outputKey, outputValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
new file mode 100644
index 0000000..60ce4fe
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.jobs.DateRangeConfigurable;
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * Values that can be aggregated here are folded into a single intermediate value by an
+ * implementation of {@link datafu.hourglass.model.Accumulator}.  Values that cannot be
+ * aggregated here (dated values from before the output date range, and previous output
+ * records) are written through unchanged so the reducer can handle them.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingCombiner extends ObjectReducer implements DateRangeConfigurable, Serializable
+{
+  private Accumulator<GenericRecord,GenericRecord> _accumulator;
+  private boolean _reusePreviousOutput;
+  private PartitionCollapsingSchemas _schemas;
+  private long _beginTime;
+  private long _endTime;
+
+  /**
+   * Combines the values of a single key.
+   *
+   * <p>
+   * Each value is dispatched on the full name of its schema: intermediate values are
+   * accumulated, dated intermediate values are accumulated when their time lies within
+   * the output date range and passed through when it lies before it, and previous output
+   * values are always passed through.  If anything was accumulated and the accumulator
+   * yields a non-null result, that result is written last.
+   * </p>
+   *
+   * @param keyObj the key; it is cast to an {@link AvroKey} only when something is written
+   * @param values the values for the key, each an {@link AvroValue} wrapping a record
+   * @param context the context that combined and passed-through values are written to
+   * @throws RuntimeException if no accumulator is set, if a value has an unexpected schema,
+   *         or if a dated value has no time, has no value, or lies after the end time
+   */
+  @SuppressWarnings("unchecked")
+  public void reduce(Object keyObj,
+                      Iterable<Object> values,
+                      ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+  {
+    Accumulator<GenericRecord,GenericRecord> accumulator = getAccumulator();
+    if (accumulator == null)
+    {
+      throw new RuntimeException("No combiner factory set");
+    }
+
+    // start from a clean slate for this key
+    accumulator.cleanup();
+    long accumulatedCount = 0;
+
+    for (Object valueObj : values)
+    {
+      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
+      String schemaName = value.getSchema().getFullName();
+
+      if (schemaName.equals(getSchemas().getIntermediateValueSchema().getFullName()))
+      {
+        accumulator.accumulate(value);
+        accumulatedCount++;
+        continue;
+      }
+
+      if (schemaName.equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
+      {
+        if (!_reusePreviousOutput)
+        {
+          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
+        }
+
+        Long time = (Long)value.get("time");
+        GenericRecord data = (GenericData.Record)value.get("value");
+
+        if (time == null)
+        {
+          throw new RuntimeException("time is null");
+        }
+
+        if (data == null)
+        {
+          throw new RuntimeException("value is null");
+        }
+
+        if (time < _beginTime)
+        {
+          // pass through unchanged, reducer will handle it
+          passThrough(keyObj, value, context);
+        }
+        else if (time <= _endTime)
+        {
+          accumulator.accumulate(data);
+          accumulatedCount++;
+        }
+        else
+        {
+          throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
+        }
+        continue;
+      }
+
+      if (!schemaName.equals(getSchemas().getOutputValueSchema().getFullName()))
+      {
+        throw new RuntimeException("Unexpected type: " + schemaName);
+      }
+
+      if (!_reusePreviousOutput)
+      {
+        throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
+      }
+
+      // pass through unchanged, reducer will handle it
+      passThrough(keyObj, value, context);
+    }
+
+    if (accumulatedCount <= 0)
+    {
+      return;
+    }
+
+    GenericRecord intermediateValue = accumulator.getFinal();
+    if (intermediateValue != null)
+    {
+      passThrough(keyObj, intermediateValue, context);
+    }
+  }
+
+  /**
+   * Writes a record to the context under the given key.
+   *
+   * @param keyObj the key, cast to an {@link AvroKey} here
+   * @param record the record to wrap in an {@link AvroValue} and write
+   * @param context the context to write to
+   */
+  @SuppressWarnings("unchecked")
+  private void passThrough(Object keyObj,
+                           GenericRecord record,
+                           ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+  {
+    context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(record));
+  }
+
+  /**
+   * Sets the schemas.
+   *
+   * @param schemas the schemas used to recognize the kind of each incoming value
+   */
+  public void setSchemas(PartitionCollapsingSchemas schemas)
+  {
+    _schemas = schemas;
+  }
+
+  /**
+   * Gets the schemas.
+   *
+   * @return schemas
+   */
+  public PartitionCollapsingSchemas getSchemas()
+  {
+    return _schemas;
+  }
+
+  /**
+   * Gets whether previous output is being reused.
+   *
+   * @return true if previous output is reused
+   */
+  public boolean getReuseOutput()
+  {
+    return _reusePreviousOutput;
+  }
+
+  /**
+   * Sets whether previous output is being reused.
+   *
+   * @param reuseOutput true if previous output is reused
+   */
+  public void setReuseOutput(boolean reuseOutput)
+  {
+    _reusePreviousOutput = reuseOutput;
+  }
+
+  /**
+   * Gets the accumulator used to perform aggregation.
+   *
+   * @return The accumulator
+   */
+  public Accumulator<GenericRecord,GenericRecord> getAccumulator()
+  {
+    return _accumulator;
+  }
+
+  /**
+   * Sets the accumulator used to perform aggregation.
+   * A serialized copy of the given accumulator is stored rather than the instance itself.
+   *
+   * @param acc The accumulator
+   */
+  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+  {
+    _accumulator = cloneAccumulator(acc);
+  }
+
+  /**
+   * Stores the begin and end times of the given date range.  Dated values are compared
+   * against these times when combining.
+   *
+   * @param dateRange the output date range
+   */
+  public void setOutputDateRange(DateRange dateRange)
+  {
+    _beginTime = dateRange.getBeginDate().getTime();
+    _endTime = dateRange.getEndDate().getTime();
+  }
+
+  /**
+   * Clone a {@link Accumulator} by serializing and deserializing it.
+   *
+   * @param acc The accumulator to clone
+   * @return The clone accumulator
+   * @throws RuntimeException wrapping any failure to serialize or deserialize
+   */
+  private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+  {
+    try
+    {
+      // write the accumulator into an in-memory buffer
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      ObjectOutputStream writer = new ObjectOutputStream(buffer);
+      writer.writeObject(acc);
+      writer.close();
+      buffer.close();
+
+      // read it back as a fresh instance
+      ByteArrayInputStream bytes = new ByteArrayInputStream(buffer.toByteArray());
+      ObjectInputStream reader = new ObjectInputStream(bytes);
+      @SuppressWarnings("unchecked")
+      Accumulator<GenericRecord,GenericRecord> copy = (Accumulator<GenericRecord,GenericRecord>)reader.readObject();
+      reader.close();
+      bytes.close();
+      return copy;
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+    catch (ClassNotFoundException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
new file mode 100644
index 0000000..7c6c5a2
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.avro.UnresolvedUnionException;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+
+
+import datafu.hourglass.fs.PathUtils;
+import datafu.hourglass.model.KeyValueCollector;
+import datafu.hourglass.model.Mapper;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
+ * map operation, which produces key and intermediate value pairs from the input.
+ * When previous output is being reused, each emitted value is additionally wrapped in a
+ * record carrying the time derived from the path of the input split it came from.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingMapper extends ObjectMapper implements Serializable
+{
+  private static final Logger _log = Logger.getLogger(CollapsingMapper.class);
+
+  // collectors are created lazily on first use and are excluded from serialization
+  private transient IdentityMapCollector _mapCollector;
+  private transient TimeMapCollector _timeMapCollector;
+
+  private boolean _reusePreviousOutput;
+  private PartitionCollapsingSchemas _schemas;
+  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+
+  /**
+   * Maps a single input record by delegating to the configured mapper, which emits
+   * its key-value pairs through the active collector.
+   *
+   * @param inputObj the input, which must be an {@link AvroKey} wrapping a {@link GenericRecord}
+   * @param context the context that collected key-value pairs are written to
+   * @throws IOException if the delegate throws it, or if the delegate is interrupted
+   *         (the {@link InterruptedException} is wrapped)
+   * @throws UnresolvedUnionException rethrown after logging what could not be resolved
+   */
+  @Override
+  public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+  {
+    @SuppressWarnings("unchecked")
+    GenericRecord input = ((AvroKey<GenericRecord>)inputObj).datum();
+    try
+    {
+      getMapCollector().setContext(context);
+      getMapper().map(input, getMapCollector());
+    }
+    catch (InterruptedException e)
+    {
+      // NOTE(review): wrapped rather than propagated even though this method declares
+      // InterruptedException; kept as-is because callers may rely on it.
+      throw new IOException(e);
+    }
+    catch (UnresolvedUnionException e)
+    {
+      // The unresolved datum is not necessarily a record.  Only cast when it is one,
+      // otherwise a ClassCastException thrown here would hide the real failure.
+      Object datum = e.getUnresolvedDatum();
+      if (datum instanceof GenericRecord)
+      {
+        _log.error("UnresolvedUnionException on schema: " + ((GenericRecord)datum).getSchema());
+      }
+      else
+      {
+        _log.error("UnresolvedUnionException on datum of type: " + (datum == null ? null : datum.getClass().getName()));
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Gets whether previous output is being reused.
+   *
+   * @return true if previous output is reused
+   */
+  public boolean getReuseOutput()
+  {
+    return _reusePreviousOutput;
+  }
+
+  /**
+   * Sets whether previous output is being reused.
+   *
+   * @param reuseOutput true if previous output is reused
+   */
+  public void setReuseOutput(boolean reuseOutput)
+  {
+    _reusePreviousOutput = reuseOutput;
+  }
+
+  /**
+   * Sets the task context and, when the mapper implementation is {@link Configurable},
+   * hands it the context's configuration.
+   *
+   * @param context the task context
+   */
+  @Override
+  public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
+  {
+    super.setContext(context);
+
+    if (_mapper instanceof Configurable)
+    {
+      ((Configurable)_mapper).setConf(context.getConfiguration());
+    }
+  }
+
+  /**
+   * Gets the mapper.
+   *
+   * @return mapper
+   */
+  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+  {
+    return _mapper;
+  }
+
+  /**
+   * Sets the mapper.
+   *
+   * @param mapper the mapper that the map operation is delegated to
+   */
+  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+  {
+    _mapper = mapper;
+  }
+
+  /**
+   * Sets the Avro schemas.
+   *
+   * @param schemas the schemas, handed to the collectors when they are created
+   */
+  public void setSchemas(PartitionCollapsingSchemas schemas)
+  {
+    _schemas = schemas;
+  }
+
+  /**
+   * Gets the Avro schemas.
+   *
+   * @return schemas
+   */
+  public PartitionCollapsingSchemas getSchemas()
+  {
+    return _schemas;
+  }
+
+  /**
+   * Gets the collector used to collect key-value pairs: the time-tagging collector
+   * when previous output is reused, otherwise the identity collector.
+   *
+   * @return The collector
+   */
+  private MapCollector getMapCollector()
+  {
+    if (getReuseOutput())
+    {
+      return getTimeMapCollector();
+    }
+    else
+    {
+      return getIdentityMapCollector();
+    }
+  }
+
+  /**
+   * Gets a collector that maps key-value pairs, where each value
+   * is tagged with the partition from which it was derived.
+   * The collector is created on first use.
+   *
+   * @return collector
+   */
+  private TimeMapCollector getTimeMapCollector()
+  {
+    if (_timeMapCollector == null)
+    {
+      _timeMapCollector = new TimeMapCollector(getSchemas());
+    }
+
+    return _timeMapCollector;
+  }
+
+  /**
+   * Gets a collector that maps key-value pairs as-is.
+   * The collector is created on first use.
+   *
+   * @return collector
+   */
+  private IdentityMapCollector getIdentityMapCollector()
+  {
+    if (_mapCollector == null)
+    {
+      _mapCollector = new IdentityMapCollector(getSchemas());
+    }
+
+    return _mapCollector;
+  }
+
+  /**
+   * Base class for collectors that write to a {@link MapContext}, which must be
+   * supplied through {@link #setContext(MapContext)} before collecting.
+   */
+  private abstract class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
+  {
+    private MapContext<Object,Object,Object,Object> context;
+
+    public void setContext(MapContext<Object,Object,Object,Object> context)
+    {
+      this.context = context;
+    }
+
+    public MapContext<Object,Object,Object,Object> getContext()
+    {
+      return context;
+    }
+  }
+
+  /**
+   * A {@link KeyValueCollector} that outputs key-value pairs to {@link MapContext}
+   * and tags each mapped value with the time for the partition it was derived from.
+   *
+   * @author "Matthew Hayes"
+   *
+   */
+  private class TimeMapCollector extends MapCollector
+  {
+    // reused for every emitted value; its fields are overwritten on each collect
+    private GenericRecord wrappedValue;
+    // the split whose time is cached in lastTime
+    private InputSplit lastSplit;
+    private long lastTime;
+
+    public TimeMapCollector(PartitionCollapsingSchemas schemas)
+    {
+      this.wrappedValue = new GenericData.Record(schemas.getDatedIntermediateValueSchema());
+    }
+
+    /**
+     * Wraps the value together with the time of the current input split and writes it
+     * under the given key.
+     *
+     * @param key the key to write; must not be null
+     * @param value the value to wrap and write; must not be null
+     * @throws RuntimeException if key or value is null, or the split's file cannot be determined
+     */
+    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
+    {
+      if (key == null)
+      {
+        throw new RuntimeException("key is null");
+      }
+      if (value == null)
+      {
+        throw new RuntimeException("value is null");
+      }
+
+      // wrap the value with the time so we know what to merge and what to unmerge
+      InputSplit split = getContext().getInputSplit();
+      if (split != lastSplit)
+      {
+        // The time is derived from the parent directory of the split's file.  The split is
+        // only remembered once its time has been resolved, so that a failure here can never
+        // leave a stale time cached against the new split.
+        lastTime = PathUtils.getDateForNestedDatedPath(resolveFileSplit(split).getPath().getParent()).getTime();
+        lastSplit = split;
+      }
+
+      wrappedValue.put("time", lastTime);
+      wrappedValue.put("value", value);
+
+      getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
+    }
+
+    /**
+     * Finds the {@link FileSplit} behind an input split.
+     *
+     * <p>
+     * A split that already is a {@link FileSplit} is returned directly.  Any other split is
+     * expected to wrap one and expose it through a <code>getInputSplit</code> method, which is
+     * invoked reflectively (presumably because the wrapper is Hadoop's non-public
+     * TaggedInputSplit -- TODO confirm).
+     * </p>
+     *
+     * @param split the split provided by the map context
+     * @return the underlying file split
+     * @throws RuntimeException if the wrapped split cannot be obtained reflectively
+     */
+    private FileSplit resolveFileSplit(InputSplit split)
+    {
+      if (split instanceof FileSplit)
+      {
+        return (FileSplit)split;
+      }
+
+      try
+      {
+        Method m = split.getClass().getMethod("getInputSplit");
+        m.setAccessible(true);
+        return (FileSplit)m.invoke(split);
+      }
+      catch (SecurityException e)
+      {
+        throw new RuntimeException(e);
+      }
+      catch (NoSuchMethodException e)
+      {
+        throw new RuntimeException(e);
+      }
+      catch (IllegalArgumentException e)
+      {
+        throw new RuntimeException(e);
+      }
+      catch (IllegalAccessException e)
+      {
+        throw new RuntimeException(e);
+      }
+      catch (InvocationTargetException e)
+      {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * A {@link KeyValueCollector} that outputs key-value pairs to {@link MapContext} as-is.
+   *
+   * @author "Matthew Hayes"
+   *
+   */
+  private class IdentityMapCollector extends MapCollector
+  {
+    public IdentityMapCollector(PartitionCollapsingSchemas schemas)
+    {
+    }
+
+    /**
+     * Writes the key and value unchanged.
+     *
+     * @param key the key to write; must not be null
+     * @param value the value to write; must not be null
+     * @throws RuntimeException if key or value is null
+     */
+    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
+    {
+      if (key == null)
+      {
+        throw new RuntimeException("key is null");
+      }
+      if (value == null)
+      {
+        throw new RuntimeException("value is null");
+      }
+      getContext().write(new AvroKey<GenericRecord>(key), new AvroValue<GenericRecord>(value));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
new file mode 100644
index 0000000..86aa66c
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.jobs.DateRangeConfigurable;
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Merger;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
+ * output value.  When previous output is being reused, {@link datafu.hourglass.model.Merger} implementations
+ * are used to merge the previous output with the newly aggregated values.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingReducer extends ObjectReducer implements DateRangeConfigurable, Serializable
+{
+  protected long _beginTime;
+  protected long _endTime;
+  private Accumulator<GenericRecord,GenericRecord> _newAccumulator;
+  private Accumulator<GenericRecord,GenericRecord> _oldAccumulator;
+  private Merger<GenericRecord> _merger;
+  private Merger<GenericRecord> _oldMerger;
+  private boolean _reusePreviousOutput;
+  private PartitionCollapsingSchemas _schemas;
+
+  /**
+   * Reduces the values of a single key to at most one output record.
+   *
+   * <p>
+   * Each value is dispatched on the full name of its schema.  Intermediate values, and dated
+   * intermediate values whose time lies within the output date range, go to the "new"
+   * accumulator.  Dated values from before the range go to the "old" accumulator.  A previous
+   * output value is copied and used as the starting point.  The results are then merged and,
+   * if the final value is non-null, a record with "key" and "value" fields is written as the
+   * output key with a null output value.
+   * </p>
+   *
+   * @param keyObj the key, which must be an {@link AvroKey} wrapping a record
+   * @param values the values for the key, each an {@link AvroValue} wrapping a record
+   * @param context the context the output record is written to
+   * @throws RuntimeException if no accumulator is set, a value has an unexpected schema,
+   *         a dated value has no time, has no value, or lies after the end time,
+   *         or a merger that is needed has not been set
+   */
+  @SuppressWarnings("unchecked")
+  public void reduce(Object keyObj,
+                     Iterable<Object> values,
+                     ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+  {
+    if (_newAccumulator == null)
+    {
+      throw new RuntimeException("No reducer set");
+    }
+
+    GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
+
+    // used when processing all data (i.e. no window size)
+    Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
+    acc.cleanup();
+    long accumulatedCount = 0;
+
+    // only needed when reusing output: collects dated values from before the output date range
+    Accumulator<GenericRecord,GenericRecord> accOld = null;
+    long oldAccumulatedCount = 0;
+    if (getReuseOutput())
+    {
+      accOld = getOldAccumulator();
+      accOld.cleanup();
+    }
+
+    GenericRecord previous = null;
+
+    for (Object valueObj : values)
+    {
+      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
+      if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
+      {
+        acc.accumulate(value);
+        accumulatedCount++;
+      }
+      else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
+      {
+        if (!_reusePreviousOutput)
+        {
+          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
+        }
+
+        Long time = (Long)value.get("time");
+        GenericRecord data = (GenericData.Record)value.get("value");
+
+        if (time == null)
+        {
+          throw new RuntimeException("time is null");
+        }
+
+        if (data == null)
+        {
+          throw new RuntimeException("value is null");
+        }
+
+        if (time >= _beginTime && time <= _endTime)
+        {
+          acc.accumulate(data);
+          accumulatedCount++;
+        }
+        else if (time < _beginTime)
+        {
+          accOld.accumulate(data);
+          oldAccumulatedCount++;
+        }
+        else
+        {
+          throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
+        }
+      }
+      else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
+      {
+        if (!_reusePreviousOutput)
+        {
+          // report the schema that was actually encountered in this branch
+          throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
+        }
+
+        // deep clone the previous output fed back in
+        previous = new GenericData.Record((Record)value,true);
+      }
+      else
+      {
+        throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
+      }
+    }
+
+    GenericRecord newOutputValue = null;
+    GenericRecord oldOutputValue = null;
+
+    if (accumulatedCount > 0)
+    {
+      newOutputValue = acc.getFinal();
+    }
+
+    if (oldAccumulatedCount > 0)
+    {
+      oldOutputValue = accOld.getFinal();
+    }
+
+    // Start from the previous output when there is one, otherwise from the new value.
+    GenericRecord outputValue = (previous != null) ? previous : newOutputValue;
+
+    // The old value is merged in first, whichever starting point was chosen.
+    if (oldOutputValue != null)
+    {
+      if (_oldMerger == null)
+      {
+        throw new RuntimeException("No old record merger set");
+      }
+
+      outputValue = _oldMerger.merge(outputValue, oldOutputValue);
+    }
+
+    // The new value only needs merging when it was not already the starting point.
+    if (previous != null && newOutputValue != null)
+    {
+      if (_merger == null)
+      {
+        throw new RuntimeException("No new record merger set");
+      }
+
+      outputValue = _merger.merge(outputValue, newOutputValue);
+    }
+
+    if (outputValue != null)
+    {
+      GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
+      output.put("key", key);
+      output.put("value", outputValue);
+      context.write(new AvroKey<GenericRecord>(output),null);
+    }
+  }
+
+  /**
+   * Sets the Avro schemas.
+   *
+   * @param schemas the schemas used to recognize incoming values and build the output record
+   */
+  public void setSchemas(PartitionCollapsingSchemas schemas)
+  {
+    _schemas = schemas;
+  }
+
+  /**
+   * Gets the Avro schemas.
+   *
+   * @return schemas
+   */
+  private PartitionCollapsingSchemas getSchemas()
+  {
+    return _schemas;
+  }
+
+  /**
+   * Gets whether previous output is being reused.
+   *
+   * @return true if previous output is reused
+   */
+  public boolean getReuseOutput()
+  {
+    return _reusePreviousOutput;
+  }
+
+  /**
+   * Sets whether previous output is being reused.
+   *
+   * @param reuseOutput true if previous output is reused
+   */
+  public void setReuseOutput(boolean reuseOutput)
+  {
+    _reusePreviousOutput = reuseOutput;
+  }
+
+  /**
+   * Sets the accumulator used to perform aggregation.  Two independent serialized copies
+   * are stored: one for new values and one for old values.
+   *
+   * @param acc The accumulator
+   */
+  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+  {
+    _newAccumulator = cloneAccumulator(acc);
+    _oldAccumulator = cloneAccumulator(acc);
+  }
+
+  /**
+   * Gets the accumulator used for new values.
+   *
+   * @return The accumulator for new values
+   */
+  public Accumulator<GenericRecord,GenericRecord> getNewAccumulator()
+  {
+    return _newAccumulator;
+  }
+
+  /**
+   * Gets the accumulator used for old values, i.e. dated values from before the begin time.
+   *
+   * @return The accumulator for old values
+   */
+  public Accumulator<GenericRecord,GenericRecord> getOldAccumulator()
+  {
+    return _oldAccumulator;
+  }
+
+  /**
+   * Sets the merger used to merge the new value into the previous output.
+   *
+   * @param merger The merger for new values
+   */
+  public void setRecordMerger(Merger<GenericRecord> merger)
+  {
+    _merger = merger;
+  }
+
+  /**
+   * Sets the merger used to merge the old value into the current output value.
+   *
+   * @param merger The merger for old values
+   */
+  public void setOldRecordMerger(Merger<GenericRecord> merger)
+  {
+    _oldMerger = merger;
+  }
+
+  /**
+   * Stores the begin and end times of the given date range.  Dated values are compared
+   * against these times when reducing.
+   *
+   * @param dateRange the output date range
+   */
+  public void setOutputDateRange(DateRange dateRange)
+  {
+    _beginTime = dateRange.getBeginDate().getTime();
+    _endTime = dateRange.getEndDate().getTime();
+  }
+
+  /**
+   * Clone a {@link Accumulator} by serializing and deserializing it.
+   *
+   * @param acc The accumulator to clone
+   * @return The clone accumulator
+   * @throws RuntimeException wrapping any failure to serialize or deserialize
+   */
+  private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+  {
+    try
+    {
+      // clone by serializing
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      ObjectOutputStream objStream;
+      objStream = new ObjectOutputStream(outputStream);
+      objStream.writeObject(acc);
+      objStream.close();
+      outputStream.close();
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+      ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
+      @SuppressWarnings("unchecked")
+      Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
+      objInputStream.close();
+      inputStream.close();
+      return result;
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+    catch (ClassNotFoundException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
new file mode 100644
index 0000000..8b7d574
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Hadoop combiner that forwards every call to an {@link ObjectReducer} deserialized from
+ * the distributed cache file named by {@link Parameters#COMBINER_IMPL_PATH}.
+ *
+ * @author "Matthew Hayes"
+ */
+public class DelegatingCombiner extends Reducer<Object, Object, Object, Object> 
+{
+  private ObjectReducer processor;
+
+  /** Reads the delegate from the distributed cache and gives it the task context. */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException
+  {
+    super.setup(context);
+    if (context == null)
+    {
+      return; // no configuration to read from; the delegate stays unset
+    }
+    final Configuration jobConf = context.getConfiguration();
+    final Path implPath = new Path(jobConf.get(Parameters.COMBINER_IMPL_PATH));
+    processor = (ObjectReducer)DistributedCacheHelper.readObject(jobConf, implPath);
+    processor.setContext(context);
+  }
+
+  /** Passes the key and the values supplied for it straight through to the delegate. */
+  @Override
+  protected void reduce(Object key, Iterable<Object> values, Context context) throws IOException, InterruptedException
+  {
+    processor.reduce(key, values, context);
+  }
+
+  /** Closes the delegate. */
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException
+  {
+    processor.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
new file mode 100644
index 0000000..ea7a8ae
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Hadoop mapper that forwards every call to an {@link ObjectMapper} deserialized from
+ * the distributed cache file named by {@link Parameters#MAPPER_IMPL_PATH}.
+ *
+ * @author "Matthew Hayes"
+ */
+public class DelegatingMapper extends Mapper<Object, Object, Object, Object> 
+{
+  private ObjectMapper processor;
+
+  /** Reads the delegate from the distributed cache and gives it the task context. */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException
+  {
+    super.setup(context);
+    if (context == null)
+    {
+      return; // no configuration to read from; the delegate stays unset
+    }
+    final Configuration jobConf = context.getConfiguration();
+    final Path implPath = new Path(jobConf.get(Parameters.MAPPER_IMPL_PATH));
+    processor = (ObjectMapper)DistributedCacheHelper.readObject(jobConf, implPath);
+    processor.setContext(context);
+  }
+
+  /** Hands the input key to the delegate as its input; the input value is not forwarded. */
+  @Override
+  protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+  {
+    processor.map(key, context);
+  }
+
+  /** Closes the delegate. */
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException
+  {
+    processor.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
new file mode 100644
index 0000000..6596a5f
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Hadoop reducer that forwards every call to an {@link ObjectReducer} deserialized from
+ * the distributed cache file named by {@link Parameters#REDUCER_IMPL_PATH}.
+ *
+ * @author "Matthew Hayes"
+ */
+public class DelegatingReducer extends Reducer<Object, Object, Object, Object> 
+{
+  private ObjectReducer processor;
+
+  /** Reads the delegate from the distributed cache and gives it the task context. */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException
+  {
+    super.setup(context);
+    if (context == null)
+    {
+      return; // no configuration to read from; the delegate stays unset
+    }
+    final Configuration jobConf = context.getConfiguration();
+    final Path implPath = new Path(jobConf.get(Parameters.REDUCER_IMPL_PATH));
+    processor = (ObjectReducer)DistributedCacheHelper.readObject(jobConf, implPath);
+    processor.setContext(context);
+  }
+
+  /** Passes the key and the values supplied for it straight through to the delegate. */
+  @Override
+  protected void reduce(Object key, Iterable<Object> values, Context context) throws IOException, InterruptedException
+  {
+    processor.reduce(key, values, context);
+  }
+
+  /** Closes the delegate. */
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException
+  {
+    processor.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
new file mode 100644
index 0000000..78c8911
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Methods for working with the Hadoop distributed cache.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class DistributedCacheHelper 
+{
+  /**
+   * Deserializes an object from a path in HDFS.
+   * 
+   * @param conf Hadoop configuration
+   * @param path Path to deserialize from
+   * @return Deserialized object
+   * @throws IOException
+   */
+  public static Object readObject(Configuration conf, org.apache.hadoop.fs.Path path) throws IOException
+  {
+    String localPath = null;
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
+    for (Path localCacheFile : localCacheFiles)
+    {
+      if (localCacheFile.toString().endsWith(path.toString()))
+      {
+        localPath = localCacheFile.toString();
+        break;
+      }
+    }
+    if (localPath == null)
+    {
+      throw new RuntimeException("Could not find " + path + " in local cache");
+    }
+    FileInputStream inputStream = new FileInputStream(new File(localPath));
+    ObjectInputStream objStream = new ObjectInputStream(inputStream);
+    
+    try
+    {
+      try {
+        return objStream.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    finally
+    {
+      objStream.close();
+      inputStream.close();
+    }
+  }
+  
+  /**
+   * Serializes an object to a path in HDFS and adds the file to the distributed cache.
+   * 
+   * @param conf Hadoop configuration
+   * @param obj Object to serialize
+   * @param path Path to serialize object to
+   * @throws IOException
+   */
+  public static void writeObject(Configuration conf, Object obj, org.apache.hadoop.fs.Path path) throws IOException
+  {
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream outputStream = fs.create(path, true);
+    ObjectOutputStream objStream = new ObjectOutputStream(outputStream);
+    objStream.writeObject(obj);
+    objStream.close();
+    outputStream.close();
+    DistributedCache.addCacheFile(path.toUri(), conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
new file mode 100644
index 0000000..9723b32
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.MapContext;
+
+/**
+ * Defines the interface for a mapper implementation that {@link DelegatingMapper} delegates to.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ObjectMapper extends ObjectProcessor {
+  public abstract void map(Object input,
+      MapContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
new file mode 100644
index 0000000..b4383c7
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base class for {@link ObjectMapper} and {@link ObjectReducer}; holds the task context and a no-op close hook.
+ *
+ * @author "Matthew Hayes"
+ */
+public abstract class ObjectProcessor
+{
+  private TaskInputOutputContext<Object,Object,Object,Object> context;
+
+  /** Stores the task context that {@link #getContext()} will return. */
+  public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
+  {
+    this.context = context;
+  }
+  /** Returns the context most recently passed to {@link #setContext}; null if it was never called. */
+  public TaskInputOutputContext<Object,Object,Object,Object> getContext()
+  {
+    return context;
+  }
+  /** Invoked by the delegating mapper, combiner and reducer from their cleanup; empty by default. */
+  public void close() throws IOException, InterruptedException
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
new file mode 100644
index 0000000..786ca31
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+/**
+ * Defines the interface for combiner and reducer implementations that {@link DelegatingCombiner} and
+ * {@link DelegatingReducer} delegate to.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ObjectReducer extends ObjectProcessor {
+  public abstract void reduce(Object key,
+      Iterable<Object> values,
+      ReduceContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
new file mode 100644
index 0000000..2fc690e
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+/**
+ * Names of the configuration settings the jobs use to pass information
+ * to the mappers, combiners, and reducers.
+ * 
+ * @author "Matthew Hayes"
+ */
+public class Parameters 
+{
+  // Each constant is the configuration key holding the path of a serialized implementation.
+  public static final String MAPPER_IMPL_PATH = "hourglass.mapper.impl.path";
+  public static final String COMBINER_IMPL_PATH = "hourglass.combiner.impl.path";
+  public static final String REDUCER_IMPL_PATH = "hourglass.reducer.impl.path";
+}