Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:51 UTC

[13/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java b/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
deleted file mode 100644
index 2686c32..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyRecordReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A MapReduce InputFormat that can handle Avro container files and multiple inputs.
- * The input schema is determined based on the split.  The mapping from input path
- * to schema is stored in the job configuration.
- *
- * <p>Keys are AvroKey wrapper objects that contain the Avro data.  Since Avro
- * container files store only records (not key/value pairs), the value from
- * this InputFormat is a NullWritable.</p>
- */
-public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> 
-{  
-  /** {@inheritDoc} */
-  @Override
-  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException 
-  {    
-    Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
-    if (readerSchema == null)
-    {
-      throw new RuntimeException("Could not determine input schema");
-    }
-    return new AvroKeyRecordReader<T>(readerSchema);
-  }
-}
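
The format above resolves the reader schema per split; the mapping itself is written by AvroMultipleInputsUtil (deleted in the next hunk). As a usage sketch, not part of this commit, a job consuming two inputs with different schemas could be wired roughly as follows; the paths, schema variables, and method name are hypothetical:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Sketch: two Avro inputs, each read with its own reader schema.
    public static Job configureJob(Configuration conf, Schema firstSchema, Schema secondSchema)
        throws IOException
    {
      Job job = new Job(conf, "multiple-avro-inputs");
      job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
      // Record each path's schema in the configuration; createRecordReader()
      // later looks it up per split via AvroMultipleInputsUtil.
      AvroMultipleInputsUtil.setInputKeySchemaForPath(job, firstSchema, "/data/first");
      AvroMultipleInputsUtil.setInputKeySchemaForPath(job, secondSchema, "/data/second");
      FileInputFormat.addInputPath(job, new Path("/data/first"));
      FileInputFormat.addInputPath(job, new Path("/data/second"));
      return job;
    }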

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java b/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
deleted file mode 100644
index b15041a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Helper methods for dealing with multiple Avro input schemas.  A mapping is stored in the configuration
- * that maps each input path to its corresponding schema.  Methods in this class help with loading and
- * storing these schema mappings.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class AvroMultipleInputsUtil
-{
-  private static final Logger _log = Logger.getLogger(AvroMultipleInputsUtil.class);
-  
-  private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.multiple.inputs.keys";
-  
-  /**
-   * Gets the schema for a particular input split. 
-   * 
-   * @param conf configuration to get schema from
-   * @param split input split to get schema for
-   * @return schema
-   */
-  public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit split) 
-  {
-    String path = ((FileSplit)split).getPath().toString();
-    _log.info("Determining schema for input path " + path);
-    JSONObject schemas;
-    try
-    {
-      schemas = getInputSchemas(conf);
-    }
-    catch (JSONException e1)
-    {
-      throw new RuntimeException(e1);
-    }
-    Schema schema = null;
-    if (schemas != null)
-    {
-      for (String key : JSONObject.getNames(schemas))
-      {
-        _log.info("Checking against " + key);
-        if (path.startsWith(key))
-        {
-          try
-          {
-            schema = new Schema.Parser().parse(schemas.getString(key));
-            _log.info("Input schema found: " + schema.toString(true));
-            break;
-          }
-          catch (JSONException e)
-          {
-            throw new RuntimeException(e);
-          }
-        }
-      }
-    }
-    if (schema == null)
-    {
-      _log.info("Could not determine input schema");
-    }
-    return schema;
-  }
-  
-  /**
-   * Sets the job input key schema for a path.
-   *
-   * @param job The job to configure.
-   * @param schema The input key schema.
-   * @param path the path to set the schema for
-   */
-  public static void setInputKeySchemaForPath(Job job, Schema schema, String path) 
-  { 
-    JSONObject schemas;    
-    try
-    {
-      schemas = getInputSchemas(job.getConfiguration());
-      schemas.put(path, schema.toString());
-    }
-    catch (JSONException e)
-    {
-      throw new RuntimeException(e);
-    }         
-    job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schemas.toString());
-  }
-  
-  /**
-   * Get a mapping from path to input schema.
-   * 
-   * @param conf configuration storing the schema mapping
-   * @return mapping from path to input schema
-   * @throws JSONException if the stored mapping cannot be parsed
-   */
-  private static JSONObject getInputSchemas(Configuration conf) throws JSONException
-  {
-    JSONObject schemas;
-    
-    String schemasJson = conf.get(CONF_INPUT_KEY_SCHEMA);
-    
-    if (schemasJson == null)
-    {
-      schemas = new JSONObject();
-    }
-    else
-    {
-      schemas = new JSONObject(schemasJson);
-    }   
-    
-    return schemas;
-  }
-}
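
For reference, the value stored under avro.schema.multiple.inputs.keys is a single JSON object keyed by path, with each value holding that path's schema as a JSON string; getInputKeySchemaForSplit() then matches a split against these keys by path prefix. An illustrative (hypothetical) mapping:

    {
      "hdfs://namenode/data/first": "{\"type\":\"record\",\"name\":\"First\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"
    }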

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java b/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
deleted file mode 100644
index d2b171d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyRecordReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
-
-/**
- * A combined input format for reading Avro data.
- * 
- * @author "Matthew Hayes"
- *
- * @param <T> Type of data to be read
- */
-public class CombinedAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable> 
-{
-  private final Logger LOG = Logger.getLogger(CombinedAvroKeyInputFormat.class);
-
-  public static class CombinedAvroKeyRecordReader<T> extends AvroKeyRecordReader<T>
-  {
-    private CombineFileSplit inputSplit;
-    private Integer idx;
-    
-    public CombinedAvroKeyRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer idx)
-    {
-      super(AvroJob.getInputKeySchema(context.getConfiguration()));
-      this.inputSplit = inputSplit;
-      this.idx = idx;            
-    }
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
-        InterruptedException
-    {
-      this.inputSplit = (CombineFileSplit)inputSplit;
-      
-      FileSplit fileSplit = new FileSplit(this.inputSplit.getPath(idx), 
-                                          this.inputSplit.getOffset(idx), 
-                                          this.inputSplit.getLength(idx), 
-                                          this.inputSplit.getLocations());
-      
-      super.initialize(fileSplit, context);
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  @Override
-  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
-                                                                   TaskAttemptContext context) throws IOException
-  {
-    Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
-    if (null == readerSchema) {
-      LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
-      LOG.info("Using a reader schema equal to the writer schema.");
-    }
-        
-    Object c = CombinedAvroKeyRecordReader.class;
-    return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context, (Class<? extends RecordReader<AvroKey<T>, NullWritable>>)c);
-  }
-
-}
\ No newline at end of file
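
Unlike the multiple-inputs format earlier in this diff, this format takes a single reader schema set through AvroJob, and its combined splits let one map task read several small Avro files. A minimal wiring sketch (hypothetical path and schema variable):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public static Job configureCombinedJob(Configuration conf, Schema inputSchema)
        throws IOException
    {
      Job job = new Job(conf, "combined-avro-input");
      job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
      // Setting the reader schema avoids the "reader schema was not set" warning.
      AvroJob.setInputKeySchema(job, inputSchema);
      FileInputFormat.addInputPath(job, new Path("/data/events"));
      return job;
    }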

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
deleted file mode 100644
index 54a428f..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Input and output formats for using Avro in incremental Hadoop jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.avro;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java b/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
deleted file mode 100644
index 0ffba6e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * Represents a path and the corresponding date that is associated with it.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class DatePath implements Comparable<DatePath>
-{
-  private static final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-  
-  private final Date date;
-  private final Path path;
-  
-  static
-  {
-    timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-  }
-  
-  public DatePath(Date date, Path path)
-  {
-    this.date = date;
-    this.path = path;
-  }
-  
-  public Date getDate() { return this.date; }
-  public Path getPath() { return this.path; }
-  
-  public static DatePath createDatedPath(Path parent, Date date)
-  {
-    return new DatePath(date,new Path(parent,PathUtils.datedPathFormat.format(date)));
-  }
-  
-  public static DatePath createNestedDatedPath(Path parent, Date date)
-  {
-    return new DatePath(date,new Path(parent,PathUtils.nestedDatedPathFormat.format(date)));
-  }
-  
-  @Override
-  public String toString()
-  {
-    return String.format("[date=%s, path=%s]",timestampFormat.format(this.date), this.path.toString());
-  }
-
-  @Override
-  public int compareTo(DatePath o)
-  {
-    return this.date.compareTo(o.date);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((date == null) ? 0 : date.hashCode());
-    result = prime * result + ((path == null) ? 0 : path.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    DatePath other = (DatePath) obj;
-    if (date == null)
-    {
-      if (other.date != null)
-        return false;
-    }
-    else if (!date.equals(other.date))
-      return false;
-    if (path == null)
-    {
-      if (other.path != null)
-        return false;
-    }
-    else if (!path.equals(other.path))
-      return false;
-    return true;
-  }
-}
\ No newline at end of file
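
A small usage sketch (hypothetical parent path); the two factory methods apply the "yyyyMMdd" and "yyyy/MM/dd" formats defined in PathUtils, which appears later in this diff:

    import java.text.ParseException;
    import java.util.Date;
    import org.apache.hadoop.fs.Path;

    public static void example() throws ParseException
    {
      Path parent = new Path("/data/output");
      Date date = PathUtils.datedPathFormat.parse("20130114");
      DatePath flat = DatePath.createDatedPath(parent, date);         // /data/output/20130114
      DatePath nested = DatePath.createNestedDatedPath(parent, date); // /data/output/2013/01/14
    }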

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java b/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
deleted file mode 100644
index 485dd82..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.util.Date;
-
-/**
- * A date range, consisting of a start and end date.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class DateRange
-{
-  private final Date _beginDate;
-  private final Date _endDate;
-  
-  public DateRange(Date beginDate, Date endDate)
-  {
-    _beginDate = beginDate;
-    _endDate = endDate;
-  }
-  
-  public Date getBeginDate()
-  {
-    return _beginDate;
-  }
-  
-  public Date getEndDate()
-  {
-    return _endDate;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java b/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
deleted file mode 100644
index 4dfd5d1..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.Logger;
-
-
-/**
- * A collection of utility methods for dealing with files and paths.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class PathUtils
-{
-  private static Logger _log = Logger.getLogger(PathUtils.class);
-  
-  public final static TimeZone timeZone = TimeZone.getTimeZone("UTC");    
-  public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat("yyyyMMdd");  
-  public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat("yyyy/MM/dd");  
-  private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");
-  private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");
-
-  /**
-   * Filters out paths starting with "." and "_".
-   */
-  public static final PathFilter nonHiddenPathFilter = new PathFilter() {
-    @Override
-    public boolean accept(Path path)
-    {
-      String s = path.getName();
-      return !s.startsWith(".") && !s.startsWith("_");        
-    }
-  };
-  
-  static
-  {
-    datedPathFormat.setTimeZone(timeZone);
-    nestedDatedPathFormat.setTimeZone(timeZone);
-  }
-  
-  /**
-   * Delete all but the last N days of paths matching the "yyyyMMdd" format.
-   * 
-   * @param fs file system
-   * @param path parent path containing the dated paths
-   * @param retentionCount number of most recent paths to keep
-   * @throws IOException
-   */
-  public static void keepLatestDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
-  {
-    LinkedList<DatePath> outputs = new LinkedList<DatePath>(PathUtils.findDatedPaths(fs, path));
-    
-    while (outputs.size() > retentionCount)
-    {
-      DatePath toDelete = outputs.removeFirst();
-      _log.info(String.format("Removing %s",toDelete.getPath()));
-      fs.delete(toDelete.getPath(),true);
-    }
-  }
-  
-  /**
-   * Delete all but the last N days of paths matching the "yyyy/MM/dd" format.
-   * 
-   * @param fs
-   * @param path
-   * @param retentionCount
-   * @throws IOException
-   */
-  public static void keepLatestNestedDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
-  {
-    List<DatePath> outputPaths = PathUtils.findNestedDatedPaths(fs, path);
-    
-    if (outputPaths.size() > retentionCount)
-    {
-      Collections.sort(outputPaths);
-      _log.info(String.format("Applying retention range policy"));
-      for (DatePath output : outputPaths.subList(0, outputPaths.size() - retentionCount))
-      {
-        _log.info(String.format("Removing %s because it is outside retention range",output.getPath()));
-        fs.delete(output.getPath(),true);
-      }
-    }
-  }
-  
-  /**
-   * List all paths matching the "yyyy/MM/dd" format under a given path.
-   * 
-   * @param fs file system
-   * @param input path to search under
-   * @return paths
-   * @throws IOException
-   */
-  public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) throws IOException
-  {
-    List<DatePath> inputDates = new ArrayList<DatePath>();
-    
-    FileStatus[] pathsStatus = fs.globStatus(new Path(input,"*/*/*"), nonHiddenPathFilter);
-    
-    if (pathsStatus == null)
-    {
-      return inputDates;
-    }
-        
-    for (FileStatus pathStatus : pathsStatus)
-    {
-      Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
-      if (matcher.matches())
-      {
-        String datePath = matcher.group(2);
-        Date date;
-        try
-        {
-          date = nestedDatedPathFormat.parse(datePath);
-        }
-        catch (ParseException e)
-        {
-          continue;
-        }
-        
-        Calendar cal = Calendar.getInstance(timeZone);
-        
-        cal.setTimeInMillis(date.getTime());
-        
-        inputDates.add(new DatePath(cal.getTime(), pathStatus.getPath()));
-      }
-    }
-    
-    return inputDates;
-  }
-  
-  /**
-   * List all paths matching the "yyyyMMdd" format under a given path.
-   * 
-   * @param fs file system
-   * @param path path to search under
-   * @return paths
-   * @throws IOException
-   */
-  public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
-  {
-    FileStatus[] outputPaths = fs.listStatus(path, nonHiddenPathFilter);
-    
-    List<DatePath> outputs = new ArrayList<DatePath>();
-    
-    if (outputPaths != null)
-    {
-      for (FileStatus outputPath : outputPaths)
-      {
-        Date date;
-        try
-        {
-          date = datedPathFormat.parse(outputPath.getPath().getName());
-        }
-        catch (ParseException e)
-        {
-          continue;
-        }
-        
-        outputs.add(new DatePath(date,outputPath.getPath()));
-      }
-    }
-    
-    Collections.sort(outputs);
-    
-    return outputs;
-  }
-  
-  /**
-   * Gets the schema from a given Avro data file.
-   * 
-   * @param fs file system
-   * @param path path of the Avro data file
-   * @return The schema read from the data file's metadata.
-   * @throws IOException
-   */
-  public static Schema getSchemaFromFile(FileSystem fs, Path path) throws IOException
-  {
-    FSDataInputStream dataInputStream = fs.open(path);
-    DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
-    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
-    try
-    {
-      return dataFileStream.getSchema();
-    }
-    finally
-    {
-      dataFileStream.close();
-    }
-  }
-  
-  /**
-   * Gets the schema for the first Avro file under the given path.
-   * 
-   * @param fs file system
-   * @param path path to fetch schema for
-   * @return Avro schema
-   * @throws IOException
-   */
-  public static Schema getSchemaFromPath(FileSystem fs, Path path) throws IOException
-  {
-    return getSchemaFromFile(fs,fs.listStatus(path, nonHiddenPathFilter)[0].getPath());
-  }
-  
-  /**
-   * Sums the size of all files listed under a given path. 
-   * 
-   * @param fs file system
-   * @param path path to count bytes for
-   * @return total bytes under path
-   * @throws IOException
-   */
-  public static long countBytes(FileSystem fs, Path path) throws IOException
-  {
-    FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
-    long totalForPath = 0L;
-    for (FileStatus file : files)
-    {
-      totalForPath += file.getLen();
-    }    
-    return totalForPath;
-  }
-  
-  /**
-   * Gets the date for a path in the "yyyyMMdd" format.
-   * 
-   * @param path path to check
-   * @return date for path
-   */
-  public static Date getDateForDatedPath(Path path)
-  {
-    Matcher matcher = timestampPathPattern.matcher(path.toString());
-    
-    if (!matcher.matches())
-    {
-      throw new RuntimeException("Unexpected input filename: " + path);
-    }
-         
-    try
-    {
-      return PathUtils.datedPathFormat.parse(matcher.group(1));
-    }
-    catch (ParseException e)
-    {
-      throw new RuntimeException("Unexpected input filename: " + path);
-    }
-  }
-  
-  /**
-   * Gets the date for a path in the "yyyy/MM/dd" format.
-   * 
-   * @param path path to check
-   * @return date
-   */
-  public static Date getDateForNestedDatedPath(Path path)
-  {
-    Matcher matcher = dailyPathPattern.matcher(path.toString());
-    
-    if (!matcher.matches())
-    {
-      throw new RuntimeException("Unexpected input filename: " + path);
-    }
-    
-    try
-    {
-      return PathUtils.nestedDatedPathFormat.parse(matcher.group(2));
-    }
-    catch (ParseException e)
-    {
-      throw new RuntimeException("Unexpected input filename: " + path);
-    }
-  }
-  
-  /**
-   * Gets the root path for a path in the "yyyy/MM/dd" format.  This is the part of the path preceding the
-   * "yyyy/MM/dd" portion.
-   * 
-   * @param path path in the "yyyy/MM/dd" format
-   * @return the root path preceding the dated portion
-   */
-  public static Path getNestedPathRoot(Path path)
-  {
-    Matcher matcher = dailyPathPattern.matcher(path.toString());
-    
-    if (!matcher.matches())
-    {
-      throw new RuntimeException("Unexpected input filename: " + path);
-    }
-    
-    return new Path(matcher.group(1));
-  }
-}
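
A usage sketch for the discovery and retention helpers above (hypothetical paths):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public static void example(Configuration conf) throws IOException
    {
      FileSystem fs = FileSystem.get(conf);
      // Find day-partitioned inputs under /data/events/yyyy/MM/dd (returned unsorted).
      List<DatePath> days = PathUtils.findNestedDatedPaths(fs, new Path("/data/events"));
      // Keep only the seven most recent /data/output/yyyyMMdd directories.
      PathUtils.keepLatestDatedPaths(fs, new Path("/data/output"), 7);
    }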

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
deleted file mode 100644
index 9d81006..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Classes for working with the file system.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.fs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
deleted file mode 100644
index 3b998a9..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Base class for Hadoop jobs.
- * 
- * <p>
- * This class defines a set of common methods and configuration shared by Hadoop jobs.
- * Jobs can be configured either by providing properties or by calling setters.
- * Each property has a corresponding setter.
- * </p>
- * 
- * This class recognizes the following properties:
- * 
- * <ul>
- *   <li><em>input.path</em> - Input path job will read from</li>
- *   <li><em>output.path</em> - Output path job will write to</li>
- *   <li><em>temp.path</em> - Temporary path under which intermediate files are stored</li>
- *   <li><em>retention.count</em> - Number of days to retain in output directory</li>
- *   <li><em>num.reducers</em> - Number of reducers to use</li>
- *   <li><em>use.combiner</em> - Whether to use a combiner or not</li>
- *   <li><em>counters.path</em> - Path to store job counters in</li>
- * </ul>
- * 
- * <p>
- * The <em>input.path</em> property may be a comma-separated list of paths.  When there is more
- * than one it implies a join is to be performed.  Alternatively the paths may be listed separately.
- * For example, <em>input.path.first</em> and <em>input.path.second</em> define two separate input
- * paths.
- * </p>
- * 
- * <p>
- * The <em>num.reducers</em> property fixes the number of reducers.  When not set, the number of reducers
- * is computed based on the input size.
- * </p>
- * 
- * <p>
- * The <em>temp.path</em> property defines the parent directory for temporary paths, not the
- * temporary path itself.  Temporary paths are created under this directory with an <em>hourglass-</em>
- * prefix followed by a GUID.
- * </p>
- * 
- * <p> 
- * The input and output paths are the only required parameters.  The rest are optional.
- * </p>
- * 
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value. 
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractJob extends Configured
-{
-  private static final String HADOOP_PREFIX = "hadoop-conf.";
-  
-  private Properties _props;
-  private String _name;
-  private boolean _useCombiner;
-  private Path _countersParentPath;
-  private Integer _numReducers;
-  private Integer _retentionCount;
-  private List<Path> _inputPaths;
-  private Path _outputPath;
-  private Path _tempPath = new Path("/tmp");
-  private FileSystem _fs;
-  
-  /**
-   * Initializes the job.
-   */
-  public AbstractJob()
-  {    
-    setConf(new Configuration());
-  }
-    
-  /**
-   * Initializes the job with a job name and properties.
-   * 
-   * @param name Job name
-   * @param props Configuration properties
-   */
-  public AbstractJob(String name, Properties props)
-  {        
-    this();
-    setName(name);
-    setProperties(props);
-  }
-  
-  /**
-   * Gets the job name
-   * 
-   * @return Job name
-   */
-  public String getName()
-  {
-    return _name;
-  }
-  
-  /**
-   * Sets the job name
-   * 
-   * @param name Job name
-   */
-  public void setName(String name)
-  {
-    _name = name;
-  }
-  
-  /**
-   * Gets the configuration properties.
-   * 
-   * @return Configuration properties
-   */
-  public Properties getProperties()
-  {
-    return _props;
-  }
-  
-  /**
-   * Sets the configuration properties.
-   * 
-   * @param props Properties
-   */
-  public void setProperties(Properties props)
-  {
-    _props = props;
-    updateConfigurationFromProps(_props);
-    
-    if (_props.get("input.path") != null) 
-    {
-      String[] pathSplit = ((String)_props.get("input.path")).split(",");
-      List<Path> paths = new ArrayList<Path>();
-      for (String path : pathSplit)
-      {
-        if (path != null && path.length() > 0)
-        {
-          path = path.trim();
-          if (path.length() > 0)
-          {
-            paths.add(new Path(path));
-          }
-        }
-      }
-      if (paths.size() > 0)
-      {
-        setInputPaths(paths);
-      }
-      else
-      {
-        throw new RuntimeException("Could not extract input paths from: " + _props.get("input.path"));
-      }
-    }
-    else
-    {
-      List<Path> inputPaths = new ArrayList<Path>();
-      for (Object o : _props.keySet())
-      {
-        String prop = o.toString();
-        if (prop.startsWith("input.path."))
-        {
-          inputPaths.add(new Path(_props.getProperty(prop)));
-        }
-      }
-      if (inputPaths.size() > 0)
-      {
-        setInputPaths(inputPaths);
-      }
-    }
-    
-    if (_props.get("output.path") != null) 
-    {
-      setOutputPath(new Path((String)_props.get("output.path")));
-    }
-    
-    if (_props.get("temp.path") != null) 
-    {
-      setTempPath(new Path((String)_props.get("temp.path")));
-    }
-    
-    if (_props.get("retention.count") != null)
-    {
-      setRetentionCount(Integer.parseInt((String)_props.get("retention.count")));
-    }
-    
-    if (_props.get("num.reducers") != null)
-    {
-      setNumReducers(Integer.parseInt((String)_props.get("num.reducers")));
-    }
-    
-    if (_props.get("use.combiner") != null)
-    {
-      setUseCombiner(Boolean.parseBoolean((String)_props.get("use.combiner")));
-    }   
-    
-    if (_props.get("counters.path") != null)
-    {
-      setCountersParentPath(new Path((String)_props.get("counters.path")));
-    }
-  }
-  
-  /**
-   * Override this method to provide custom configuration before the job starts.
-   * 
-   * @param conf Hadoop configuration to modify
-   */
-  public void config(Configuration conf)
-  {    
-  }
-  
-  /**
-   * Gets the number of reducers to use.
-   * 
-   * @return Number of reducers
-   */
-  public Integer getNumReducers()
-  {
-    return _numReducers;
-  }
-
-  /**
-   * Sets the number of reducers to use.  Can also be set with <em>num.reducers</em> property.
-   * 
-   * @param numReducers Number of reducers to use
-   */
-  public void setNumReducers(Integer numReducers)
-  {
-    this._numReducers = numReducers;
-  }
-  
-  /**
-   * Gets whether the combiner should be used.
-   * 
-   * @return True if combiner should be used, otherwise false.
-   */
-  public boolean isUseCombiner()
-  {
-    return _useCombiner;
-  }
-
-  /**
-   * Sets whether the combiner should be used.  Can also be set with <em>use.combiner</em>.
-   * 
-   * @param useCombiner True if a combiner should be used, otherwise false.
-   */
-  public void setUseCombiner(boolean useCombiner)
-  {
-    this._useCombiner = useCombiner;
-  }
-
-  /**
-   * Gets the path where counters will be stored.
-   * 
-   * @return Counters path
-   */
-  public Path getCountersParentPath()
-  {
-    return _countersParentPath;
-  }
-
-  /**
-   * Sets the path where counters will be stored.  Can also be set with <em>counters.path</em>.
-   * 
-   * @param countersParentPath Counters path
-   */
-  public void setCountersParentPath(Path countersParentPath)
-  {
-    this._countersParentPath = countersParentPath;
-  }
-
-  /**
-   * Gets the number of days of data which will be retained in the output path.
-   * Only the latest will be kept.  Older paths will be removed.
-   * 
-   * @return retention count
-   */
-  public Integer getRetentionCount()
-  {
-    return _retentionCount;
-  }
-
-  /**
-   * Sets the number of days of data which will be retained in the output path.
-   * Only the latest will be kept.  Older paths will be removed.
-   * Can also be set with <em>retention.count</em>.
-   * 
-   * @param retentionCount number of days of data to retain
-   */
-  public void setRetentionCount(Integer retentionCount)
-  {
-    this._retentionCount = retentionCount;
-  }
-  
-  /**
-   * Gets the input paths.  Multiple input paths imply a join is to be performed.
-   * 
-   * @return input paths
-   */
-  public List<Path> getInputPaths()
-  {
-    return _inputPaths;
-  }
-
-  /**
-   * Sets the input paths.  Multiple input paths imply a join is to be performed.
-   * Can also be set with <em>input.path</em> or several properties starting with
-   * <em>input.path.</em>.
-   * 
-   * @param inputPaths input paths
-   */
-  public void setInputPaths(List<Path> inputPaths)
-  {
-    this._inputPaths = inputPaths;
-  }
-
-  /**
-   * Gets the output path.
-   * 
-   * @return output path
-   */
-  public Path getOutputPath()
-  {
-    return _outputPath;
-  }
-
-  /**
-   * Sets the output path.  Can also be set with <em>output.path</em>.
-   * 
-   * @param outputPath output path
-   */
-  public void setOutputPath(Path outputPath)
-  {
-    this._outputPath = outputPath;
-  }
-  
-  /**
-   * Gets the temporary path under which intermediate files will be stored.  Defaults to /tmp.
-   * 
-   * @return Temporary path
-   */
-  public Path getTempPath()
-  {
-    return _tempPath;
-  }
-
-  /**
-   * Sets the temporary path where intermediate files will be stored.  Defaults to /tmp. 
-   * 
-   * @param tempPath Temporary path
-   */
-  public void setTempPath(Path tempPath)
-  {
-    this._tempPath = tempPath;
-  }
-  
-  /**
-   * Gets the file system.
-   * 
-   * @return File system
-   */
-  protected FileSystem getFileSystem()
-  {
-    if (_fs == null)
-    {
-      try
-      {
-        _fs = FileSystem.get(getConf());
-      }
-      catch (IOException e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
-    return _fs;
-  }
-  
-  /**
-   * Generates a random temporary path within the file system.  This does not create the path.
-   * 
-   * @return Random temporary path
-   */
-  protected Path randomTempPath()
-  {
-    return new Path(_tempPath,String.format("hourglass-%s",UUID.randomUUID()));
-  }
-  
-  /**
-   * Creates a random temporary path within the file system.
-   * 
-   * @return Random temporary path
-   * @throws IOException
-   */
-  protected Path createRandomTempPath() throws IOException
-  {
-    return ensurePath(randomTempPath());
-  }
-  
-  /**
-   * Creates a path, if it does not already exist.
-   * 
-   * @param path Path to create
-   * @return The same path that was provided
-   * @throws IOException
-   */
-  protected Path ensurePath(Path path) throws IOException
-  {
-    if (!getFileSystem().exists(path))
-    {
-      getFileSystem().mkdirs(path);
-    }
-    return path;
-  }
-  
-  /**
-   * Validation required before running job.
-   */
-  protected void validate()
-  {
-    if (_inputPaths == null || _inputPaths.size() == 0) 
-    {
-      throw new IllegalArgumentException("Input path is not specified.");
-    }
-    
-    if (_outputPath == null) 
-    {
-      throw new IllegalArgumentException("Output path is not specified.");
-    }
-  }
-  
-  /**
-   * Initialization required before running job.
-   */
-  protected void initialize()
-  {
-  }
-  
-  /**
-   * Run the job.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  public abstract void run() throws IOException, InterruptedException, ClassNotFoundException;
-  
-  /**
-   * Updates the Hadoop configuration using the provided properties.
-   * 
-   * @param props properties to apply to the configuration
-   */
-  private void updateConfigurationFromProps(Properties props)
-  {
-    Configuration config = getConf();
-    
-    if (config == null)
-    {
-      config = new Configuration();
-    }
-    
-    // to enable unit tests to inject configuration  
-    if (props.containsKey("test.conf"))
-    {
-      try
-      {
-        byte[] decoded = Base64.decodeBase64(props.getProperty("test.conf"));        
-        ByteArrayInputStream byteInput = new ByteArrayInputStream(decoded);
-        DataInputStream inputStream = new DataInputStream(byteInput);
-        config.readFields(inputStream);
-      }
-      catch (IOException e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
-    else
-    {
-      for (String key : props.stringPropertyNames()) 
-      {               
-          String newKey = key;
-          String value = props.getProperty(key);
-          
-          if (key.toLowerCase().startsWith(HADOOP_PREFIX)) {
-            newKey = key.substring(HADOOP_PREFIX.length());
-            config.set(newKey, value);
-          }
-      }      
-    }
-  }   
-}
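
A sketch of driving a job subclass through properties (hypothetical values); each key maps to one of the setters above, and the hadoop-conf. prefix passes a setting straight through to the Hadoop configuration:

    import java.util.Properties;

    public static Properties exampleProperties()
    {
      Properties props = new Properties();
      props.setProperty("input.path", "/data/first,/data/second"); // two paths imply a join
      props.setProperty("output.path", "/data/output");
      props.setProperty("temp.path", "/tmp/hourglass");
      props.setProperty("retention.count", "7");
      props.setProperty("num.reducers", "10");
      props.setProperty("use.combiner", "true");
      props.setProperty("hadoop-conf.mapred.min.split.size", "536870912");
      return props; // passed to the AbstractJob(name, props) constructor
    }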

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
deleted file mode 100644
index 1c09cb4..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.CombinedAvroKeyInputFormat;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for Hadoop jobs that consume time-partitioned data
- * in a non-incremental way.  Typically this is only used for comparing incremental
- * jobs against a non-incremental baseline.
- * It is essentially the same as {@link AbstractPartitionCollapsingIncrementalJob}
- * without all the incremental features.
- * 
- * <p>
- * Jobs extending this class consume input data partitioned according to yyyy/MM/dd.
- * Only a single input path is supported.  The output will be written to a directory
- * in the output path with name format yyyyMMdd derived from the end of the time
- * window that is consumed.
- * </p>
- * 
- * <p>
- * This class has the same configuration and methods as {@link TimeBasedJob}.
- * In addition it also recognizes the following properties:
- * </p>
- * 
- *  <ul>
- *   <li><em>combine.inputs</em> - True if inputs should be combined (defaults to false)</li>
- *   <li><em>num.reducers.bytes.per.reducer</em> - Number of input bytes per reducer</li>
- * </ul>
- * 
- * <p>
- * When <em>combine.inputs</em> is true, then CombinedAvroKeyInputFormat is used
- * instead of AvroKeyInputFormat.  This enables a single map task to consume more than
- * one file.
- * </p>
- * 
- * <p>
- * The <em>num.reducers.bytes.per.reducer</em> property controls the number of reducers to 
- * use based on the input size.  The total size of the input files is divided by this number
- * and then rounded up.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractNonIncrementalJob extends TimeBasedJob
-{
-  private final Logger _log = Logger.getLogger(AbstractNonIncrementalJob.class);
-    
-  private boolean _combineInputs;
-  private Report _report;
-  
-  /**
-   * Initializes the job.
-   * 
-   * @param name job name
-   * @param props configuration properties
-   * @throws IOException
-   */
-  public AbstractNonIncrementalJob(String name, Properties props) throws IOException
-  {        
-    super(name,props);
-    
-    if (props.containsKey("combine.inputs"))
-    {
-      setCombineInputs(Boolean.parseBoolean(props.getProperty("combine.inputs")));
-    }
-  }
-  
-  /**
-   * Gets whether inputs should be combined.
-   * 
-   * @return true if inputs are to be combined
-   */
-  public boolean getCombineInputs()
-  {
-    return _combineInputs;
-  }
-  
-  /**
-   * Sets whether inputs should be combined.
-   * 
-   * @param combineInputs true to combine inputs
-   */
-  public void setCombineInputs(boolean combineInputs)
-  {
-    _combineInputs = combineInputs;
-  }
-  
-  /**
-   * Gets a report summarizing the run.
-   * 
-   * @return report
-   */
-  public Report getReport()
-  {
-    return _report;
-  }
-  
-  /**
-   * Runs the job.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  @Override
-  public void run() throws IOException, InterruptedException, ClassNotFoundException
-  {     
-    _report = new Report();
-    
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);   
-    
-    if (!getFileSystem().exists(getOutputPath()))
-    {
-      getFileSystem().mkdirs(getOutputPath());
-    }
-    
-    if (getInputPaths().size() > 1)
-    {
-      throw new RuntimeException("Only a single input is supported");
-    }
-    
-    List<DatePath> inputs = PathUtils.findNestedDatedPaths(getFileSystem(), getInputPaths().get(0));
-    
-    if (inputs.size() == 0)
-    {
-      throw new RuntimeException("no input data available");
-    }
-    
-    DatePath latestInput = inputs.get(inputs.size() - 1);
-    
-    List<Date> dates = new ArrayList<Date>();
-    for (DatePath dp : inputs)
-    {
-      dates.add(dp.getDate());
-    }
-    
-    DateRange dateRange = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), dates, getDaysAgo(), getNumDays(), true);
-    
-    Map<Date,DatePath> existingInputs = new HashMap<Date,DatePath>();
-    for (DatePath input : inputs)
-    {
-      existingInputs.put(input.getDate(), input);
-    }
-    
-    _log.info("Getting schema for input " + latestInput.getPath());
-    Schema inputSchema = PathUtils.getSchemaFromPath(getFileSystem(),latestInput.getPath());
-    
-    ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProperties());
-    
-    List<String> inputPaths = new ArrayList<String>();
-    for (Date currentDate=dateRange.getBeginDate(); currentDate.compareTo(dateRange.getEndDate()) <= 0; )
-    { 
-      DatePath input = existingInputs.get(currentDate);  
-      if (input != null)
-      { 
-        _log.info(String.format("Processing %s",input.getPath()));
-        inputPaths.add(input.getPath().toString());
-        estimator.addInputPath(input.getPath());
-        _report.inputFiles.add(input);
-        latestInput = input;
-      }
-      else
-      {
-        throw new RuntimeException(String.format("Missing input for %s",PathUtils.datedPathFormat.format(currentDate)));
-      }
-      
-      cal.setTime(currentDate);
-      cal.add(Calendar.DAY_OF_MONTH, 1);
-      currentDate = cal.getTime();
-    }
-        
-    Path timestampOutputPath = new Path(getOutputPath(),PathUtils.datedPathFormat.format(latestInput.getDate()));
-    
-    final StagedOutputJob job = StagedOutputJob.createStagedJob(
-                                          getConf(),
-                                          getName() + "-" + PathUtils.datedPathFormat.format(latestInput.getDate()),            
-                                          inputPaths,
-                                          "/tmp" + timestampOutputPath.toString(),
-                                          timestampOutputPath.toString(),
-                                          _log);
-    
-    
-    job.setCountersParentPath(getCountersParentPath());
-    
-    if (_combineInputs)
-    {
-      job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
-    }
-    else
-    {
-      job.setInputFormatClass(AvroKeyInputFormat.class);
-    }
-    
-    job.setOutputFormatClass(AvroKeyOutputFormat.class);
-    
-    AvroJob.setInputKeySchema(job, inputSchema);
-    AvroJob.setMapOutputKeySchema(job, getMapOutputKeySchema());
-    AvroJob.setMapOutputValueSchema(job, getMapOutputValueSchema());
-    AvroJob.setOutputKeySchema(job, getReduceOutputSchema());
-    
-    int numReducers;
-    if (getNumReducers() != null)
-    {
-      numReducers = getNumReducers();        
-      _log.info(String.format("Using %d reducers (fixed)",numReducers));
-    }
-    else      
-    {         
-      numReducers = estimator.getNumReducers();        
-      _log.info(String.format("Using %d reducers (computed)",numReducers));
-    }
-        
-    job.setNumReduceTasks(numReducers);
-    
-    job.setMapperClass(getMapperClass());
-    job.setReducerClass(getReducerClass());
-    
-    if (isUseCombiner() && getCombinerClass() != null)
-    {
-      job.setCombinerClass(getCombinerClass());
-    }
-    
-    config(job.getConfiguration());
-    
-    if (!job.waitForCompletion(true))
-    {
-      _log.error("Job failed! Quitting...");
-      throw new RuntimeException("Job failed");
-    }
-        
-    _report.jobId = job.getJobID().toString();
-    _report.jobName = job.getJobName();
-    _report.countersPath = job.getCountersPath();
-    _report.outputFile = new DatePath(latestInput.getDate(),timestampOutputPath);
-    
-    if (getRetentionCount() != null)
-    {
-      PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
-    }   
-  }
-  
-  /**
-   * Gets the key schema for the map output.
-   * 
-   * @return map output key schema
-   */
-  protected abstract Schema getMapOutputKeySchema();
-  
-  /**
-   * Gets the value schema for the map output.
-   * 
-   * @return map output value schema
-   */
-  protected abstract Schema getMapOutputValueSchema();
-  
-  /**
-   * Gets the reduce output schema.
-   * 
-   * @return reduce output schema
-   */
-  protected abstract Schema getReduceOutputSchema();
-  
-  /**
-   * Gets the mapper class.
-   * 
-   * @return the mapper
-   */
-  public abstract Class<? extends BaseMapper> getMapperClass();
- 
-  /**
-   * Gets the reducer class.
-   * 
-   * @return the reducer
-   */
-  public abstract Class<? extends BaseReducer> getReducerClass();
-  
-  /**
-   * Gets the combiner class.
-   * 
-   * @return the combiner
-   */
-  public Class<? extends BaseCombiner> getCombinerClass()
-  {
-    return null;
-  }  
-  
-  /**
-   * Mapper base class for {@link AbstractNonIncrementalJob}.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static abstract class BaseMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>>
-  {    
-  }
-  
-  /**
-   * Combiner base class for {@link AbstractNonIncrementalJob}.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static abstract class BaseCombiner extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, AvroValue<GenericRecord>>
-  {    
-  }
-  
-  /**
-   * Reducer base class for {@link AbstractNonIncrementalJob}.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static abstract class BaseReducer extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable>
-  {    
-  }
-  
-  /**
-   * Reports files created and processed for an iteration of the job.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static class Report
-  {
-    private String jobName;
-    private String jobId;
-    private Path countersPath;
-    private List<DatePath> inputFiles = new ArrayList<DatePath>();
-    private DatePath outputFile;
-    
-    /**
-     * Gets the job name.
-     * 
-     * @return job name
-     */
-    public String getJobName()
-    {
-      return jobName;
-    }
-    
-    /**
-     * Gets the job ID.
-     * 
-     * @return job ID
-     */
-    public String getJobId()    
-    {
-      return jobId;
-    }
-    
-    /**
-     * Gets the path to the counters file, if one was written.
-     * 
-     * @return counters path
-     */
-    public Path getCountersPath()
-    {
-      return countersPath;
-    }
-    
-    /**
-     * Gets input files that were processed.  These are files that are within
-     * the desired date range.
-     * 
-     * @return input files
-     */
-    public List<DatePath> getInputFiles()
-    {
-      return Collections.unmodifiableList(inputFiles);
-    }
-    
-    /**
-     * Gets the output file that was produced by the job.
-     * 
-     * @return output file
-     */
-    public DatePath getOutputFile()
-    {
-      return outputFile;
-    }
-  }
-}
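
A skeleton of what a concrete subclass must provide (hypothetical class names and schemas; the actual map() and reduce() bodies are omitted):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;

    public class ExampleNonIncrementalJob extends AbstractNonIncrementalJob
    {
      public ExampleNonIncrementalJob(String name, Properties props) throws IOException
      {
        super(name, props);
      }

      @Override
      protected Schema getMapOutputKeySchema() { return Schema.create(Schema.Type.STRING); }

      @Override
      protected Schema getMapOutputValueSchema() { return Schema.create(Schema.Type.LONG); }

      @Override
      protected Schema getReduceOutputSchema()
      {
        return new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Output\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}");
      }

      @Override
      public Class<? extends BaseMapper> getMapperClass() { return ExampleMapper.class; }

      @Override
      public Class<? extends BaseReducer> getReducerClass() { return ExampleReducer.class; }

      // Real implementations would override map() and reduce() here.
      public static class ExampleMapper extends BaseMapper {}
      public static class ExampleReducer extends BaseReducer {}
    }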

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
deleted file mode 100644
index b01eaad..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
+++ /dev/null
@@ -1,709 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.AvroDateRangeMetadata;
-import datafu.hourglass.avro.AvroKeyWithMetadataOutputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsUtil;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.mapreduce.AvroKeyValueIdentityMapper;
-import datafu.hourglass.mapreduce.CollapsingCombiner;
-import datafu.hourglass.mapreduce.CollapsingMapper;
-import datafu.hourglass.mapreduce.CollapsingReducer;
-import datafu.hourglass.mapreduce.DelegatingCombiner;
-import datafu.hourglass.mapreduce.DelegatingMapper;
-import datafu.hourglass.mapreduce.DelegatingReducer;
-import datafu.hourglass.mapreduce.DistributedCacheHelper;
-import datafu.hourglass.mapreduce.Parameters;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.model.Merger;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * An {@link IncrementalJob} that consumes partitioned input data and collapses the
- * partitions to produce a single output.  This job can be used to process data
- * using a sliding window.  It is capable of reusing the previous output, which
- * means that it can process data more efficiently.
- * Only Avro is supported for the input, intermediate, and output data.
- * 
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * These are defined by overriding {@link #getKeySchema()}, {@link #getIntermediateValueSchema()},
- * and {@link #getOutputValueSchema()}.
- * </p>
- * 
- * <p>
- * Implementations must also provide a mapper by overriding {@link #getMapper()} and an accumulator
- * for the reducer by overriding {@link #getReducerAccumulator()}.  An optional combiner may be
- * provided by overriding {@link #getCombinerAccumulator()}.  For the combiner to be used
- * the property <em>use.combiner</em> must also be set to true.
- * </p>
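- * 
- * <p>
- * As an illustration only, below is a minimal sketch of a concrete subclass that counts records
- * per member.  The schemas, field names, and counting logic are hypothetical and not part of the
- * API; imports are omitted.
- * </p>
- * 
- * <pre>
- * public class ExampleCountJob extends AbstractPartitionCollapsingIncrementalJob
- * {
- *   // Hypothetical key and value schemas; the value doubles as intermediate and output value.
- *   static final Schema KEY = new Schema.Parser().parse(
- *     "{\"type\":\"record\",\"name\":\"Key\",\"fields\":[{\"name\":\"member_id\",\"type\":\"long\"}]}");
- *   static final Schema VALUE = new Schema.Parser().parse(
- *     "{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}");
- * 
- *   public ExampleCountJob(String name, Properties props) throws IOException
- *   {
- *     super(name, props);
- *   }
- * 
- *   protected Schema getKeySchema()               { return KEY; }
- *   protected Schema getIntermediateValueSchema() { return VALUE; }
- *   protected Schema getOutputValueSchema()       { return VALUE; }
- * 
- *   public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
- *   {
- *     return new CountMapper();
- *   }
- * 
- *   public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
- *   {
- *     return new CountAccumulator();
- *   }
- * 
- *   // Mappers and accumulators are serialized, so static nested classes are used here.
- *   static class CountMapper implements Mapper<GenericRecord,GenericRecord,GenericRecord>
- *   {
- *     public void map(GenericRecord input, KeyValueCollector<GenericRecord,GenericRecord> collector)
- *       throws IOException, InterruptedException
- *     {
- *       // Emit a count of one for each record, keyed by member_id
- *       GenericRecord key = new GenericData.Record(KEY);
- *       key.put("member_id", input.get("member_id"));
- *       GenericRecord value = new GenericData.Record(VALUE);
- *       value.put("count", 1L);
- *       collector.collect(key, value);
- *     }
- *   }
- * 
- *   static class CountAccumulator implements Accumulator<GenericRecord,GenericRecord>
- *   {
- *     private transient long count;
- * 
- *     public void accumulate(GenericRecord value)
- *     {
- *       count += (Long)value.get("count");  // sum the partial counts
- *     }
- * 
- *     public GenericRecord getFinal()
- *     {
- *       GenericRecord result = new GenericData.Record(VALUE);
- *       result.put("count", count);
- *       return result;
- *     }
- * 
- *     public void cleanup()
- *     {
- *       count = 0L;
- *     }
- *   }
- * }
- * </pre>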
- * 
- * <p>
- * The input path can be provided either through the property <em>input.path</em>
- * or by calling {@link #setInputPaths(List)}.  If multiple input paths are provided then
- * this implicitly means a join is to be performed.  Multiple input paths can be provided via
- * properties by prefixing each with <em>input.path.</em>, such as <em>input.path.first</em>
- * and <em>input.path.second</em>.
- * Input data must be partitioned by day according to the naming convention yyyy/MM/dd.
- * The output path can be provided either through the property <em>output.path</em> 
- * or by calling {@link #setOutputPath(Path)}.
- * Output data will be written using the naming convention yyyyMMdd, where the date used
- * to format the output path is the same as the end of the desired time range to process.
- * For example, if the desired time range to process is 2013/01/01 through 2013/01/14,
- * then the output will be named 20130114. 
- * By default the job will fail if any input data in the desired time window is missing.  This can be overridden by setting
- * <em>fail.on.missing</em> to false.
- * </p>
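- * 
- * <p>
- * For example, a join over two inputs might be configured through properties such as the
- * following (a sketch; the paths are illustrative):
- * </p>
- * 
- * <pre>
- * input.path.first=/data/event_a
- * input.path.second=/data/event_b
- * output.path=/output/joined
- * fail.on.missing=false
- * </pre>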
- * 
- * <p>
- * The job will not process input if the corresponding output has already been produced.  For example, if the desired date
- * range is 2013/01/01 through 2013/01/14 and the output 20130114 already exists, then it assumes the work has already
- * been completed.
- * </p>
- * 
- * <p>
- * By default only the latest output will be kept.  All other outputs will be removed.  This can be controlled
- * by setting the property <em>retention.count</em>, or by calling {@link #setRetentionCount(Integer)}.
- * </p>
- * 
- * <p>
- * Two types of sliding windows may be used: <em>fixed-length</em> and <em>fixed-start</em>.  For a fixed-length
- * sliding window, the size of the window is fixed; the start and end move according to the
- * availability of input data.  For a fixed-start window, the size of the window is flexible;
- * the start is fixed and the end moves according to the availability of input data.
- * </p>
- * 
- * <p>
- * A fixed-length sliding window can be defined either by setting the property <em>num.days</em>
- * or by calling {@link #setNumDays(Integer)}.  This sets how many days of input data will be
- * consumed.  By default the end of the window will be the same as the date of the latest available
- * input data.  The start is then determined by the number of days to consume.  The end date can
- * be moved back relative to the latest input data by setting the <em>days.ago</em> property or
- * by calling {@link #setDaysAgo(Integer)}.  Since the end date is determined by the availability
- * of input data, as new data arrives the window will advance forward.
- * </p>
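- * 
- * <p>
- * For example, a 30 day fixed-length window ending one day before the latest available input
- * could be configured as follows (illustrative values):
- * </p>
- * 
- * <pre>
- * num.days=30
- * days.ago=1
- * </pre>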
- * 
- * <p>
- * A fixed-start sliding window can be defined by setting the property <em>start.date</em> or
- * by calling {@link #setStartDate(java.util.Date)}.  The end date will be the same as the date of
- * the latest available input data.  The end date can
- * be moved back relative to the latest input data by setting the <em>days.ago</em> property or
- * by calling {@link #setDaysAgo(Integer)}.
- * Because the end date is determined by the availability of input data, as new data arrives the window 
- * will grow to include it.
- * </p>
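- * 
- * <p>
- * For example, a fixed-start window beginning 2013/01/01 could be configured in code as follows
- * (a sketch using the setters described above; parsing the date throws a checked ParseException):
- * </p>
- * 
- * <pre>
- * SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
- * job.setStartDate(format.parse("2013/01/01"));
- * job.setDaysAgo(1);
- * </pre>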
- * 
- * <p>
- * Previous output can be reused by setting the <em>reuse.previous.output</em> property to true, or
- * by calling {@link #setReusePreviousOutput(boolean)}.  Reusing the previous output is often more efficient
- * because only input data outside of the time window covered by the previous output needs to be consumed.
- * For example, given a fixed-start sliding window job, if one new day of input data is available since the
- * last time the job ran, then the job can reuse the previous output and only read the newest day of data, rather
- * than reading all the input data again.  Given a fixed-length sliding window in the same scenario, the new output
- * can be produced by adding the newest input to the previous output and subtracting the oldest input from the old
- * window.
- * </p>
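- * 
- * <p>
- * As an illustration: with a fixed-length window of fourteen days and previous output 20130113
- * covering 2012/12/31 through 2013/01/13, the output 20130114 can be produced by reading only the
- * previous output, the new day 2013/01/14, and the day 2012/12/31 that slides out of the window,
- * rather than all fourteen days of input.  This behavior is enabled with:
- * </p>
- * 
- * <pre>
- * reuse.previous.output=true
- * </pre>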
- * 
- * <p>
- * For a fixed-start sliding window, if the schemas for the intermediate and output values are the same then no additional
- * changes are necessary, as the reducer's accumulator should be capable of adding the new input to the previous output.
- * However if they are different then a record merger must be defined by overriding {@link #getRecordMerger()} so that the previous
- * output can be merged with the partial output produced by reducing the new input data.
- * For the fixed-length sliding window one must override {@link #getOldRecordMerger()} to reuse the previous output.
- * This method essentially unmerges old, partial output data from the current output.  For this case as well, if the intermediate
- * and output schemas are the same, the {@link #getRecordMerger()} method does not need to be overridden.
- * </p>
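- * 
- * <p>
- * Continuing the hypothetical count example above, a sketch of a merger, assuming
- * {@link Merger} exposes a single merge method over a previous and a current record:
- * </p>
- * 
- * <pre>
- * static class CountMerger implements Merger<GenericRecord>
- * {
- *   public GenericRecord merge(GenericRecord previous, GenericRecord current)
- *   {
- *     // add the new partial count to the count from the previous output
- *     GenericRecord result = new GenericData.Record(VALUE);
- *     result.put("count", (Long)previous.get("count") + (Long)current.get("count"));
- *     return result;
- *   }
- * }
- * </pre>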
- * 
- * <p>
- * The number of reducers to use is automatically determined based on the size of the data to process.  
- * The total size is computed and then divided by the value of the property <em>num.reducers.bytes.per.reducer</em>, which
- * defaults to 256 MB; the result is the number of reducers that will be used.  This calculation includes
- * the input data as well as previous output that will be reused.  It is also possible to calculate the number of reducers
- * separately for the input and previous output through the properties <em>num.reducers.input.bytes.per.reducer</em>
- * and <em>num.reducers.previous.bytes.per.reducer</em>.  The reducers will be computed separately for the two sets of data
- * and then added together.  The number of reducers can also be set to a fixed value through the property <em>num.reducers</em>.
- * </p>
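- * 
- * <p>
- * For example, with the default of 256 MB per reducer, 10 GB of input and reused output
- * yields 10240 MB / 256 MB = 40 reducers.
- * </p>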
- * 
- * <p>
- * This type of job is capable of performing its work over multiple iterations if previous output can be reused.  
- * The number of days to process at a time can be limited by setting the property <em>max.days.to.process</em>,
- * or by calling {@link #setMaxToProcess(Integer)}.  The default is 90 days.  
- * This can be useful when there are restrictions on how many tasks 
- * can be used by a single MapReduce job in the cluster.  When this property is set, the job will process no more than
- * this many days at a time, and it will perform one or more iterations if necessary to complete the work.
- * The number of iterations can be limited by setting the property <em>max.iterations</em>, or by calling {@link #setMaxIterations(Integer)}.
- * If the number of iterations is exceeded the job will fail.  By default the maximum number of iterations is 20.
- * </p>
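- * 
- * <p>
- * For example, to process at most 30 days per MapReduce job and allow up to 10 iterations
- * (illustrative values):
- * </p>
- * 
- * <pre>
- * max.days.to.process=30
- * max.iterations=10
- * </pre>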
- *  
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value. 
- * </p>
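- * 
- * <p>
- * For example, to set a 512 MB minimum split size (an illustrative value):
- * </p>
- * 
- * <pre>
- * hadoop-conf.mapred.min.split.size=536870912
- * </pre>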
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractPartitionCollapsingIncrementalJob extends IncrementalJob
-{
-  private final Logger _log = Logger.getLogger(AbstractPartitionCollapsingIncrementalJob.class);
-  
-  private List<Report> _reports = new ArrayList<Report>();
-  protected boolean _reusePreviousOutput;
-  private FileCleaner _garbage;
-  
-  /**
-   * Initializes the job.
-   */
-  public AbstractPartitionCollapsingIncrementalJob() throws IOException
-  {    
-  }
-  
-  /**
-   * Initializes the job with a job name and properties.
-   * 
-   * @param name job name
-   * @param props configuration properties
-   */
-  public AbstractPartitionCollapsingIncrementalJob(String name, Properties props) throws IOException
-  { 
-    super(name,props);
-  }
-  
-  /**
-   * Gets the mapper.
-   * 
-   * @return mapper
-   */
-  public abstract Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper();
-  
-  /**
-   * Gets the accumulator used for the combiner.
-   * 
-   * @return combiner accumulator
-   */
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return null;
-  }
-  
-  /**
-   * Gets the accumulator used for the reducer.
-   * 
-   * @return reducer accumulator
-   */
-  public abstract Accumulator<GenericRecord,GenericRecord> getReducerAccumulator(); 
-  
-  /**
-   * Gets the record merger that is capable of merging previous output with a new partial output.
-   * This is only needed when reusing previous output where the intermediate and output schemas are different.
-   * New partial output is produced by the reducer from new input data that falls after the time range of the previous output.
-   * 
-   * @return merger
-   */
-  public Merger<GenericRecord> getRecordMerger()
-  {
-    return null;
-  }
-
-  /**
-   * Gets the record merger that is capable of unmerging old partial output from the new output.
-   * This is only needed when reusing previous output for a fixed-length sliding window.
-   * The new output is the result of merging the previous output with the new partial output.
-   * The old partial output is produced by the reducer from old input data before the time range of
-   * the previous output. 
-   * 
-   * @return merger
-   */
-  public Merger<GenericRecord> getOldRecordMerger()
-  {
-    return null;
-  }
-  
-  /**
-   * Get the name for the reduce output schema. 
-   * By default this is the name of the class with "Output" appended.
-   * 
-   * @return output schema name
-   */
-  protected String getOutputSchemaName()
-  {
-    return this.getClass().getSimpleName() + "Output";
-  }
-  
-  /**
-   * Get the namespace for the reduce output schema.
-   * By default this is the package of the class.
-   * 
-   * @return output schema namespace
-   */
-  protected String getOutputSchemaNamespace()
-  {
-    return this.getClass().getPackage().getName();
-  }
-  
-  @Override
-  public void setProperties(Properties props)
-  {
-    super.setProperties(props);
-    
-    if (getProperties().get("reuse.previous.output") != null)
-    {
-      setReusePreviousOutput(Boolean.parseBoolean((String)getProperties().get("reuse.previous.output")));
-    }
-  }
-  
-  /**
-   * Get whether previous output should be reused.
-   * 
-   * @return true if previous output should be reused
-   */
-  public boolean getReusePreviousOutput()
-  {
-    return _reusePreviousOutput;
-  }
-  
-  /**
-   * Set whether previous output should be reused.
-   * 
-   * @param reuse true if previous output should be reused
-   */
-  public void setReusePreviousOutput(boolean reuse)
-  {
-    _reusePreviousOutput = reuse;
-  }
-  
-  @Override
-  protected void initialize()
-  { 
-    _garbage = new FileCleaner(getFileSystem());
-    
-    if (getMaxIterations() == null)
-    {
-      setMaxIterations(20);
-    }
-    
-    if (getMaxToProcess() == null)
-    {
-      if (getNumDays() != null)
-      {
-        setMaxToProcess(getNumDays());
-      }
-      else
-      {
-        setMaxToProcess(90);
-      }
-    }
-    
-    if (getRetentionCount() == null)
-    {
-      setRetentionCount(1);
-    }
-    
-    super.initialize();
-  }
-  
-  @Override
-  public void run() throws IOException, InterruptedException, ClassNotFoundException
-  {
-    try
-    {
-      initialize();
-      validate();
-      execute();
-    }
-    finally
-    {
-      cleanup();
-    }
-  }  
-  
-  /**
-   * Get reports that summarize each of the job iterations.
-   * 
-   * @return reports
-   */
-  public List<Report> getReports()
-  {
-    return Collections.unmodifiableList(_reports);
-  }
-  
-  /**
-   * Execute the job.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  private void execute() throws IOException, InterruptedException, ClassNotFoundException
-  { 
-    int iterations = 0;
-    
-    while (true)
-    {
-      PartitionCollapsingExecutionPlanner planner = new PartitionCollapsingExecutionPlanner(getFileSystem(),getProperties());
-      planner.setInputPaths(getInputPaths());
-      planner.setOutputPath(getOutputPath());
-      planner.setStartDate(getStartDate());
-      planner.setEndDate(getEndDate());
-      planner.setDaysAgo(getDaysAgo());
-      planner.setNumDays(getNumDays());
-      planner.setMaxToProcess(getMaxToProcess());
-      planner.setReusePreviousOutput(getReusePreviousOutput());
-      planner.setFailOnMissing(isFailOnMissing());
-      planner.createPlan();      
-      
-      if (planner.getInputsToProcess().size() == 0)
-      {
-        _log.info("Nothing to do");
-        break;
-      }
-      
-      if (iterations >= getMaxIterations())
-      {
-        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
-                                                 iterations,
-                                                 getMaxIterations(),
-                                                 planner.getInputsToProcess().size()));
-      }
-      
-      Report report = new Report();
-      
-      report.inputFiles.addAll(planner.getNewInputsToProcess());
-      report.oldInputFiles.addAll(planner.getOldInputsToProcess());
-      if (planner.getPreviousOutputToProcess() != null)
-      {
-        report.reusedOutput = planner.getPreviousOutputToProcess();
-      }
-      
-      DatePath outputPath = DatePath.createDatedPath(getOutputPath(), planner.getCurrentDateRange().getEndDate());
-      
-      _log.info("Output path: " + outputPath);
-      
-      Path tempOutputPath = createRandomTempPath();  
-      
-      _garbage.add(tempOutputPath);
-            
-      final StagedOutputJob job = StagedOutputJob.createStagedJob(
-          getConf(),
-          getName() + "-" + PathUtils.datedPathFormat.format(planner.getCurrentDateRange().getEndDate()),            
-          null, // no input paths specified here, will add multiple inputs down below
-          tempOutputPath.toString(),
-          outputPath.getPath().toString(),
-          _log);
-      
-      job.setCountersParentPath(getCountersParentPath());
-
-      if (planner.getNewInputsToProcess() != null && planner.getNewInputsToProcess().size() > 0)
-      {
-        _log.info("*** New Input data:");
-        for (DatePath inputPath : planner.getNewInputsToProcess())
-        {
-          _log.info(inputPath.getPath());
-          MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
-        }
-      }
-      
-      if (planner.getOldInputsToProcess() != null && planner.getOldInputsToProcess().size() > 0)
-      {
-        _log.info("*** Old Input data:");
-        for (DatePath inputPath : planner.getOldInputsToProcess())
-        {
-          _log.info(inputPath.getPath());
-          MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
-        }
-      }
-      
-      if (planner.getPreviousOutputToProcess() != null)
-      {
-        _log.info("*** Previous output data:");
-        _log.info(planner.getPreviousOutputToProcess().getPath());
-        MultipleInputs.addInputPath(job, planner.getPreviousOutputToProcess().getPath(), AvroKeyInputFormat.class, AvroKeyValueIdentityMapper.class);
-      }
-      
-      final Configuration conf = job.getConfiguration();
-      
-      config(conf);
-      
-      AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());
-                  
-      PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());
-      
-      job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);
-      
-      _log.info("Setting input path to schema mappings");
-      for (String path : spSchemas.getMapInputSchemas().keySet())
-      {
-        Schema schema = spSchemas.getMapInputSchemas().get(path);
-        _log.info("*** " + path);
-        _log.info("*** => " + schema.toString());
-        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
-      }
-      
-      AvroJob.setMapOutputKeySchema(job, spSchemas.getMapOutputKeySchema());
-      AvroJob.setMapOutputValueSchema(job, spSchemas.getMapOutputValueSchema());
-      AvroJob.setOutputKeySchema(job, spSchemas.getReduceOutputSchema());
-                    
-      int numReducers;
-            
-      if (getNumReducers() != null)
-      {
-        numReducers = getNumReducers();        
-        _log.info(String.format("Using %d reducers (fixed)",numReducers));
-      }
-      else      
-      {         
-        numReducers = planner.getNumReducers();      
-        _log.info(String.format("Using %d reducers (computed)",numReducers));
-      }
-          
-      job.setNumReduceTasks(numReducers);
-      
-      job.setReducerClass(DelegatingReducer.class);
-      
-      Path mapperPath = new Path(tempOutputPath,".mapper_impl");
-      Path reducerPath = new Path(tempOutputPath,".reducer_impl");
-      Path combinerPath = new Path(tempOutputPath,".combiner_impl");
-      
-      CollapsingMapper mapper = new CollapsingMapper();
-      CollapsingReducer reducer = new CollapsingReducer();
-      
-      mapper.setSchemas(spSchemas);
-      reducer.setSchemas(spSchemas);
-      
-      mapper.setMapper(getMapper());
-      reducer.setAccumulator(getReducerAccumulator());
-      reducer.setRecordMerger(getRecordMerger());
-      reducer.setOldRecordMerger(getOldRecordMerger());
-
-      mapper.setReuseOutput(_reusePreviousOutput);
-      reducer.setReuseOutput(_reusePreviousOutput);
-      
-      configureOutputDateRange(job.getConfiguration(),planner.getCurrentDateRange(), reducer);
-      
-      DistributedCacheHelper.writeObject(conf, mapper, mapperPath);
-      DistributedCacheHelper.writeObject(conf, reducer, reducerPath);
-      
-      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
-      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
-      
-      if (isUseCombiner())
-      {
-        CollapsingCombiner combiner = new CollapsingCombiner();
-        configureOutputDateRange(job.getConfiguration(),planner.getCurrentDateRange(), combiner);
-        combiner.setReuseOutput(_reusePreviousOutput);
-        combiner.setSchemas(spSchemas);
-        combiner.setAccumulator(getCombinerAccumulator());
-        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
-        job.setCombinerClass(DelegatingCombiner.class);
-        DistributedCacheHelper.writeObject(conf, combiner, combinerPath);
-      }
-      
-      if (!job.waitForCompletion(true))
-      {
-        _log.error("Job failed! Quitting...");
-        throw new RuntimeException("Job failed");
-      }
-      
-      report.jobId = job.getJobID().toString();
-      report.jobName = job.getJobName();
-      report.countersPath = job.getCountersPath();
-      report.outputPath = outputPath;
-      
-      _reports.add(report);
-      
-      applyRetention();
-      
-      if (!planner.getNeedsAnotherPass())
-      {
-        break;
-      }
-      
-      cleanup();
-      
-      iterations++;
-    }
-  }
-  
-  /**
-   * Removes all but the most recent outputs, up to the retention count, if one is specified.
-   * 
-   * @throws IOException
-   */
-  private void applyRetention() throws IOException
-  {
-    if (getRetentionCount() != null)
-    {
-      PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
-    }
-  }
-  
-  /**
-   * Configures the output date range for processing components. 
-   * 
-   * @param conf configuration
-   * @param dateRange output date range
-   * @param proc processor
-   */
-  private static void configureOutputDateRange(Configuration conf, DateRange dateRange, DateRangeConfigurable proc)
-  {
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-    long beginTime = 0L;
-    long endTime = Long.MAX_VALUE;
-    
-    if (dateRange.getBeginDate() != null)
-    { 
-      cal.setTime(dateRange.getBeginDate());
-      beginTime = cal.getTimeInMillis();
-    }
-    
-    if (dateRange.getEndDate() != null)
-    {
-      cal.setTime(dateRange.getEndDate());
-      endTime = cal.getTimeInMillis();
-    }
-    
-    proc.setOutputDateRange(new DateRange(new Date(beginTime),new Date(endTime)));
-  }
-  
-  /**
-   * Remove all temporary paths. 
-   * 
-   * @throws IOException
-   */
-  private void cleanup() throws IOException
-  {
-    if (_garbage != null)
-    {
-      _garbage.clean();
-    }
-  }
-    
-  /**
-   * Reports files created and processed for an iteration of the job.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static class Report
-  {
-    private String jobName;
-    private String jobId;
-    private Path countersPath;
-    private DatePath outputPath;
-    private List<DatePath> inputFiles = new ArrayList<DatePath>();
-    private List<DatePath> oldInputFiles = new ArrayList<DatePath>();
-    private DatePath reusedOutput;
-    
-    /**
-     * Gets the job name.
-     * 
-     * @return job name
-     */
-    public String getJobName()
-    {
-      return jobName;
-    }
-    
-    /**
-     * Gets the job ID.
-     * 
-     * @return job ID
-     */
-    public String getJobId()    
-    {
-      return jobId;
-    }
-    
-    /**
-     * Gets the path to the counters file, if one was written.
-     * 
-     * @return counters path
-     */
-    public Path getCountersPath()
-    {
-      return countersPath;
-    }
-    
-    /**
-     * Gets the path to the output which was produced by the job. 
-     * 
-     * @return output path
-     */
-    public DatePath getOutputPath()
-    {
-      return outputPath;
-    }
-    
-    /**
-     * Gets the previous output that was reused, if any.
-     * 
-     * @return reused output path
-     */
-    public DatePath getReusedOutput()
-    {
-      return reusedOutput;
-    }
-    
-    /**
-     * Gets new input files that were processed.  These are files that are within
-     * the desired date range.
-     * 
-     * @return input files
-     */
-    public List<DatePath> getInputFiles()
-    {
-      return Collections.unmodifiableList(inputFiles);
-    }
-    
-    /**
-     * Gets old input files that were processed.  These are files that are before
-     * the desired date range and were subtracted from the reused output.
-     * 
-     * @return old input files
-     */
-    public List<DatePath> getOldInputFiles()
-    {
-      return Collections.unmodifiableList(oldInputFiles);
-    }
-  }
-}