Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:45 UTC

[07/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java b/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
deleted file mode 100644
index 73215d1..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.jobs.counting;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-
-import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-public abstract class PartitionPreservingIncrementalCountJob extends AbstractPartitionPreservingIncrementalJob
-{   
-  public String getIntermediateValueSchemaName()
-  {
-    return getClass().getName() + "IntermediateValue";
-  }
-  
-  public String getOutputValueSchemaName()
-  {
-    return getClass().getName() + "OutputValue";
-  }
-
-  @Override
-  public String getOutputSchemaName()
-  {
-    return getClass().getName() + "Output";
-  }
-
-  @Override
-  public String getOutputSchemaNamespace()
-  {
-    return getClass().getPackage().getName();
-  }
-  
-  @Override
-  public void config(Configuration conf)
-  {
-    super.config(conf);    
-    conf.set(AbstractCounter.COUNT_VALUE_SCHEMA_PARAM, getIntermediateValueSchema().toString());
-  }
-  
-  public PartitionPreservingIncrementalCountJob(String name, Properties props) throws IOException
-  {
-    super(name, props);
-  }
-  
-  
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator() 
-  {
-    return new CountAccumulator(getOutputValueSchema());
-  }
-  
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator() 
-  {
-    return new CountAccumulator(getIntermediateValueSchema());
-  }
-  
-  @Override
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return getCounter();
-  }
-  
-  protected abstract AbstractCounter getCounter();
-      
-  @Override
-  protected Schema getIntermediateValueSchema()
-  {
-    return getCountSchema(getIntermediateValueSchemaName());
-  }
-  
-  @Override
-  protected Schema getOutputValueSchema()
-  {
-    return getCountSchema(getOutputValueSchemaName());
-  }
-  
-  protected Schema getCountSchema(String name)
-  {
-    Schema s = Schema.createRecord(name, null, getOutputSchemaNamespace(), false);
-    List<Field> fields = Arrays.asList(new Field("count", Schema.create(Type.LONG), null, null));
-    s.setFields(fields);
-    return s;
-  }
-  
-  protected static abstract class AbstractCounter implements Mapper<GenericRecord,GenericRecord,GenericRecord>, Configurable
-  {       
-    private transient Schema _valueSchema;
-    private CountWriterImpl writer;
-    private Configuration conf;
-    
-    public static final String COUNT_VALUE_SCHEMA_PARAM = "incremental.count.task.value.schema"; 
-    
-    public AbstractCounter()
-    { 
-      _valueSchema = null;
-    }
-    
-    @Override
-    public Configuration getConf()
-    {
-      return this.conf;
-    }
-    
-    @Override
-    public void setConf(Configuration conf)
-    { 
-      this.conf = conf;
-      
-      if (conf != null)
-      {
-        _valueSchema = new Schema.Parser().parse(conf.get(COUNT_VALUE_SCHEMA_PARAM));
-        this.writer = new CountWriterImpl();
-      }
-    }
-    
-    protected abstract void count(GenericRecord record, CountWriter writer) throws IOException, InterruptedException;    
-    
-    @Override
-    public void map(GenericRecord record, KeyValueCollector<GenericRecord,GenericRecord> context) throws IOException, InterruptedException
-    {
-      this.writer.setContext(context);
-      count(record,this.writer);
-    }
-    
-    private class CountWriterImpl implements CountWriter
-    {
-      private KeyValueCollector<GenericRecord,GenericRecord> context;
-      private GenericRecord value;
-      
-      public CountWriterImpl()
-      {
-        value = new GenericData.Record(_valueSchema);        
-      }
-      
-      public void setContext(KeyValueCollector<GenericRecord,GenericRecord> context)
-      {
-        this.context = context;
-      }
-      
-      @Override
-      public void count(GenericRecord key) throws IOException, InterruptedException
-      {         
-        count(key,1L);
-      }
-
-      @Override
-      public void count(GenericRecord key, long count) throws IOException, InterruptedException
-      {
-        value.put("count", count);
-        context.collect(key, value);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java b/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
deleted file mode 100644
index 34c05be..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class DailyTrackingWriter
-{
-  private final Path _outputPath;
-  private final Schema _schema;
-  private final FileSystem _fs;
-    
-  private DataFileWriter<GenericRecord> _dataWriter;
-  private OutputStream _outputStream;
-  
-  public DailyTrackingWriter(Path outputPath, Schema schema, FileSystem fs)
-  {
-    _outputPath = outputPath;
-    _schema = schema;
-    _fs = fs;
-  }
-  
-  public void open(int year, int month, int day) throws IOException
-  {
-    if (_dataWriter != null)
-    {
-      throw new RuntimeException("Already have data writer");
-    }
-  
-    Path dailyPath = _outputPath;
-    Path path = new Path(dailyPath,String.format("%04d/%02d/%02d",year,month,day));
-    
-    _outputStream = _fs.create(new Path(path, "part-00000.avro"));
-    
-    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>();
-    _dataWriter = new DataFileWriter<GenericRecord>(writer);        
-    _dataWriter.create(_schema, _outputStream);
-  }
-  
-  public void append(GenericRecord record) throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new RuntimeException("No data writer");
-    }
-    _dataWriter.append(record);
-  }
-  
-  public void close() throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new RuntimeException("No data writer");
-    }
-    _dataWriter.close();
-    _outputStream.close();
-    _dataWriter = null;
-    _outputStream = null; 
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java b/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
deleted file mode 100644
index c9ea8f9..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class TimestampDataWriter
-{
-  private final Path _outputPath;
-  private final Schema _schema;
-  private final FileSystem _fs;
-    
-  private DataFileWriter<GenericRecord> _dataWriter;
-  private OutputStream _outputStream;
-  
-  public TimestampDataWriter(Path outputPath, Schema schema, FileSystem fs)
-  {
-    _outputPath = outputPath;
-    _schema = schema;
-    _fs = fs;
-  }
-  
-  public void open(int year, int month, int day) throws IOException
-  {
-    if (_dataWriter != null)
-    {
-      throw new RuntimeException("Already have data writer");
-    }
-  
-    Path path = new Path(_outputPath,String.format("%04d%02d%02d",year,month,day));
-    
-    _outputStream = _fs.create(new Path(path, "part-00000.avro"));
-    
-    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>();
-    _dataWriter = new DataFileWriter<GenericRecord>(writer);        
-    _dataWriter.create(_schema, _outputStream);
-  }
-  
-  public void append(GenericRecord record) throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new RuntimeException("No data writer");
-    }
-    _dataWriter.append(record);
-  }
-  
-  public void close() throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new RuntimeException("No data writer");
-    }
-    _dataWriter.close();
-    _outputStream.close();
-    _dataWriter = null;
-    _outputStream = null; 
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test_in_background.sh
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test_in_background.sh b/contrib/hourglass/test_in_background.sh
deleted file mode 100755
index a89b531..0000000
--- a/contrib/hourglass/test_in_background.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-
-echo "Starting test"
-nohup sh test.sh > test.log &
-sleep 1
-echo "Process ID:"
-cat test.pid

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/.gitignore
----------------------------------------------------------------------
diff --git a/datafu-hourglass/.gitignore b/datafu-hourglass/.gitignore
new file mode 100644
index 0000000..685bfd2
--- /dev/null
+++ b/datafu-hourglass/.gitignore
@@ -0,0 +1 @@
+test-logs/

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/README.md
----------------------------------------------------------------------
diff --git a/datafu-hourglass/README.md b/datafu-hourglass/README.md
new file mode 100644
index 0000000..31d2361
--- /dev/null
+++ b/datafu-hourglass/README.md
@@ -0,0 +1,312 @@
+# DataFu: Hourglass
+
+Hourglass is a framework for incrementally processing partitioned data sets in Hadoop.  
+
+## Quick Start Example
+
+Let's walk through a use case where Hourglass is helpful.  Suppose that we have a website that tracks a particular event,
+and that for each event a member ID is recorded.  These events are collected and stored in HDFS as Avro files under paths
+of the form /data/event/yyyy/MM/dd.  Suppose that for this example our Avro schema is:
+
+```json
+{
+  "type" : "record",
+  "name" : "ExampleEvent",
+  "namespace" : "datafu.hourglass.test",
+  "fields" : [ {
+    "name" : "id",
+    "type" : "long",
+    "doc" : "ID"
+  } ]
+}
+```
+
+Suppose that the goal is to count how many times this event has occurred per member over the entire history and produce
+a daily report summarizing these counts.  One solution is to simply consume all data under /data/event each day and 
+aggregate by member ID.  However, most of the data is the same day to day.  The only difference is that each day a new
+day of data appears in HDFS.  So while this solution works, it is wasteful.  Wouldn't it be better if we could merge
+the previous result with the new data?  With Hourglass you can.
+
+To continue our example, let's say there are two days of data currently available, 2013/03/15 and 2013/03/16, and that
+their contents are:
+
+```
+2013/03/15:
+{"id": 1}
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 3}
+{"id": 3}
+
+2013/03/16:
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 2}
+{"id": 3}
+```
+
+Let's aggregate the counts by member ID using Hourglass.  To perform the aggregation we will use `PartitionCollapsingIncrementalJob`,
+which essentially takes a partitioned data set like the one we have and collapses all the partitions together into a single output.
+
+```Java
+PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(Example.class);
+```
+
+Next we will define the schemas for the key and value used by the job.  The key affects how data is grouped in the reducer when
+we perform the aggregation.  In this case it will be the member ID.  The value is the piece of data being aggregated, which will
+be an integer representing the count in this case.  Hourglass uses Avro for its data types.  Let's define the schemas:
+
+```Java
+final String namespace = "com.example";
+
+final Schema keySchema = Schema.createRecord("Key",null,namespace,false);
+keySchema.setFields(Arrays.asList(new Field("member_id",Schema.create(Type.LONG),null,null)));
+final String keySchemaString = keySchema.toString(true);
+
+final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
+valueSchema.setFields(Arrays.asList(new Field("count",Schema.create(Type.INT),null,null)));
+final String valueSchemaString = valueSchema.toString(true);
+```
+
+This produces schemas having the following representation:
+
+```json
+{
+  "type" : "record",
+  "name" : "Key",
+  "namespace" : "com.example",
+  "fields" : [ {
+    "name" : "member_id",
+    "type" : "long"
+  } ]
+}
+
+{
+  "type" : "record",
+  "name" : "Value",
+  "namespace" : "com.example",
+  "fields" : [ {
+    "name" : "count",
+    "type" : "int"
+  } ]
+}
+```
+
+Now we can tell the job what our schemas are.  Hourglass allows two different value types.  One is the intermediate value type
+that is output by the mapper and combiner.  The other is the output value type, the output of the reducer.  In this case we
+will use the same value type for each.
+
+```Java
+job.setKeySchema(keySchema);
+job.setIntermediateValueSchema(valueSchema);
+job.setOutputValueSchema(valueSchema);
+```
+
+Next we will tell Hourglass where to find the data, where to write the data, and that we want to reuse the previous output.
+
+```Java
+job.setInputPaths(Arrays.asList(new Path("/data/event")));
+job.setOutputPath(new Path("/output"));
+job.setReusePreviousOutput(true);
+```
+
+Now let's get into some application logic.  The mapper will produce a key-value pair from each record consisting of
+the member ID and a count, which for each input record will just be `1`.
+
+```Java
+job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>() 
+{
+  private transient Schema kSchema;
+  private transient Schema vSchema;
+  
+  @Override
+  public void map(GenericRecord input,
+                  KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
+      InterruptedException
+  {
+    if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
+    if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
+    GenericRecord key = new GenericData.Record(kSchema);
+    key.put("member_id", input.get("id"));
+    GenericRecord value = new GenericData.Record(vSchema);
+    value.put("count", 1);
+    collector.collect(key,value);
+  }      
+});
+```
+
+An accumulator is responsible for aggregating this data.  Records will be grouped by member and then passed to the accumulator
+one-by-one.  The accumulator keeps a running count and adds each input count to it.  When all the data has been passed to it,
+the `getFinal()` method is called, which returns the output record containing the count.
+
+```Java
+job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>() 
+{
+  private transient int count;
+  private transient Schema vSchema;
+  
+  @Override
+  public void accumulate(GenericRecord value)
+  {
+    this.count += (Integer)value.get("count");
+  }
+
+  @Override
+  public GenericRecord getFinal()
+  {
+    if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
+    GenericRecord output = new GenericData.Record(vSchema);
+    output.put("count", count);
+    return output;
+  }
+
+  @Override
+  public void cleanup()
+  {
+    this.count = 0;
+  }      
+});
+```
+
+Since the intermediate and output values have the same schema, the accumulator can also be used for the combiner,
+so let's indicate that we want it to be used for that:
+
+```Java
+job.setCombinerAccumulator(job.getReducerAccumulator());
+job.setUseCombiner(true);
+```
+
+Finally, we run the job.
+
+```Java
+job.run();
+```
+
+When we inspect the output we find that the counts match what we expect:
+
+```json
+{"key": {"member_id": 1}, "value": {"count": 5}}
+{"key": {"member_id": 2}, "value": {"count": 3}}
+{"key": {"member_id": 3}, "value": {"count": 3}}
+```
+
+Now suppose that a new day of data becomes available:
+
+```
+2013/03/17:
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 2}
+{"id": 2}
+{"id": 3}
+{"id": 3}
+```
+
+Let's run the job again.
+Since Hourglass already has a result for the previous day, it consumes the new day of input and the previous output, rather
+than all the input data it already processed.  The previous output is passed to the accumulator implementation where it is
+aggregated with the new data.  This produces the output we expect:
+
+```json
+{"key": {"member_id": 1}, "value": {"count": 7}}
+{"key": {"member_id": 2}, "value": {"count": 6}}
+{"key": {"member_id": 3}, "value": {"count": 5}}
+```
+
+In this example we only have a few days of input data, so the impact of incrementally processing the new data is small.
+However, as the size of the input data grows, the benefit of incrementally processing data becomes very significant.
+
+## Motivation
+
+Data sets that are temporal in nature very often are stored in such a way that each directory corresponds to a separate
+time range.  For example, one convention could be to divide the data by day.  One benefit of a partitioning scheme such 
+as this is that it makes it possible to consume a subset of the data for specific time ranges, instead of consuming
+the entire data set.
+
+Very often computations on data such as this are performed daily over sliding windows.  For example, a metric of interest
+may be the last time each member logged into the site.  The most straightforward implementation is to consume all login events
+across all days.  This is inefficient, however, since the input data is mostly the same from day to day.
+A more efficient solution is to merge the previous output with new data since the last run.  As a result there is less 
+data to process.  Another metric of interest may be the number of pages viewed per member over the last 30 days.  
+A straightforward implementation is to consume the page view data over the last 30 days each time the job runs.
+However, again, the input data is mostly the same day-to-day.
+Instead, given the previous output of the job, the new output can be produced by adding the new data and subtracting the old data.
+
+Although these incremental jobs are conceptually easy to understand, the implementations can be complex.  
+Hourglass defines an easy-to-use programming model and provides jobs for incrementally processing partitioned data as just described.
+It handles the underlying details and complexity of an incremental system so that programmers can focus on
+application logic.
+
+## Capabilities
+
+Hourglass uses Avro for input, intermediate, and output data.  Input data must be partitioned by day according to the
+naming convention yyyy/MM/dd.  Joining multiple inputs is supported.
+
+Hourglass provides two types of jobs: partition-preserving and partition-collapsing.  A *partition-preserving* job 
+consumes input data partitioned by day and produces output data partitioned by day. This is equivalent to running a 
+MapReduce job for each individual day of input data, but much more efficient.  It compares the input data against 
+the existing output data and only processes input data with no corresponding output. A *partition-collapsing* job 
+consumes input data partitioned by day and produces a single output.  What distinguishes this job from a standard 
+MapReduce job is that it can reuse the previous output.  This enables it to process data much more efficiently.  
+Rather than consuming all input data on each run, it can consume only the new data since the previous run and
+merge it with the previous output.  Since the partition-preserving job outputs partitioned data, the two jobs
+can be chained together.
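+
+As a rough sketch, the per-day counting step from the quick start could be expressed as a partition-preserving job.
+This assumes a concrete `PartitionPreservingIncrementalJob` class exposing the same setters as the collapsing job shown
+earlier, and reuses the mapper and accumulator logic from the quick start (the `mapper` and `accumulator` names here
+are illustrative):
+
+```Java
+// Sketch only: counts per member per day, keeping one output folder per
+// day of input rather than collapsing everything into a single output.
+PartitionPreservingIncrementalJob job = new PartitionPreservingIncrementalJob(Example.class);
+job.setKeySchema(keySchema);                 // member_id, as in the quick start
+job.setIntermediateValueSchema(valueSchema); // count
+job.setOutputValueSchema(valueSchema);       // count
+job.setInputPaths(Arrays.asList(new Path("/data/event")));
+job.setOutputPath(new Path("/daily_counts"));
+job.setMapper(mapper);                       // the member-count mapper from the quick start
+job.setReducerAccumulator(accumulator);      // the count accumulator from the quick start
+job.run();
+```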
+
+Given these two jobs, processing data over sliding windows can be done much more efficiently.  Two types of sliding
+window are of particular interest and can be implemented using Hourglass.
+A *fixed-start* sliding window has a start date that remains the same over multiple runs and an end date that is 
+flexible, where the end date advances forward as new input data becomes available.
+A *fixed-length* sliding window has a window length that remains the same over multiple runs and flexible start
+and end dates, where the start and end advance forward as new input data becomes available.
+Hourglass makes defining these sliding windows easy.
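+
+For example, under the assumption that the incremental jobs expose `setNumDays` and `setStartDate` setters for
+configuring the window (check the Javadoc for the actual API), the two window types might be declared as follows:
+
+```Java
+// Fixed-length window: always consume the most recent 30 days of input.
+job.setNumDays(30);
+
+// Fixed-start window: consume all input from a fixed start date onward
+// (hypothetical usage, shown only to contrast the two window types).
+job.setStartDate(new GregorianCalendar(2013, Calendar.MARCH, 15).getTime());
+```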
+
+An example of a fixed-start sliding window problem is computing the last login time for all members of a website
+using a login event that records the login time.  This could be solved efficiently by using a partition-collapsing
+job, which is capable of reusing the previous output and merging it with new login data as it arrives.
+
+An example of a fixed-length sliding window problem is computing the pages viewed per member over the last 30 days
+using a page-view event.  This could also be solved efficiently using a partition-collapsing job, which is
+capable of reusing the previous output and merging it with the new page-view data while subtracting off the old
+page-view data.
+
+For some fixed-length sliding window problems it is not possible to subtract off the oldest day of data.
+For example, suppose the goal is to estimate the number of distinct members who have logged into a website in the last
+30 days using a login event that records the member ID.  A HyperLogLog counter could be used to estimate the cardinality.
+The internal data for this counter could be serialized to bytes and stored as output alongside the estimated count.
+However, although multiple HyperLogLog counters can be merged together, they cannot be subtracted or unmerged;
+in other words, the operation is not reversible.  So a partition-collapsing job by itself could not be used.
+Instead, we could chain together a partition-preserving job and a partition-collapsing job.  The first job would estimate
+the cardinality per day and store the value with the counter's byte representation.  The second job would merge the
+per-day counters together to produce the estimate over the full 30-day window.  This makes the computation extremely efficient.
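+
+As a minimal sketch of the second job's reduce step, the per-day counters could be merged with the stream-lib
+`HyperLogLog` class (`com.clearspring.analytics.stream.cardinality.HyperLogLog`, a test dependency of this project).
+The `"bytes"` and `"count"` field names and the `outputValueSchema` variable are illustrative assumptions:
+
+```Java
+job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>()
+{
+  private transient HyperLogLog merged;
+
+  @Override
+  public void accumulate(GenericRecord value)
+  {
+    try
+    {
+      // deserialize the per-day counter and merge it into the running counter
+      byte[] bytes = ((ByteBuffer)value.get("bytes")).array();
+      HyperLogLog counter = HyperLogLog.Builder.build(bytes);
+      if (merged == null) merged = counter;
+      else merged.addAll(counter); // counters can be merged, never subtracted
+    }
+    catch (Exception e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public GenericRecord getFinal()
+  {
+    try
+    {
+      // store both the estimate and the serialized counter for future reuse
+      GenericRecord output = new GenericData.Record(outputValueSchema);
+      output.put("count", merged.cardinality());
+      output.put("bytes", ByteBuffer.wrap(merged.getBytes()));
+      return output;
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void cleanup()
+  {
+    merged = null;
+  }
+});
+```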
+
+## Programming Model
+
+To implement an incremental job, a developer must specify Avro schemas for the key, intermediate value, and output value types.
+The key and intermediate value types are used for the output of the mapper and an optional combiner.  The key and output value
+types are used for the output of the reducer.  The input schemas are automatically determined by the job by inspecting the
+input data.
+
+A developer must also define their application logic through interfaces that are based on the MapReduce programming model.
+The mapper is defined through a *Mapper* interface, which given a record produces zero or more key-value pairs.
+The key-value pairs must conform to the key and intermediate value schemas just mentioned.
+
+Reduce is defined through an *Accumulator* interface, which is passed all the records for a given key and then returns
+the final result.  Both the combiner and reducer use an Accumulator for the reduce functionality.
+This is similar to the standard reduce function of MapReduce.  The key difference is that
+no more than one record can be returned.  The input records to the Accumulator are of the intermediate value type; 
+the output is of the output value type.  
+
+If the intermediate and output value types are the same then the Accumulator can
+naturally be used to merge the new input data with the previous output.  However, if they are different, a class implementing
+the *Merge* interface must be provided.  Merge is a binary operation on two records of the output value type that returns a
+record as a result.
+
+The other case where an implementation of Merge must be provided is when output is reused for a partition-collapsing job
+over a fixed-length sliding window.  Merge in this case is used to essentially subtract old output from the current output.
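+
+For the running count example, a merge operation might look like the following sketch.  The `Merger` interface name
+and its single-method signature are assumptions based on the description above:
+
+```Java
+// Sketch: combines a previous output record with a new output record.
+public class CountMerger implements Merger<GenericRecord>
+{
+  @Override
+  public GenericRecord merge(GenericRecord previous, GenericRecord current)
+  {
+    GenericRecord merged = new GenericData.Record(previous.getSchema());
+    merged.put("count", (Integer)previous.get("count") + (Integer)current.get("count"));
+    return merged;
+  }
+}
+```
+
+For the fixed-length sliding window case, an analogous implementation would subtract the old counts rather than add them.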
+
+## Contribute
+
+The source code is available under the Apache 2.0 license.  Contributions are welcome.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/build.gradle
----------------------------------------------------------------------
diff --git a/datafu-hourglass/build.gradle b/datafu-hourglass/build.gradle
new file mode 100644
index 0000000..9cc2b99
--- /dev/null
+++ b/datafu-hourglass/build.gradle
@@ -0,0 +1,123 @@
+apply plugin: 'java'
+apply plugin: 'license'
+apply plugin: 'maven-publish'
+
+import groovy.xml.MarkupBuilder
+
+buildscript {
+  repositories {
+      mavenCentral()
+  }
+  dependencies {
+  }
+}
+
+cleanEclipse { 
+  doLast { 
+    delete ".apt_generated"
+    delete ".settings"
+    delete ".factorypath"
+    delete "bin"
+  }
+}
+
+task sourcesJar(type: Jar) {
+  description 'Creates the sources jar'
+
+  classifier = 'sources'
+  from sourceSets.main.allJava
+}
+
+task javadocJar(type: Jar, dependsOn: javadoc) {
+  description 'Creates the javadoc jar'
+
+  classifier = 'javadoc'
+  from javadoc.destinationDir
+}
+
+artifacts {
+  archives sourcesJar
+  archives javadocJar
+  archives jar
+}
+
+// Note: alternate way to publish: https://github.com/Netflix/gradle-template
+
+publishing {
+  publications {
+    mavenJava(MavenPublication) {
+      artifact sourcesJar
+      artifact javadocJar
+      artifact jar
+
+      pom.withXml {
+        asNode().appendNode("packaging","jar")
+        asNode().appendNode("name","Apache DataFu Hourglass")        
+        asNode().appendNode("description","A framework for incrementally processing data in Hadoop.")
+        asNode().appendNode("url","http://datafu.incubator.apache.org/")
+
+        def licenseNode = asNode().appendNode("licenses").appendNode("license")
+        licenseNode.appendNode("name","The Apache Software License, Version 2.0")
+        licenseNode.appendNode("url","http://www.apache.org/licenses/LICENSE-2.0.txt")
+        licenseNode.appendNode("distribution","repo")
+
+        def dependenciesNode = asNode().appendNode("dependencies")
+        def dependency = dependenciesNode.appendNode("dependency")
+        dependency.appendNode("groupId","log4j")
+        dependency.appendNode("artifactId","log4j")
+        dependency.appendNode("version","$log4jVersion")
+
+        dependency = dependenciesNode.appendNode("dependency")
+        dependency.appendNode("groupId","org.apache.avro")
+        dependency.appendNode("artifactId","avro")
+        dependency.appendNode("version","$avroVersion")
+
+        dependency = dependenciesNode.appendNode("dependency")
+        dependency.appendNode("groupId","org.apache.avro")
+        dependency.appendNode("artifactId","avro-mapred")
+        dependency.appendNode("version","$avroVersion")
+
+        dependency = dependenciesNode.appendNode("dependency")
+        dependency.appendNode("groupId","org.apache.avro")
+        dependency.appendNode("artifactId","avro-compiler")
+        dependency.appendNode("version","$avroVersion")
+      }
+    }
+  }
+}
+
+// create tasks to automatically add the license header
+license {
+  header rootProject.file('HEADER')
+  skipExistingHeaders = true
+}
+
+dependencies {
+  // core dependencies, listed as dependencies in pom
+  compile "log4j:log4j:$log4jVersion"
+  compile "org.json:json:$jsonVersion"
+  compile "org.apache.avro:avro:$avroVersion"
+  compile "org.apache.avro:avro-mapred:$avroVersion"
+  compile "org.apache.avro:avro-compiler:$avroVersion"
+
+  // needed for compilation and testing, not listed as dependencies in the pom
+  compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-tools:$hadoopVersion"
+
+  // needed for testing, not listed as dependencies in the pom
+  testCompile "org.apache.hadoop:hadoop-test:$hadoopVersion"
+  testCompile "com.clearspring.analytics:stream:$streamVersion"
+  testCompile "javax.ws.rs:jsr311-api:$jsr311Version"
+  testCompile "org.slf4j:slf4j-log4j12:$slf4jVersion"
+  testCompile "commons-io:commons-io:$commonsIoVersion"
+  testCompile "org.apache.avro:avro-tools:$avroVersion"
+  testCompile "org.testng:testng:$testngVersion"
+}
+
+
+test {
+  // enable TestNG support (default is JUnit)
+  useTestNG()
+
+  maxHeapSize = "2G"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/changes.md
----------------------------------------------------------------------
diff --git a/datafu-hourglass/changes.md b/datafu-hourglass/changes.md
new file mode 100644
index 0000000..28c78a0
--- /dev/null
+++ b/datafu-hourglass/changes.md
@@ -0,0 +1,22 @@
+# 0.1.3
+
+Fixes:
+
+* Fix bug reusing output in execution planner.  It now subtracts off the correct old inputs.
+* When reusing output, the execution planner now considers whether reusing is better than not reusing.
+
+# 0.1.2
+
+Fixes:
+
+* Correctly load schema for input paths.
+
+# 0.1.1
+
+Additions:
+
+* Jobs now support joins on inconsistent schemas (Issue #67)
+
+# 0.1.0
+
+Initial release.

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/overview.html
----------------------------------------------------------------------
diff --git a/datafu-hourglass/overview.html b/datafu-hourglass/overview.html
new file mode 100644
index 0000000..f976ab9
--- /dev/null
+++ b/datafu-hourglass/overview.html
@@ -0,0 +1,3 @@
+<body>
+A framework for incrementally processing data in Hadoop.
+</body>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
new file mode 100644
index 0000000..0136ee9
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Manages the storage and retrieval of date ranges in the metadata of Avro files.
+ * This is used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} so that when reusing previous
+ * output it can determine the date range the data corresponds to.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroDateRangeMetadata
+{
+  public static String METADATA_DATE_START = "hourglass.date.start";
+  public static String METADATA_DATE_END = "hourglass.date.end";
+  
+  /**
+   * Reads the date range from the metadata stored in an Avro file.
+   * 
+   * @param fs file system to access path
+   * @param path path to get date range for
+   * @return date range
+   * @throws IOException
+   */
+  public static DateRange getOutputFileDateRange(FileSystem fs, Path path) throws IOException
+  {
+    path = fs.listStatus(path, PathUtils.nonHiddenPathFilter)[0].getPath();
+    FSDataInputStream dataInputStream = fs.open(path);
+    DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
+    
+    try
+    {
+      return new DateRange(new Date(Long.parseLong(dataFileStream.getMetaString(METADATA_DATE_START))),
+                           new Date(Long.parseLong(dataFileStream.getMetaString(METADATA_DATE_END))));
+    }
+    finally
+    {
+      dataFileStream.close();
+      dataInputStream.close();
+    }
+  }
+  
+  /**
+   * Updates the Hadoop configuration so that the Avro files which are written have date range
+   * information stored in the metadata.  This should be used in conjunction with 
+   * {@link AvroKeyValueWithMetadataRecordWriter}.
+   * 
+   * @param conf configuration to store date range in
+   * @param dateRange date range
+   */
+  public static void configureOutputDateRange(Configuration conf, DateRange dateRange)
+  {
+    // store the date range in the output file's metadata
+    conf.set(AvroKeyValueWithMetadataRecordWriter.TEXT_PREFIX + METADATA_DATE_START, Long.toString(dateRange.getBeginDate().getTime()));
+    conf.set(AvroKeyValueWithMetadataRecordWriter.TEXT_PREFIX + METADATA_DATE_END, Long.toString(dateRange.getEndDate().getTime()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
new file mode 100644
index 0000000..4574c69
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.mapreduce.AvroOutputFormatBase;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat for writing Avro container files of key/value pairs.
+ *
+ * <p>Since Avro container files can only contain records (not key/value pairs), this
+ * output format puts the key and value into an Avro generic record with two fields, named
+ * 'key' and 'value'.</p>
+ *
+ * <p>The keys and values given to this output format may be Avro objects wrapped in
+ * <code>AvroKey</code> or <code>AvroValue</code> objects.  The basic Writable types are
+ * also supported (e.g., IntWritable, Text); they will be converted to their corresponding
+ * Avro types.</p>
+ *
+ * @param <K> The type of key. If an Avro type, it must be wrapped in an <code>AvroKey</code>.
+ * @param <V> The type of value. If an Avro type, it must be wrapped in an <code>AvroValue</code>.
+ */
+public class AvroKeyValueWithMetadataOutputFormat<K, V> extends AvroOutputFormatBase<K, V> {
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
+    AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(
+        context.getConfiguration());
+
+    AvroDatumConverter<K, ?> keyConverter = converterFactory.create(
+        (Class<K>) context.getOutputKeyClass());
+    AvroDatumConverter<V, ?> valueConverter = converterFactory.create(
+        (Class<V>) context.getOutputValueClass());
+    
+    return new AvroKeyValueWithMetadataRecordWriter<K, V>(keyConverter, valueConverter,
+        getCompressionCodec(context), getAvroFileOutputStream(context), context.getConfiguration());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
new file mode 100644
index 0000000..4511d24
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes key/value pairs to an Avro container file.
+ *
+ * <p>Each entry in the Avro container file will be a generic record with two fields,
+ * named 'key' and 'value'.  The input types may be basic Writable objects like Text or
+ * IntWritable, or they may be AvroWrapper subclasses (AvroKey or AvroValue).  Writable
+ * objects will be converted to their corresponding Avro types when written to the generic
+ * record key/value pair.</p>
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public class AvroKeyValueWithMetadataRecordWriter<K, V> extends RecordWriter<K, V> {
+  /** A writer for the Avro container file. */
+  private final DataFileWriter<GenericRecord> mAvroFileWriter;
+
+  /** The writer schema for the generic record entries of the Avro container file. */
+  private final Schema mKeyValuePairSchema;
+
+  /** A reusable Avro generic record for writing key/value pairs to the file. */
+  private final AvroKeyValue<Object, Object> mOutputRecord;
+
+  /** A helper object that converts the input key to an Avro datum. */
+  private final AvroDatumConverter<K, ?> mKeyConverter;
+
+  /** A helper object that converts the input value to an Avro datum. */
+  private final AvroDatumConverter<V, ?> mValueConverter;
+
+  /** The configuration key prefix for text output metadata. */
+  public static final String TEXT_PREFIX = "avro.meta.text.";
+  
+  public AvroKeyValueWithMetadataRecordWriter(AvroDatumConverter<K, ?> keyConverter,
+      AvroDatumConverter<V, ?> valueConverter, CodecFactory compressionCodec,
+      OutputStream outputStream, Configuration conf) throws IOException {
+    // Create the generic record schema for the key/value pair.
+    mKeyValuePairSchema = AvroKeyValue.getSchema(
+        keyConverter.getWriterSchema(), valueConverter.getWriterSchema());
+
+    // Create an Avro container file and a writer to it.
+    mAvroFileWriter = new DataFileWriter<GenericRecord>(
+        new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
+    mAvroFileWriter.setCodec(compressionCodec);
+    
+    for (Entry<String,String> e : conf)
+    {
+      if (e.getKey().startsWith(TEXT_PREFIX))
+        mAvroFileWriter.setMeta(e.getKey().substring(TEXT_PREFIX.length()),
+                                e.getValue());
+    }
+    
+    mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
+
+    // Keep a reference to the converters.
+    mKeyConverter = keyConverter;
+    mValueConverter = valueConverter;
+
+    // Create a reusable output record.
+    mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
+  }
+
+  /**
+   * Gets the writer schema for the key/value pair generic record.
+   *
+   * @return The writer schema used for entries of the Avro container file.
+   */
+  public Schema getWriterSchema() {
+    return mKeyValuePairSchema;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(K key, V value) throws IOException {
+    mOutputRecord.setKey(mKeyConverter.convert(key));
+    mOutputRecord.setValue(mValueConverter.convert(value));
+    mAvroFileWriter.append(mOutputRecord.get());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    mAvroFileWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
new file mode 100644
index 0000000..b37c559
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroOutputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat for writing Avro container files.
+ *
+ * <p>Since Avro container files only contain records (not key/value pairs), this output
+ * format ignores the value.</p>
+ *
+ * @param <T> The (java) type of the Avro data to write.
+ */
+public class AvroKeyWithMetadataOutputFormat<T> extends AvroOutputFormatBase<AvroKey<T>, NullWritable> {
+  /** A factory for creating record writers. */
+  @SuppressWarnings("rawtypes")
+  private final RecordWriterFactory mRecordWriterFactory;
+
+  /**
+   * Constructor.
+   */
+  @SuppressWarnings("rawtypes")
+  public AvroKeyWithMetadataOutputFormat() {
+    this(new RecordWriterFactory());
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param recordWriterFactory A factory for creating record writers.
+   */
+  protected AvroKeyWithMetadataOutputFormat(@SuppressWarnings("rawtypes") RecordWriterFactory recordWriterFactory) {
+    mRecordWriterFactory = recordWriterFactory;
+  }
+
+  /**
+   * A factory for creating record writers.
+   *
+   * @param <T> The java type of the avro record to write.
+   */
+  protected static class RecordWriterFactory<T> {
+    /**
+     * Creates a new record writer instance.
+     *
+     * @param writerSchema The writer schema for the records to write.
+     * @param compressionCodec The compression type for the writer file.
+     * @param outputStream The target output stream for the records.
+     * @param conf The configuration for the record writer.
+     */
+    protected RecordWriter<AvroKey<T>, NullWritable> create(
+        Schema writerSchema, CodecFactory compressionCodec, OutputStream outputStream, Configuration conf)
+        throws IOException {
+      return new AvroKeyWithMetadataRecordWriter<T>(writerSchema, compressionCodec, outputStream, conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    // Get the writer schema.
+    Schema writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+    if (null == writerSchema) {
+      throw new IOException(
+          "AvroKeyOutputFormat requires an output schema. Use AvroJob.setOutputKeySchema().");
+    }
+
+    return mRecordWriterFactory.create(
+        writerSchema, getCompressionCodec(context), getAvroFileOutputStream(context), context.getConfiguration());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
new file mode 100644
index 0000000..0e61d87
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes Avro records to an Avro container file output stream.
+ *
+ * @param <T> The Java type of the Avro data to write.
+ */
+public class AvroKeyWithMetadataRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
+  /** A writer for the Avro container file. */
+  private final DataFileWriter<T> mAvroFileWriter;
+
+  /** The configuration key prefix for text output metadata. */
+  public static final String TEXT_PREFIX = "avro.meta.text.";
+  
+  /**
+   * Constructor.
+   *
+   * @param writerSchema The writer schema for the records in the Avro container file.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @param conf The configuration; entries prefixed with <code>avro.meta.text.</code> are stored as file metadata.
+   * @throws IOException If the record writer cannot be opened.
+   */
+  public AvroKeyWithMetadataRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
+      OutputStream outputStream, Configuration conf) throws IOException {
+    // Create an Avro container file and a writer to it.
+    mAvroFileWriter = new DataFileWriter<T>(new ReflectDatumWriter<T>(writerSchema));
+    mAvroFileWriter.setCodec(compressionCodec);
+    
+    for (Entry<String,String> e : conf)
+    {
+      if (e.getKey().startsWith(TEXT_PREFIX))
+        mAvroFileWriter.setMeta(e.getKey().substring(TEXT_PREFIX.length()),
+                                e.getValue());
+    }
+    
+    mAvroFileWriter.create(writerSchema, outputStream);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+    mAvroFileWriter.append(record.datum());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    mAvroFileWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
new file mode 100644
index 0000000..733826f
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyRecordReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce InputFormat that can handle Avro container files and multiple inputs.
+ * The input schema is determined based on the split.  The mapping from input path
+ * to schema is stored in the job configuration.
+ *
+ * <p>Keys are AvroKey wrapper objects that contain the Avro data.  Since Avro
+ * container files store only records (not key/value pairs), the value from
+ * this InputFormat is a NullWritable.</p>
+ */
+public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> 
+{  
+  /** {@inheritDoc} */
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException 
+  {    
+    Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
+    if (readerSchema == null)
+    {
+      throw new RuntimeException("Could not determine input schema");
+    }
+    return new AvroKeyRecordReader<T>(readerSchema);
+  }
+}

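As a rough usage sketch (not part of this patch), a job might pair this input format with the
per-path schema registration from AvroMultipleInputsUtil, which follows below; the paths and
record schemas here are hypothetical:

    Job job = new Job(new Configuration());
    job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);

    Schema eventSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    Schema clickSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Click\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"}]}");

    // Register one schema per input path.  createRecordReader above resolves a
    // split's schema by matching the split path against these registrations, so
    // paths should be registered in the same (typically fully-qualified) form
    // that splits report.
    AvroMultipleInputsUtil.setInputKeySchemaForPath(job, eventSchema, "hdfs://nn/data/event");
    AvroMultipleInputsUtil.setInputKeySchemaForPath(job, clickSchema, "hdfs://nn/data/click");
    FileInputFormat.addInputPath(job, new Path("hdfs://nn/data/event"));
    FileInputFormat.addInputPath(job, new Path("hdfs://nn/data/click"));
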
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
new file mode 100644
index 0000000..8afe192
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Helper methods for dealing with multiple Avro input schemas.  A mapping from each input
+ * path to its corresponding schema is stored in the job configuration.  Methods in this
+ * class load and store these schema mappings.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroMultipleInputsUtil
+{
+  private static final Logger _log = Logger.getLogger(AvroMultipleInputsUtil.class);
+  
+  private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.multiple.inputs.keys";
+  
+  /**
+   * Gets the schema for a particular input split. 
+   * 
+   * @param conf configuration to get schema from
+   * @param split input split to get schema for
+   * @return the schema for the split, or null if it could not be determined
+   */
+  public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit split) 
+  {
+    String path = ((FileSplit)split).getPath().toString();
+    _log.info("Determining schema for input path " + path);
+    JSONObject schemas;
+    try
+    {
+      schemas = getInputSchemas(conf);
+    }
+    catch (JSONException e1)
+    {
+      throw new RuntimeException(e1);
+    }
+    Schema schema = null;
+    String[] names = (schemas != null) ? JSONObject.getNames(schemas) : null;
+    if (names != null)
+    {
+      for (String key : names)
+      {
+        _log.info("Checking against " + key);
+        if (path.startsWith(key))
+        {
+          try
+          {
+            schema = new Schema.Parser().parse(schemas.getString(key));
+            _log.info("Input schema found: " + schema.toString(true));
+            break;
+          }
+          catch (JSONException e)
+          {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+    if (schema == null)
+    {
+      _log.info("Could not determine input schema");
+    }
+    return schema;
+  }
+  
+  /**
+   * Sets the job input key schema for a path.
+   *
+   * @param job The job to configure.
+   * @param schema The input key schema.
+   * @param path the path to set the schema for
+   */
+  public static void setInputKeySchemaForPath(Job job, Schema schema, String path) 
+  { 
+    JSONObject schemas;    
+    try
+    {
+      schemas = getInputSchemas(job.getConfiguration());
+      schemas.put(path, schema.toString());
+    }
+    catch (JSONException e)
+    {
+      throw new RuntimeException(e);
+    }         
+    job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schemas.toString());
+  }
+  
+  /**
+   * Get a mapping from path to input schema.
+   * 
+   * @param conf configuration storing the schema mapping
+   * @return mapping from path to input schema
+   * @throws JSONException if the stored mapping cannot be parsed
+   */
+  private static JSONObject getInputSchemas(Configuration conf) throws JSONException
+  {
+    JSONObject schemas;
+    
+    String schemasJson = conf.get(CONF_INPUT_KEY_SCHEMA);
+    
+    if (schemasJson == null)
+    {
+      schemas = new JSONObject();
+    }
+    else
+    {
+      schemas = new JSONObject(schemasJson);
+    }   
+    
+    return schemas;
+  }
+}

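To make the lookup concrete: the registrations are serialized as a single JSON object under one
configuration key, and getInputKeySchemaForSplit resolves a split by prefix-matching its path
against the registered paths.  A sketch with hypothetical names:

    Configuration conf = new Configuration(); // would carry the registrations shown above
    FileSplit split = new FileSplit(
        new Path("hdfs://nn/data/event/part-00000.avro"), 0L, 1024L, new String[0]);
    // Returns the schema whose registered path is a string prefix of the split's
    // path, or null when no registration matches.
    Schema schema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(conf, split);
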
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
new file mode 100644
index 0000000..e6beae2
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyRecordReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A combined input format for reading Avro data.  Multiple files may be packed into a
+ * single split, reducing the number of map tasks when the input consists of many small files.
+ * 
+ * @author "Matthew Hayes"
+ *
+ * @param <T> Type of data to be read
+ */
+public class CombinedAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable> 
+{
+  private final Logger LOG = Logger.getLogger(CombinedAvroKeyInputFormat.class);
+
+  public static class CombinedAvroKeyRecordReader<T> extends AvroKeyRecordReader<T>
+  {
+    private CombineFileSplit inputSplit;
+    private Integer idx;
+    
+    public CombinedAvroKeyRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer idx)
+    {
+      super(AvroJob.getInputKeySchema(context.getConfiguration()));
+      this.inputSplit = inputSplit;
+      this.idx = idx;            
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+        InterruptedException
+    {
+      this.inputSplit = (CombineFileSplit)inputSplit;
+      
+      FileSplit fileSplit = new FileSplit(this.inputSplit.getPath(idx), 
+                                          this.inputSplit.getOffset(idx), 
+                                          this.inputSplit.getLength(idx), 
+                                          this.inputSplit.getLocations());
+      
+      super.initialize(fileSplit, context);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
+                                                                   TaskAttemptContext context) throws IOException
+  {
+    Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
+    if (null == readerSchema) {
+      LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
+      LOG.info("Using a reader schema equal to the writer schema.");
+    }
+        
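+    // CombineFileRecordReader instantiates one CombinedAvroKeyRecordReader per
+    // file in the combined split; the raw Class reference below works around
+    // the generic signature expected by its constructor.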
+    Object c = CombinedAvroKeyRecordReader.class;
+    return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context, (Class<? extends RecordReader<AvroKey<T>, NullWritable>>)c);
+  }
+
+}
\ No newline at end of file

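A sketch of enabling the combined format on a job; the reader schema and split-size cap below are
example values, not taken from this patch:

    Job job = new Job(new Configuration());
    job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
    // A single reader schema covers all inputs, since the combined reader pulls
    // it from AvroJob rather than from per-path registrations.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    AvroJob.setInputKeySchema(job, schema);
    // Cap how much data may be packed into one combined split (example: 512 MB).
    CombinedAvroKeyInputFormat.setMaxInputSplitSize(job, 512L * 1024 * 1024);
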
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
new file mode 100644
index 0000000..2ae23ab
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Input and output formats for using Avro in incremental Hadoop jobs.
+ * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
+ */
+package datafu.hourglass.avro;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
new file mode 100644
index 0000000..8229574
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a path and the corresponding date that is associated with it.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class DatePath implements Comparable<DatePath>
+{
+  private static final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+  
+  private final Date date;
+  private final Path path;
+  
+  static
+  {
+    timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+  
+  public DatePath(Date date, Path path)
+  {
+    this.date = date;
+    this.path = path;
+  }
+  
+  public Date getDate() { return this.date; }
+  public Path getPath() { return this.path; }
+  
+  public static DatePath createDatedPath(Path parent, Date date)
+  {
+    return new DatePath(date,new Path(parent,PathUtils.datedPathFormat.format(date)));
+  }
+  
+  public static DatePath createNestedDatedPath(Path parent, Date date)
+  {
+    return new DatePath(date,new Path(parent,PathUtils.nestedDatedPathFormat.format(date)));
+  }
+  
+  @Override
+  public String toString()
+  {
+    return String.format("[date=%s, path=%s]",timestampFormat.format(this.date), this.path.toString());
+  }
+
+  @Override
+  public int compareTo(DatePath o)
+  {
+    return this.date.compareTo(o.date);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((date == null) ? 0 : date.hashCode());
+    result = prime * result + ((path == null) ? 0 : path.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    DatePath other = (DatePath) obj;
+    if (date == null)
+    {
+      if (other.date != null)
+        return false;
+    }
+    else if (!date.equals(other.date))
+      return false;
+    if (path == null)
+    {
+      if (other.path != null)
+        return false;
+    }
+    else if (!path.equals(other.path))
+      return false;
+    return true;
+  }
+}
\ No newline at end of file

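A small sketch of the two factory methods (the root path is made up):

    Date date = new Date();
    DatePath flat   = DatePath.createDatedPath(new Path("/data/output"), date);
    DatePath nested = DatePath.createNestedDatedPath(new Path("/data/output"), date);
    // flat.getPath()   -> /data/output/yyyyMMdd,   e.g. /data/output/20140518
    // nested.getPath() -> /data/output/yyyy/MM/dd, e.g. /data/output/2014/05/18
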
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
new file mode 100644
index 0000000..3cac627
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.util.Date;
+
+/**
+ * A date range, consisting of a start and end date.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class DateRange
+{
+  private final Date _beginDate;
+  private final Date _endDate;
+  
+  public DateRange(Date beginDate, Date endDate)
+  {
+    _beginDate = beginDate;
+    _endDate = endDate;
+  }
+  
+  public Date getBeginDate()
+  {
+    return _beginDate;
+  }
+  
+  public Date getEndDate()
+  {
+    return _endDate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
new file mode 100644
index 0000000..25e66ea
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A collection of utility methods for dealing with files and paths.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class PathUtils
+{
+  private static Logger _log = Logger.getLogger(PathUtils.class);
+  
+  public final static TimeZone timeZone = TimeZone.getTimeZone("UTC");    
+  public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat("yyyyMMdd");  
+  public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat("yyyy/MM/dd");  
+  private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");
+  private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");
+
+  /**
+   * Filters out paths starting with "." and "_".
+   */
+  public static final PathFilter nonHiddenPathFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path)
+    {
+      String s = path.getName();
+      return !s.startsWith(".") && !s.startsWith("_");        
+    }
+  };
+  
+  static
+  {
+    datedPathFormat.setTimeZone(timeZone);
+    nestedDatedPathFormat.setTimeZone(timeZone);
+  }
+  
+  /**
+   * Delete all but the N most recent paths matching the "yyyyMMdd" format.
+   * 
+   * @param fs file system
+   * @param path parent path containing the dated paths
+   * @param retentionCount number of most recent paths to keep
+   * @throws IOException if a path cannot be listed or deleted
+   */
+  public static void keepLatestDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
+  {
+    LinkedList<DatePath> outputs = new LinkedList<DatePath>(PathUtils.findDatedPaths(fs, path));
+    
+    while (outputs.size() > retentionCount)
+    {
+      DatePath toDelete = outputs.removeFirst();
+      _log.info(String.format("Removing %s",toDelete.getPath()));
+      fs.delete(toDelete.getPath(),true);
+    }
+  }
+  
+  /**
+   * Delete all but the N most recent paths matching the "yyyy/MM/dd" format.
+   * 
+   * @param fs file system
+   * @param path parent path containing the nested dated paths
+   * @param retentionCount number of most recent paths to keep
+   * @throws IOException if a path cannot be listed or deleted
+   */
+  public static void keepLatestNestedDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
+  {
+    List<DatePath> outputPaths = PathUtils.findNestedDatedPaths(fs, path);
+    
+    if (outputPaths.size() > retentionCount)
+    {
+      Collections.sort(outputPaths);
+      _log.info("Applying retention range policy");
+      for (DatePath output : outputPaths.subList(0, outputPaths.size() - retentionCount))
+      {
+        _log.info(String.format("Removing %s because it is outside retention range",output.getPath()));
+        fs.delete(output.getPath(),true);
+      }
+    }
+  }
+  
+  /**
+   * List all paths matching the "yyyy/MM/dd" format under a given path.
+   * 
+   * @param fs file system
+   * @param input path to search under
+   * @return dated paths found under the given path
+   * @throws IOException if the path cannot be listed
+   */
+  public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) throws IOException
+  {
+    List<DatePath> inputDates = new ArrayList<DatePath>();
+    
+    FileStatus[] pathsStatus = fs.globStatus(new Path(input,"*/*/*"), nonHiddenPathFilter);
+    
+    if (pathsStatus == null)
+    {
+      return inputDates;
+    }
+        
+    for (FileStatus pathStatus : pathsStatus)
+    {
+      Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
+      if (matcher.matches())
+      {
+        String datePath = matcher.group(2);
+        Date date;
+        try
+        {
+          date = nestedDatedPathFormat.parse(datePath);
+        }
+        catch (ParseException e)
+        {
+          continue;
+        }
+        
+        Calendar cal = Calendar.getInstance(timeZone);
+        
+        cal.setTimeInMillis(date.getTime());
+        
+        inputDates.add(new DatePath(cal.getTime(), pathStatus.getPath()));
+      }
+    }
+    
+    return inputDates;
+  }
+  
+  /**
+   * List all paths matching the "yyyyMMdd" format under a given path.
+   * 
+   * @param fs file system
+   * @param path path to search under
+   * @return dated paths, sorted in ascending order of date
+   * @throws IOException if the path cannot be listed
+   */
+  public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
+  {
+    FileStatus[] outputPaths = fs.listStatus(path, nonHiddenPathFilter);
+    
+    List<DatePath> outputs = new ArrayList<DatePath>();
+    
+    if (outputPaths != null)
+    {
+      for (FileStatus outputPath : outputPaths)
+      {
+        Date date;
+        try
+        {
+          date = datedPathFormat.parse(outputPath.getPath().getName());
+        }
+        catch (ParseException e)
+        {
+          continue;
+        }
+        
+        outputs.add(new DatePath(date,outputPath.getPath()));
+      }
+    }
+    
+    Collections.sort(outputs);
+    
+    return outputs;
+  }
+  
+  /**
+   * Gets the schema from a given Avro data file.
+   * 
+   * @param fs file system
+   * @param path path to the Avro data file
+   * @return The schema read from the data file's metadata.
+   * @throws IOException if the file cannot be read
+   */
+  public static Schema getSchemaFromFile(FileSystem fs, Path path) throws IOException
+  {
+    FSDataInputStream dataInputStream = fs.open(path);
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
+    try
+    {
+      return dataFileStream.getSchema();
+    }
+    finally
+    {
+      dataFileStream.close();
+    }
+  }
+  
+  /**
+   * Gets the schema for the first Avro file under the given path.
+   * 
+   * @param fs file system
+   * @param path path to fetch schema for
+   * @return Avro schema
+   * @throws IOException if the path cannot be listed or the file cannot be read
+   */
+  public static Schema getSchemaFromPath(FileSystem fs, Path path) throws IOException
+  {
+    return getSchemaFromFile(fs,fs.listStatus(path, nonHiddenPathFilter)[0].getPath());
+  }
+  
+  /**
+   * Sums the size of all files listed under a given path. 
+   * 
+   * @param fs file system
+   * @param path path to count bytes for
+   * @return total bytes under path
+   * @throws IOException
+   */
+  public static long countBytes(FileSystem fs, Path path) throws IOException
+  {
+    FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
+    long totalForPath = 0L;
+    for (FileStatus file : files)
+    {
+      totalForPath += file.getLen();
+    }    
+    return totalForPath;
+  }
+  
+  /**
+   * Gets the date for a path in the "yyyyMMdd" format.
+   * 
+   * @param path path to check
+   * @return date for path
+   */
+  public static Date getDateForDatedPath(Path path)
+  {
+    Matcher matcher = timestampPathPattern.matcher(path.toString());
+    
+    if (!matcher.matches())
+    {
+      throw new RuntimeException("Unexpected input filename: " + path);
+    }
+         
+    try
+    {
+      return PathUtils.datedPathFormat.parse(matcher.group(1));
+    }
+    catch (ParseException e)
+    {
+      throw new RuntimeException("Unexpected input filename: " + path);
+    }
+  }
+  
+  /**
+   * Gets the date for a path in the "yyyy/MM/dd" format.
+   * 
+   * @param path path to check
+   * @return date
+   */
+  public static Date getDateForNestedDatedPath(Path path)
+  {
+    Matcher matcher = dailyPathPattern.matcher(path.toString());
+    
+    if (!matcher.matches())
+    {
+      throw new RuntimeException("Unexpected input filename: " + path);
+    }
+    
+    try
+    {
+      return PathUtils.nestedDatedPathFormat.parse(matcher.group(2));
+    }
+    catch (ParseException e)
+    {
+      throw new RuntimeException("Unexpected input filename: " + path);
+    }
+  }
+  
+  /**
+   * Gets the root path for a path in the "yyyy/MM/dd" format.  This is the part of the path
+   * preceding the "yyyy/MM/dd" portion.
+   * 
+   * @param path path to check
+   * @return the root path preceding the dated portion
+   */
+  public static Path getNestedPathRoot(Path path)
+  {
+    Matcher matcher = dailyPathPattern.matcher(path.toString());
+    
+    if (!matcher.matches())
+    {
+      throw new RuntimeException("Unexpected input filename: " + path);
+    }
+    
+    return new Path(matcher.group(1));
+  }
+}

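A sketch tying a few of these utilities together (the paths and retention count are hypothetical):

    FileSystem fs = FileSystem.get(new Configuration());
    Path root = new Path("/data/output");

    // Dated children like /data/output/20140518, sorted ascending by date.
    List<DatePath> dated = PathUtils.findDatedPaths(fs, root);

    // Drop everything but the 30 most recent dated paths.
    PathUtils.keepLatestDatedPaths(fs, root, 30);

    // Read the writer schema from the first Avro file under the newest path.
    if (!dated.isEmpty())
    {
      Schema schema = PathUtils.getSchemaFromPath(fs, dated.get(dated.size() - 1).getPath());
    }
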
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
new file mode 100644
index 0000000..f1fcdaf
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes for working with the file system.
+ * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
+ */
+package datafu.hourglass.fs;
\ No newline at end of file