Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:50 UTC

[12/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
deleted file mode 100644
index 74a8bc2..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsUtil;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.mapreduce.DelegatingCombiner;
-import datafu.hourglass.mapreduce.DelegatingMapper;
-import datafu.hourglass.mapreduce.DelegatingReducer;
-import datafu.hourglass.mapreduce.DistributedCacheHelper;
-import datafu.hourglass.mapreduce.ObjectMapper;
-import datafu.hourglass.mapreduce.ObjectReducer;
-import datafu.hourglass.mapreduce.Parameters;
-import datafu.hourglass.mapreduce.PartitioningCombiner;
-import datafu.hourglass.mapreduce.PartitioningMapper;
-import datafu.hourglass.mapreduce.PartitioningReducer;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * An {@link IncrementalJob} that consumes partitioned input data and produces
- * output data having the same partitions.
- * Typically this is used in conjunction with {@link AbstractPartitionCollapsingIncrementalJob}
- * when computing aggregates over sliding windows.  A partition-preserving job can perform
- * initial aggregation per-day, which can then be consumed by a partition-collapsing job to
- * produce the final aggregates over the time window. 
- * Only Avro is supported for the input, intermediate, and output data.
- * 
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * These are defined by overriding {@link #getKeySchema()}, {@link #getIntermediateValueSchema()},
- * and {@link #getOutputValueSchema()}.
- * </p>
- * 
- * <p>
- * Implementations must also provide a mapper by overriding {@link #getMapper()} and an accumulator
- * for the reducer by overriding {@link #getReducerAccumulator()}.  An optional combiner may be
- * provided by overriding {@link #getCombinerAccumulator()}.  For the combiner to be used
- * the property <em>use.combiner</em> must also be set to true.
- * </p>
- * 
- * <p>
- * The distinguishing feature of this type of job is that the input partitioning is preserved in the output.
- * The data from each partition is processed independently of other partitions and then output separately.
- * For example, input that is partitioned by day can be aggregated by day and then output by day.
- * This is achieved by attaching a long value to each key, which represents the partition, so that the reducer
- * receives data grouped by the key and partition together.  Multiple outputs are then used so that the output
- * will have the same partitions as the input.
- * </p>
- * 
- * <p>
- * The input path can be provided either through the property <em>input.path</em>
- * or by calling {@link #setInputPaths(List)}.  If multiple input paths are provided then
- * this implicitly means a join is to be performed.  Multiple input paths can be provided via
- * properties by prefixing each with <em>input.path.</em>, such as <em>input.path.first</em>
- * and <em>input.path.second</em>.
- * Input data must be partitioned by day according to the naming convention yyyy/MM/dd.
- * The output path can be provided either through the property <em>output.path</em> 
- * or by calling {@link #setOutputPath(Path)}.
- * Output data will be written using the same naming convention as the input, namely yyyy/MM/dd, where the date used
- * to format the output path is the same as the date of the input it was derived from.
- * For example, if the desired time range to process is 2013/01/01 through 2013/01/14,
- * then the output will be named 2013/01/01 through 2013/01/14. 
- * By default the job will fail if any input data in the desired time window is missing.  This can be overridden by setting
- * <em>fail.on.missing</em> to false.
- * </p>
- * 
- * <p>
- * The job will not process input for which a corresponding output already exists.  For example, if the desired date
- * range is 2013/01/01 through 2013/01/14 and the outputs 2013/01/01 through 2013/01/12 exist, then only
- * 2013/01/13 and 2013/01/14 will be processed and only 2013/01/13 and 2013/01/14 will be produced.  
- * </p>
- * 
- * <p>
- * The number of paths in the output to retain can be configured through the property <em>retention.count</em>, 
- * or by calling {@link #setRetentionCount(Integer)}.  When this property is set only the latest paths in the output
- * will be kept; the remainder will be removed.  By default there is no retention count set so all output paths are kept.
- * </p>
- * 
- * <p>
- * The inputs to process can be controlled by defining a desired date range.  By default the job will process all input
- * data available.  To limit the number of days of input to process one can set the property <em>num.days</em>
- * or call {@link #setNumDays(Integer)}.  This would define a processing window with the same number of days,
- * where the end date of the window is the latest available input and the start date is <em>num.days</em> ago.  
- * Only inputs within this window would be processed.
- * Because the end date is the same as the latest available input, as new input data becomes available the end of the
- * window will advance forward to include it.  The end date can be adjusted backwards relative to the latest input
- * through the property <em>days.ago</em>, or by calling {@link #setDaysAgo(Integer)}.  This subtracts as many days
- * from the latest available input date to determine the end date.  The start date or end date can also be fixed
- * by setting the properties <em>start.date</em> or <em>end.date</em>, or by calling {@link #setStartDate(Date)}
- * or {@link #setEndDate(Date)}. 
- * </p>
- * 
- * <p>
- * The number of reducers to use is automatically determined based on the size of the data to process.  
- * The total size is computed and then divided by the value of the property <em>num.reducers.bytes.per.reducer</em>, which
- * defaults to 256 MB.  The resulting quotient is the number of reducers that will be used.  
- * The number of reducers can also be set to a fixed value through the property <em>num.reducers</em>.
- * </p>
- * 
- * <p>
- * This type of job is capable of performing its work over multiple iterations.  
- * The number of days to process at a time can be limited by setting the property <em>max.days.to.process</em>,
- * or by calling {@link #setMaxToProcess(Integer)}.  The default is 90 days.  
- * This can be useful when there are restrictions on how many tasks 
- * can be used by a single MapReduce job in the cluster.  When this property is set, the job will process no more than
- * this many days at a time, and it will perform one or more iterations if necessary to complete the work.
- * The number of iterations can be limited by setting the property <em>max.iterations</em>, or by calling {@link #setMaxIterations(Integer)}.
- * If the number of iterations is exceeded the job will fail.  By default the maximum number of iterations is 20.
- * </p>
- * 
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value. 
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractPartitionPreservingIncrementalJob extends IncrementalJob
-{
-  private final Logger _log = Logger.getLogger(AbstractPartitionPreservingIncrementalJob.class);
-  
-  private List<Report> _reports = new ArrayList<Report>();
-  private PartitioningMapper _mapper;
-  private PartitioningCombiner _combiner;
-  private PartitioningReducer _reducer;
-  private FileCleaner _garbage;
-  
-  /**
-   * Initializes the job.
-   */
-  public AbstractPartitionPreservingIncrementalJob() throws IOException
-  {     
-  }
-  
-  /**
-   * Initializes the job with a job name and properties.
-   * 
-   * @param name job name
-   * @param props configuration properties
-   */
-  public AbstractPartitionPreservingIncrementalJob(String name, Properties props) throws IOException
-  { 
-    super(name,props);
-  }
-  
-  /**
-   * Gets the mapper.
-   * 
-   * @return mapper
-   */
-  public abstract Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper();
-  
-  /**
-   * Gets the accumulator used for the combiner.
-   * 
-   * @return combiner accumulator
-   */
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return null;
-  }
-  
-  /**
-   * Gets the accumulator used for the reducer.
-   * 
-   * @return reducer accumulator
-   */
-  public abstract Accumulator<GenericRecord,GenericRecord> getReducerAccumulator();
-  
-  /**
-   * Run the job.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  @Override
-  public void run() throws IOException, InterruptedException, ClassNotFoundException
-  {     
-    try
-    {
-      initialize();
-      validate();
-      execute();
-    }
-    finally
-    {
-      cleanup();
-    }
-  }
-  
-  /**
-   * Get reports that summarize each of the job iterations.
-   * 
-   * @return reports
-   */
-  public List<Report> getReports()
-  {
-    return Collections.unmodifiableList(_reports);
-  }
-  
-  @Override
-  protected void initialize()
-  {
-    _garbage = new FileCleaner(getFileSystem());
-    
-    if (getMaxIterations() == null)
-    {
-      setMaxIterations(20);
-    }
-    
-    if (getMaxToProcess() == null)
-    {
-      if (getNumDays() != null)
-      {
-        setMaxToProcess(getNumDays());
-      }
-      else
-      {
-        setMaxToProcess(90);
-      }
-    }
-    
-    super.initialize();
-  }
-  
-  /**
-   * Get the name for the reduce output schema. 
-   * By default this is the name of the class with "Output" appended.
-   * 
-   * @return output schema name
-   */
-  protected String getOutputSchemaName()
-  {
-    return this.getClass().getSimpleName() + "Output";
-  }
-  
-  /**
-   * Get the namespace for the reduce output schema.
-   * By default this is the package of the class.
-   * 
-   * @return output schema namespace
-   */
-  protected String getOutputSchemaNamespace()
-  {
-    return this.getClass().getPackage().getName();
-  }
-  
-  protected ObjectMapper getMapProcessor()
-  {
-    return _mapper;
-  }
-  
-  protected ObjectReducer getCombineProcessor()
-  {
-    return _combiner;
-  }
-  
-  protected ObjectReducer getReduceProcessor()
-  {
-    return _reducer;
-  }
-   
-  /**
-   * Execute the job.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  private void execute() throws IOException, InterruptedException, ClassNotFoundException
-  {  
-    int iterations = 0;
-    
-    while(true)
-    {
-      PartitionPreservingExecutionPlanner planner = new PartitionPreservingExecutionPlanner(getFileSystem(),getProperties());
-      planner.setInputPaths(getInputPaths());
-      planner.setOutputPath(getOutputPath());
-      planner.setStartDate(getStartDate());
-      planner.setEndDate(getEndDate());
-      planner.setDaysAgo(getDaysAgo());
-      planner.setNumDays(getNumDays());
-      planner.setMaxToProcess(getMaxToProcess());
-      planner.setFailOnMissing(isFailOnMissing());
-      planner.createPlan();
-      
-      if (planner.getInputsToProcess().size() == 0)
-      {
-        _log.info("Found all necessary incremental data");
-        break;
-      }
-      
-      if (iterations >= getMaxIterations())
-      {
-        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
-                                                 iterations,
-                                                 getMaxIterations(),
-                                                 planner.getInputsToProcess().size()));
-      }
-      
-      Path jobTempPath = createRandomTempPath();  
-      _garbage.add(jobTempPath);
-      ensurePath(getOutputPath());
-      
-      Path incrementalStagingPath = ensurePath(new Path(jobTempPath,".incremental-staging"));
-      Path incrementalStagingTmpPath = ensurePath(new Path(jobTempPath,".incremental-staging-tmp"));
-      
-      Report report = new Report();    
-           
-      // create input paths for job
-      List<String> inputPaths = new ArrayList<String>();
-      for (DatePath input : planner.getInputsToProcess())
-      {
-        inputPaths.add(input.getPath().toString());
-        report.inputFiles.add(input);
-      }
-      
-      _log.info("Staging path: " + incrementalStagingPath);
-      final StagedOutputJob job = StagedOutputJob.createStagedJob(
-                                    getConf(),
-                                    getName() + "-" + "incremental",            
-                                    inputPaths,
-                                    incrementalStagingTmpPath.toString(),
-                                    incrementalStagingPath.toString(),
-                                    _log);
-              
-      job.setCountersParentPath(getCountersParentPath());
-      
-      final Configuration conf = job.getConfiguration();
-      
-      config(conf);
-      
-      PartitionPreservingSchemas fpSchemas = new PartitionPreservingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace() );
-      
-      job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
-      
-      job.setOutputFormatClass(AvroKeyOutputFormat.class);
-      
-      _log.info("Setting input path to schema mappings");
-      for (String path : fpSchemas.getMapInputSchemas().keySet())
-      {
-        Schema schema = fpSchemas.getMapInputSchemas().get(path);
-        _log.info("*** " + path);
-        _log.info("*** => " + schema.toString());
-        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
-      }
-            
-      AvroJob.setMapOutputKeySchema(job, fpSchemas.getMapOutputKeySchema());
-      AvroJob.setMapOutputValueSchema(job, fpSchemas.getMapOutputValueSchema());
-      AvroJob.setOutputKeySchema(job, fpSchemas.getReduceOutputSchema());
-            
-      StringBuilder inputTimesJoined = new StringBuilder();
-      for (Date input : planner.getDatesToProcess())
-      {
-        String namedOutput = PathUtils.datedPathFormat.format(input);
-        _log.info(String.format("Adding named output %s",namedOutput));
-        AvroMultipleOutputs.addNamedOutput(job, 
-                                           namedOutput, 
-                                           AvroKeyOutputFormat.class, 
-                                           fpSchemas.getReduceOutputSchema());
-        
-        inputTimesJoined.append(Long.toString(input.getTime()));
-        inputTimesJoined.append(",");
-      }
-      
-      int numReducers;
-      
-      if (getNumReducers() != null)
-      {
-        numReducers = getNumReducers();
-        _log.info(String.format("Using %d reducers (fixed)",numReducers));
-      }
-      else
-      {
-        numReducers = planner.getNumReducers();
-        _log.info(String.format("Using %d reducers (computed)",numReducers));
-      }
-      
-      int avgReducersPerInput = (int)Math.ceil(numReducers/(double)planner.getDatesToProcess().size());
-      
-      _log.info(String.format("Reducers per input path: %d", avgReducersPerInput));    
-      
-      // counters for multiple outputs
-//      conf.set("mo.counters", "true");
-      
-      conf.set(TimePartitioner.REDUCERS_PER_INPUT, Integer.toString(avgReducersPerInput));
-      conf.set(TimePartitioner.INPUT_TIMES, inputTimesJoined.substring(0,inputTimesJoined.length()-1));    
-              
-      job.setNumReduceTasks(numReducers);
-      
-      Path mapperPath = new Path(incrementalStagingPath,".mapper_impl");
-      Path reducerPath = new Path(incrementalStagingPath,".reducer_impl");
-      Path combinerPath = new Path(incrementalStagingPath,".combiner_impl");
-      
-      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
-      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
-      
-      _mapper = new PartitioningMapper();
-      _mapper.setSchemas(fpSchemas);
-      _mapper.setMapper(getMapper());
-
-      _reducer = new PartitioningReducer();
-      _reducer.setSchemas(fpSchemas);
-      _reducer.setAccumulator(getReducerAccumulator());
-      
-      DistributedCacheHelper.writeObject(conf, getMapProcessor(), mapperPath);
-      DistributedCacheHelper.writeObject(conf, getReduceProcessor(), reducerPath);
-      
-      job.setMapperClass(DelegatingMapper.class);
-      job.setReducerClass(DelegatingReducer.class);
-      
-      if (isUseCombiner())
-      {
-        _combiner = new PartitioningCombiner();
-        _combiner.setAccumulator(getCombinerAccumulator());
-        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
-        job.setCombinerClass(DelegatingCombiner.class);
-        DistributedCacheHelper.writeObject(conf, getCombineProcessor(), combinerPath);
-      }
-      
-      job.setPartitionerClass(TimePartitioner.class);
-            
-      if (!job.waitForCompletion(true))
-      {
-        _log.error("Job failed! Quitting...");
-        throw new RuntimeException("Job failed");
-      }
-      
-      report.jobName = job.getJobName();
-      report.jobId = job.getJobID().toString();
-        
-      moveStagedFiles(report,incrementalStagingPath);
-      
-      if (getCountersParentPath() == null && job.getCountersPath() != null)
-      {
-        // save the counters in the target path, for lack of a better place to put it
-        Path counters = job.getCountersPath();
-        if (getFileSystem().exists(counters))
-        {
-          Path target = new Path(getOutputPath(),counters.getName());
-          if (getFileSystem().exists(target))
-          {
-            _log.info(String.format("Removing old counters at %s",target));
-            getFileSystem().delete(target, true);
-          }
-          _log.info(String.format("Moving %s to %s",counters.getName(),getOutputPath()));
-          getFileSystem().rename(counters, target);
-          
-          report.countersPath = target;
-        }
-        else
-        {
-          _log.error("Could not find counters at " + counters);
-        }
-      }
-
-      applyRetention();
-              
-      _reports.add(report);
-      
-      if (!planner.getNeedsAnotherPass())
-      {
-        break;
-      }
-      
-      cleanup();
-      
-      iterations++;
-    }
-  }
-  
-  /**
-   * Remove all temporary paths. 
-   * 
-   * @throws IOException
-   */
-  private void cleanup() throws IOException
-  {
-    if (_garbage != null)
-    {
-      _garbage.clean();
-    }
-  }
-  
-  /**
- * Removes all but the most recent days from the output, as determined by the retention count, if one is specified.
-   * 
-   * @throws IOException
-   */
-  private void applyRetention() throws IOException
-  {
-    if (getRetentionCount() != null)
-    {
-      PathUtils.keepLatestNestedDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
-    }
-  }
-  
-  /**
-   * Moves files from the staging path to the final output path.
-   * 
-   * @param report report to update with output paths
-   * @param sourcePath source of data to move
-   * @throws IOException
-   */
-  private void moveStagedFiles(Report report, Path sourcePath) throws IOException
-  {
-    _log.info("Following files produced in staging path:");
-    for (FileStatus stat : getFileSystem().globStatus(new Path(sourcePath,"*.avro")))
-    {
-      _log.info(String.format("* %s (%d bytes)",stat.getPath(),stat.getLen()));
-    }
-    
-    FileStatus[] incrementalParts = getFileSystem().globStatus(new Path(sourcePath,"*"), new PathFilter() {
-      @Override
-      public boolean accept(Path path)
-      {
-        String[] pathParts = path.getName().split("-");
-        try
-        {
-          Long.parseLong(pathParts[0]);
-          return true;
-        }
-        catch (NumberFormatException e)
-        {
-          return false;
-        }
-      }
-    });
-    
-    // collect the new incremental data from the temp folder and move to subfolders
-    Map<String,Path> incrementalTargetPaths = new HashMap<String,Path>();
-    for (FileStatus stat : incrementalParts)
-    {        
-      String[] pathParts = stat.getPath().getName().split("-");
-      try
-      {        
-        String timestamp = pathParts[0];
-        
-        if (!incrementalTargetPaths.containsKey(timestamp))
-        {
-          Path parent = new Path(sourcePath,timestamp);
-          
-          if (!getFileSystem().exists(parent))
-          {
-            getFileSystem().mkdirs(parent);
-          }
-          else
-          {
-            throw new RuntimeException("already exists: " + parent.toString());
-          }
-          
-          incrementalTargetPaths.put(timestamp,parent);
-        }
-        
-        Path parent = incrementalTargetPaths.get(timestamp);
-        _log.info(String.format("Moving %s to %s",stat.getPath().getName(),parent.toString()));
-        getFileSystem().rename(stat.getPath(), new Path(parent,stat.getPath().getName()));
-      }
-      catch (NumberFormatException e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
-    
-    for (Path src : incrementalTargetPaths.values())
-    {
-      Date srcDate;
-      try
-      {
-        srcDate = PathUtils.datedPathFormat.parse(src.getName());
-      }
-      catch (ParseException e)
-      {
-        throw new RuntimeException(e);
-      }
-      Path target = new Path(getOutputPath(),PathUtils.nestedDatedPathFormat.format(srcDate));
-      _log.info(String.format("Moving %s to %s",src.getName(),target));
-      
-      getFileSystem().mkdirs(target.getParent());
-      
-      if (!getFileSystem().rename(src, target))
-      {
-        throw new RuntimeException("Failed to rename " + src + " to " + target);
-      }
-      
-      report.outputFiles.add(new DatePath(srcDate,target));
-    }
-  }
-  
-  /**
-   * Reports files created and processed for an iteration of the job.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  public static class Report
-  {
-    private String jobName;
-    private String jobId;
-    private Path countersPath;
-    private List<DatePath> inputFiles = new ArrayList<DatePath>();
-    private List<DatePath> outputFiles = new ArrayList<DatePath>();
-    
-    /**
-     * Gets the job name.
-     * 
-     * @return job name
-     */
-    public String getJobName()
-    {
-      return jobName;
-    }
-    
-    /**
-     * Gets the job ID.
-     * 
-     * @return job ID
-     */
-    public String getJobId()    
-    {
-      return jobId;
-    }
-    
-    /**
-     * Gets the path to the counters file, if one was written.
-     * 
-     * @return counters path
-     */
-    public Path getCountersPath()
-    {
-      return countersPath;
-    }
-    
-    /**
-     * Gets input files that were processed.  These are files that are within
-     * the desired date range.
-     * 
-     * @return new input files
-     */
-    public List<DatePath> getInputFiles()
-    {
-      return Collections.unmodifiableList(inputFiles);
-    }
-    
-    /**
-     * Gets the output files that were produced by the job.
-     * 
-     * @return old input files
-     */
-    public List<DatePath> getOutputFiles()
-    {
-      return Collections.unmodifiableList(outputFiles);
-    }
-  }
-}
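
For anyone reviewing this removal (the file is relocated as part of the Gradle migration, not dropped), a rough sketch of how the class above was typically subclassed may help. This is illustrative only and not part of the commit: the ExampleCountJob name and the member_id/count schemas are invented, and it assumes the datafu.hourglass.model.Mapper and Accumulator interfaces as shipped with Hourglass, where the mapper emits Avro key/value pairs through a KeyValueCollector and the accumulator folds intermediate values into a final record.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.model.KeyValueCollector;
import datafu.hourglass.model.Mapper;

// Hypothetical job that counts events per member, per day.
public class ExampleCountJob extends AbstractPartitionPreservingIncrementalJob
{
  private static final Schema KEY_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"example\","
      + "\"fields\":[{\"name\":\"member_id\",\"type\":\"long\"}]}");

  private static final Schema COUNT_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Count\",\"namespace\":\"example\","
      + "\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}");

  public ExampleCountJob(String name, Properties props) throws IOException
  {
    super(name, props);
  }

  @Override
  protected Schema getKeySchema() { return KEY_SCHEMA; }

  @Override
  protected Schema getIntermediateValueSchema() { return COUNT_SCHEMA; }

  @Override
  protected Schema getOutputValueSchema() { return COUNT_SCHEMA; }

  @Override
  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
  {
    return new CountMapper();
  }

  @Override
  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
  {
    return new CountAccumulator();
  }

  // Emits (member_id, 1) for each input record.
  static class CountMapper implements Mapper<GenericRecord,GenericRecord,GenericRecord>
  {
    @Override
    public void map(GenericRecord input,
                    KeyValueCollector<GenericRecord,GenericRecord> collector)
        throws IOException, InterruptedException
    {
      GenericRecord key = new GenericData.Record(KEY_SCHEMA);
      key.put("member_id", input.get("member_id"));
      GenericRecord value = new GenericData.Record(COUNT_SCHEMA);
      value.put("count", 1L);
      collector.collect(key, value);
    }
  }

  // Sums the intermediate counts for a key into the final output record.
  static class CountAccumulator implements Accumulator<GenericRecord,GenericRecord>
  {
    private long count;

    @Override
    public void accumulate(GenericRecord value)
    {
      count += (Long)value.get("count");
    }

    @Override
    public GenericRecord getFinal()
    {
      GenericRecord output = new GenericData.Record(COUNT_SCHEMA);
      output.put("count", count);
      return output;
    }

    @Override
    public void cleanup()
    {
      count = 0L;
    }
  }
}

The mapper and accumulator are static nested classes because the job serializes them to the distributed cache via DistributedCacheHelper.writeObject; an anonymous inner class would capture the enclosing, non-serializable job instance.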

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
deleted file mode 100644
index 5a9211b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import datafu.hourglass.fs.DateRange;
-
-/**
- * An interface for an object with a configurable output date range.
- * 
- * @author "Matthew Hayes"
- *
- */
-public interface DateRangeConfigurable 
-{
-  /**
-   * Sets the date range for the output.
-   * 
-   * @param dateRange output date range
-   */
-  public void setOutputDateRange(DateRange dateRange);
-}
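
A minimal sketch of an implementor (RangeAwareProcessor is a hypothetical name): a processor that records the output date range it is configured with.

import datafu.hourglass.fs.DateRange;
import datafu.hourglass.jobs.DateRangeConfigurable;

public class RangeAwareProcessor implements DateRangeConfigurable
{
  private DateRange _outputDateRange;

  @Override
  public void setOutputDateRange(DateRange dateRange)
  {
    // Remember the range so later processing can stamp its output with it.
    _outputDateRange = dateRange;
  }
}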

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
deleted file mode 100644
index 651a27f..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Determines the date range of inputs which should be processed.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class DateRangePlanner
-{
-  private final static Logger _log = Logger.getLogger(DateRangePlanner.class);
-  
-  /**
-   * Determines the date range of inputs which should be processed.
-   *  
-   * @param beginDateOverride Begin date
-   * @param endDateOverride End date
-   * @param available The input dates which are available
-   * @param daysAgo Number of days to subtract from the end date
-   * @param numDays Number of days to process
-   * @param failOnMissing true to fail if days are missing in desired date range
-   * @return desired date range for inputs to be processed
-   */
-  public static DateRange getDateRange(Date beginDateOverride, 
-                                       Date endDateOverride,
-                                       Collection<Date> available,
-                                       Integer daysAgo,
-                                       Integer numDays,
-                                       boolean failOnMissing)
-  {
-    Date beginDate = null;
-    Date endDate = null;
-    
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-            
-    // determine the range of available input data
-    Date beginAvailable = null;
-    Date endAvailable = null;    
-    if (available != null && available.size() > 0)
-    {
-      ArrayList<Date> sortedAvailable = new ArrayList<Date>(available);
-      Collections.sort(sortedAvailable);
-      beginAvailable = sortedAvailable.get(0);
-      endAvailable = sortedAvailable.get(sortedAvailable.size()-1);
-    }
-    else
-    {
-      throw new IllegalArgumentException("No data available");
-    }
-    
-    if (endDateOverride != null && beginDateOverride != null)
-    {
-      beginDate = beginDateOverride;
-      endDate = endDateOverride;
-      
-      _log.info(String.format("Specified begin date is %s",
-                              PathUtils.datedPathFormat.format(beginDate)));
-      _log.info(String.format("Specified end date is %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-      
-      if (daysAgo != null)
-      {
-        throw new IllegalArgumentException("Cannot specify days ago when begin and end date set");
-      }
-       
-      if (numDays != null)
-      {
-        throw new IllegalArgumentException("Cannot specify num days when begin and end date set");
-      }
-    }
-    else if (beginDateOverride != null)
-    {
-      beginDate = beginDateOverride;
-      
-      _log.info(String.format("Specified begin date is %s",
-                              PathUtils.datedPathFormat.format(beginDate)));
-      
-      if (numDays != null)
-      { 
-        cal.setTime(beginDate);
-        cal.add(Calendar.DAY_OF_MONTH, numDays-1);
-        endDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving end date of %s",
-                                numDays, PathUtils.datedPathFormat.format(endDate)));
-      }
-    }
-    else if (endDateOverride != null)
-    {
-      endDate = endDateOverride;
-      
-      _log.info(String.format("Specified end date is %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-      
-      if (numDays != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -(numDays-1));
-        beginDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving begin date of %s",
-                                numDays, PathUtils.datedPathFormat.format(beginDate)));
-      }
-    }
-    
-    if (endDate == null)
-    {
-      endDate = endAvailable;
-            
-      _log.info(String.format("No end date specified, using date for latest available input: %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-      
-      if (daysAgo != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -daysAgo);
-        endDate = cal.getTime();
-        _log.info(String.format("However days ago is %d, giving end date of %s",
-                                daysAgo, PathUtils.datedPathFormat.format(endDate)));
-      }
-    }
-    
-    if (endAvailable.compareTo(endDate) < 0 && failOnMissing)
-    {
-      throw new IllegalArgumentException(String.format("Latest available date %s is less than desired end date %s",
-                                                       PathUtils.datedPathFormat.format(endAvailable),
-                                                       PathUtils.datedPathFormat.format(endDate)));
-    }
-    
-    if (beginDate == null)
-    {
-      beginDate = beginAvailable;
-      
-      if (numDays != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -(numDays-1));
-        beginDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving begin date of %s",
-                                numDays, PathUtils.datedPathFormat.format(beginDate)));
-      }
-    }
-    
-    if (beginAvailable.compareTo(beginDate) > 0 && failOnMissing)
-    {
-      throw new IllegalArgumentException(String.format("Desired begin date is %s but the next available date is %s",                                                       
-                                                       PathUtils.datedPathFormat.format(beginDate),
-                                                       PathUtils.datedPathFormat.format(beginAvailable)));
-    }
-    
-    _log.info(String.format("Determined date range of inputs to consume is [%s,%s]",
-                           PathUtils.datedPathFormat.format(beginDate),
-                           PathUtils.datedPathFormat.format(endDate)));
-    
-    return new DateRange(beginDate, endDate);
-  }
-}
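
To make the date arithmetic above concrete, here is a hedged usage sketch. The dates are invented, and UTC stands in for PathUtils.timeZone; with no explicit begin/end date, daysAgo pins the end one day before the latest input and numDays counts back from there.

import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

import datafu.hourglass.fs.DateRange;
import datafu.hourglass.jobs.DateRangePlanner;

public class DateRangeExample
{
  public static void main(String[] args)
  {
    // Seven consecutive days of available input, 2013/01/08 through 2013/01/14.
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.clear();
    cal.set(2013, Calendar.JANUARY, 8);
    List<Date> available = new ArrayList<Date>();
    for (int i = 0; i < 7; i++)
    {
      available.add(cal.getTime());
      cal.add(Calendar.DAY_OF_MONTH, 1);
    }

    // daysAgo=1 moves the end to 2013/01/13 (one before the latest input);
    // numDays=3 counts back to a begin date of 2013/01/11.
    // Result: [2013/01/11, 2013/01/13].
    DateRange range = DateRangePlanner.getDateRange(null, null, available, 1, 3, true);
    System.out.println(range.getBeginDate() + " .. " + range.getEndDate());
  }
}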

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
deleted file mode 100644
index e075770..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Properties;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for execution planners.  An execution planner determines which files should be processed
- * for a particular run.
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class ExecutionPlanner
-{
-  private final Logger _log = Logger.getLogger(ExecutionPlanner.class);
-  
-  private FileSystem _fileSystem;
-  private Properties _props;
-  private Date _startDate;
-  private Date _endDate;
-  private Integer _daysAgo;
-  private Integer _numDays;
-  private Integer _maxToProcess;
-  private DateRange _range;
-  private Path _outputPath;
-  private boolean _failOnMissing;
-  private List<Path> _inputPaths = new ArrayList<Path>();
-  private List<SortedMap<Date,DatePath>> _inputPathsByDate;
-  private Map<Date,List<DatePath>> _availableInputsByDate = new HashMap<Date,List<DatePath>>();
-  
-  /**
-   * Initializes the execution planner.
-   * 
-   * @param fs file system to use
-   * @param props configuration properties
-   */
-  public ExecutionPlanner(FileSystem fs, Properties props)
-  {
-    _props = props;
-    _fileSystem = fs;
-  }
-  
-  /**
-   * Gets the output path.
-   * 
-   * @return output path
-   */
-  public Path getOutputPath()
-  {
-    return _outputPath;
-  }
-
-  /**
-   * Gets the input paths.
-   * 
-   * @return input paths
-   */
-  public List<Path> getInputPaths()
-  {
-    return _inputPaths;
-  }
-
-  /**
-   * Sets the output path.
-   * 
-   * @param outputPath output path
-   */
-  public void setOutputPath(Path outputPath)
-  {
-    this._outputPath = outputPath;
-  }
-
-  /**
-   * Sets the input paths.
-   * 
-   * @param inputPaths input paths
-   */
-  public void setInputPaths(List<Path> inputPaths)
-  {
-    this._inputPaths = inputPaths;
-  }
-  
-  /**
-   * Sets the start date.
-   * 
-   * @param startDate start date
-   */
-  public void setStartDate(Date startDate)
-  {
-    this._startDate = startDate;
-  }
-  
-  /**
-   * Gets the start date.
-   * 
-   * @return start date
-   */
-  public Date getStartDate()
-  {
-    return _startDate;
-  }
-
-  /**
-   * Sets the end date.
-   * 
-   * @param endDate end date
-   */
-  public void setEndDate(Date endDate)
-  {
-    this._endDate = endDate;
-  }
-  
-  /**
-   * Gets the end date.
-   * 
-   * @return end date
-   */
-  public Date getEndDate()
-  {
-    return _endDate;
-  }
-  
-  /**
-   * Sets the number of days to subtract off the end date. 
-   * 
-   * @param daysAgo days ago
-   */
-  public void setDaysAgo(Integer daysAgo)
-  {
-    this._daysAgo = daysAgo;
-  }
-  
-  /**
-   * Gets the number of days to subtract off the end date. 
-   * 
-   * @return days ago
-   */
-  public Integer getDaysAgo()
-  {
-    return _daysAgo;
-  }
-
-  /**
-   * Sets the number of days to process.
-   * 
-   * @param numDays number of days to process
-   */
-  public void setNumDays(Integer numDays)
-  {
-    this._numDays = numDays;
-  }
-  
-  /**
-   * Gets the number of days to process.
-   * 
-   * @return number of days to process
-   */
-  public Integer getNumDays()
-  {
-    return _numDays;
-  }
-
-  /**
-   * Sets the maximum number of days to process at a time.
-   * 
-   * @param maxToProcess maximum number of days
-   */
-  public void setMaxToProcess(Integer maxToProcess)
-  {
-    this._maxToProcess = maxToProcess;
-  }
-  
-  /**
-   * Gets the maximum number of days to process at a time.
-   * 
-   * @return maximum number of days
-   */
-  public Integer getMaxToProcess()
-  {
-    return _maxToProcess;
-  }
-  
-  /**
-   * Gets whether the job should fail if data is missing within the desired date range.
-   * 
-   * @return true if the job should fail on missing data
-   */
-  public boolean isFailOnMissing()
-  {
-    return _failOnMissing;
-  }
-
-  /**
-   * Sets whether the job should fail if data is missing within the desired date range.
-   * 
-   * @param failOnMissing true if the job should fail on missing data
-   */
-  public void setFailOnMissing(boolean failOnMissing)
-  {
-    this._failOnMissing = failOnMissing;
-  }
-  
-  /**
-   * Gets the desired input date range to process based on the configuration and available inputs.
-   * 
-   * @return desired date range
-   */
-  public DateRange getDateRange()
-  {
-    return _range;
-  }
-
-  /**
-   * Gets the file system.
-   * 
-   * @return file system
-   */
-  protected FileSystem getFileSystem()
-  {
-    return _fileSystem;
-  }
-  
-  /**
-   * Gets the configuration properties.
-   * 
-   * @return properties
-   */
-  protected Properties getProps()
-  {
-    return _props;
-  }
-  
-  /**
-   * Gets a map from date to available input data.
-   * 
-   * @return map from date to available input data
-   */
-  protected Map<Date,List<DatePath>> getAvailableInputsByDate()
-  {
-    return _availableInputsByDate;
-  }
-  
-  /**
-   * Get a map from date to path for all paths matching yyyy/MM/dd under the given path.
-   * 
-   * @param path path to search under
-   * @return map of date to path
-   * @throws IOException
-   */
-  protected SortedMap<Date,DatePath> getDailyData(Path path) throws IOException
-  {
-    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
-    for (DatePath datePath : PathUtils.findNestedDatedPaths(getFileSystem(),path))
-    {
-      data.put(datePath.getDate(),datePath);
-    }
-    return data;
-  }
-  
-  /**
-   * Get a map from date to path for all paths matching yyyyMMdd under the given path.
-   * 
-   * @param path path to search under
-   * @return map of date to path
-   * @throws IOException
-   */
-  protected SortedMap<Date,DatePath> getDatedData(Path path) throws IOException
-  {
-    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
-    for (DatePath datePath : PathUtils.findDatedPaths(getFileSystem(),path))
-    {
-      data.put(datePath.getDate(),datePath);
-    }
-    return data;
-  }
-  
-  /**
-   * Finds the available daily input data under each of the input paths.
-   * 
-   * @throws IOException
-   */
-  protected void loadInputData() throws IOException
-  {
-    _inputPathsByDate = new ArrayList<SortedMap<Date,DatePath>>();
-    for (Path inputPath : getInputPaths())
-    {
-      _log.info("Searching for available input data in " + inputPath);
-      _inputPathsByDate.add(getDailyData(inputPath));
-    }
-  }
-  
-  /**
-   * Determines what input data is available.
-   */
-  protected void determineAvailableInputDates()
-  {
-    // first find the latest date available for all inputs
-    PriorityQueue<Date> dates = new PriorityQueue<Date>();
-    for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
-    {
-      for (Date date : pathMap.keySet())
-      {
-        dates.add(date);
-      }
-    }
-    if (dates.size() == 0)
-    {
-      throw new RuntimeException("No input data!");
-    }
-    List<Date> available = new ArrayList<Date>();
-    Date currentDate = dates.peek();
-    int found = 0;
-    int needed = getInputPaths().size();
-    while (currentDate != null)
-    {
-      Date date = dates.poll();
-      
-      if (date != null && date.equals(currentDate))
-      {
-        found++;
-      }
-      else
-      {
-        if (found == needed)
-        {
-          available.add(currentDate);
-        }
-        else if (available.size() > 0)
-        {
-          _log.info("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
-          _log.info("Paths found for " + PathUtils.datedPathFormat.format(currentDate) + ":");
-          // collect what's available for this date
-          for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
-          {
-            DatePath path = pathMap.get(currentDate);
-            if (path != null)
-            {
-              _log.info("=> " + path);
-            }
-          }
-          
-          if (_failOnMissing)
-          {
-            throw new RuntimeException("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
-          }
-          else
-          {
-            available.add(currentDate);
-          }
-        }
-        
-        found = 0;          
-        currentDate = date;
-        
-        if (currentDate != null)
-        {
-          found++;
-        }
-      }        
-      
-      if (found > needed)
-      {
-        throw new RuntimeException("found more than needed");
-      }        
-    }
-    
-    _availableInputsByDate.clear();
-    
-    for (Date date : available)
-    {
-      List<DatePath> paths = new ArrayList<DatePath>();
-      for (SortedMap<Date,DatePath> map : _inputPathsByDate)
-      {
-        DatePath path = map.get(date);
-        if (path != null)
-        {
-          paths.add(path);
-        }
-      }
-      _availableInputsByDate.put(date, paths);
-    }
-  }
-  
-  /**
-   * Determine the date range for inputs to process based on the configuration and available inputs.
-   */
-  protected void determineDateRange()
-  {
-    _log.info("Determining range of input data to consume");
-    _range = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), getAvailableInputsByDate().keySet(), getDaysAgo(), getNumDays(), isFailOnMissing());
-    if (_range.getBeginDate() == null || _range.getEndDate() == null)
-    {
-      throw new RuntimeException("Expected start and end date");
-    }
-  }
-}
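
The base class leaves plan construction to subclasses such as PartitionCollapsingExecutionPlanner below. As a hedged sketch (SimplePlanner is hypothetical), the intended call sequence for a subclass looks roughly like this:

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FileSystem;

import datafu.hourglass.jobs.ExecutionPlanner;

// Hypothetical minimal planner, showing the call sequence only.
public class SimplePlanner extends ExecutionPlanner
{
  public SimplePlanner(FileSystem fs, Properties props)
  {
    super(fs, props);
  }

  public void createPlan() throws IOException
  {
    loadInputData();                // index yyyy/MM/dd paths under each input
    determineAvailableInputDates(); // keep dates present in every input
    determineDateRange();           // apply start/end/days-ago/num-days rules
  }
}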

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
deleted file mode 100644
index 1dfc24c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Used to remove files from the file system when they are no longer needed.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class FileCleaner
-{
-  private final Logger log = Logger.getLogger(FileCleaner.class);
-  
-  private final Set<Path> garbage = new HashSet<Path>();
-  private final FileSystem fs;
-  
-  public FileCleaner(FileSystem fs)
-  {
-    this.fs = fs;
-  }
-  
-  /**
-   * Add a path to be removed later.
-   * 
-   * @param path path to remove
-   * @return added path
-   */
-  public Path add(Path path)
-  {
-    garbage.add(path);
-    return path;
-  }
-  
-  /**
-   * Add a path to be removed later.
-   * 
-   * @param path path to remove
-   * @return added path
-   */
-  public String add(String path)
-  {
-    garbage.add(new Path(path));
-    return path;
-  }
-  
-  /**
-   * Removes added paths from the file system.
-   * 
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public void clean() throws IOException
-  {
-    List<Path> sorted = new ArrayList<Path>(garbage);
-    Collections.sort(sorted);
-    for (Path p : sorted)
-    {
-      if (fs.exists(p))
-      {
-        log.info(String.format("Removing %s",p));
-        fs.delete(p, true);
-      }
-    }
-    garbage.clear();
-  }
-}
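
Usage is straightforward; a minimal sketch follows (the temp path is hypothetical). Registering the path before doing the work means the finally block removes whatever was created, even on failure.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import datafu.hourglass.jobs.FileCleaner;

public class FileCleanerExample
{
  public static void main(String[] args) throws IOException
  {
    FileSystem fs = FileSystem.get(new Configuration());
    FileCleaner cleaner = new FileCleaner(fs);

    // Register a (hypothetical) temp path up front.
    Path tmp = new Path("/tmp/hourglass-staging");
    cleaner.add(tmp);
    try
    {
      fs.mkdirs(tmp);
      // ... write intermediate data under tmp ...
    }
    finally
    {
      cleaner.clean(); // deletes every registered path that still exists
    }
  }
}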

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
deleted file mode 100644
index 0a25cba..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import datafu.hourglass.schemas.TaskSchemas;
-
-/**
- * Base class for incremental jobs.  Incremental jobs consume day-partitioned input data.  
- * 
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * </p>
- * 
- * <p>
- * This class has the same configuration and methods as {@link TimeBasedJob}.
- * In addition it also recognizes the following properties:
- * </p>
- * 
- * <ul>
- *   <li><em>max.iterations</em> - maximum number of iterations for the job</li>
- *   <li><em>max.days.to.process</em> - maximum number of days of input data to process in a single run</li>
- *   <li><em>fail.on.missing</em> - whether the job should fail if input data within the desired range is missing</li>
- * </ul>
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class IncrementalJob extends TimeBasedJob
-{
-  private Integer _maxToProcess;
-  private Integer _maxIterations;
-  private boolean _failOnMissing;
-  private TaskSchemas _schemas;
-  
-  /**
-   * Initializes the job.
-   */
-  public IncrementalJob()
-  {    
-  }
-
-  /**
-   * Initializes the job with a job name and properties.
-   * 
-   * @param name job name
-   * @param props configuration properties
-   */
-  public IncrementalJob(String name, Properties props)
-  {        
-    super(name,props);
-  }
-  
-  public void setProperties(Properties props)
-  {
-    super.setProperties(props);
-        
-    if (getProperties().get("max.iterations") != null)
-    {
-      setMaxIterations(Integer.parseInt((String)getProperties().get("max.iterations")));
-    }
-    
-    if (getProperties().get("max.days.to.process") != null)
-    {
-      setMaxToProcess(Integer.parseInt((String)getProperties().get("max.days.to.process")));
-    }
-    
-    if (getProperties().get("fail.on.missing") != null)
-    {
-      setFailOnMissing(Boolean.parseBoolean((String)getProperties().get("fail.on.missing")));
-    }
-  }
-  
-  protected void initialize()
-  {
-    super.initialize();
-    
-    if (getKeySchema() == null)
-    {
-      throw new RuntimeException("Key schema not specified");
-    }
-
-    if (getIntermediateValueSchema() == null)
-    {
-      throw new RuntimeException("Intermediate schema not specified");
-    }
-
-    if (getOutputValueSchema() == null)
-    {
-      throw new RuntimeException("Output schema not specified");
-    }
-    
-    _schemas = new TaskSchemas.Builder()
-      .setKeySchema(getKeySchema())
-      .setIntermediateValueSchema(getIntermediateValueSchema())
-      .setOutputValueSchema(getOutputValueSchema())
-      .build();
-  }
-  
-  /**
-   * Gets the Avro schema for the key.
-   * <p>
-   * This is also used as the key for the map output.
-   * 
-   * @return key schema.
-   */
-  protected abstract Schema getKeySchema();
-  
-  /**
-   * Gets the Avro schema for the intermediate value.
-   * <p>
-   * This is also used for the value for the map output.
-   * 
-   * @return intermediate value schema
-   */
-  protected abstract Schema getIntermediateValueSchema();
-  
-  /**
-   * Gets the Avro schema for the output data.
-   * 
-   * @return output data schema
-   */
-  protected abstract Schema getOutputValueSchema();
-  
-  /**
-   * Gets the schemas.
-   * 
-   * @return schemas
-   */
-  protected TaskSchemas getSchemas()
-  {
-    return _schemas;
-  }
-  
-  /**
-   * Gets the maximum number of days of input data to process in a single run.
-   * 
-   * @return maximum number of days to process
-   */
-  public Integer getMaxToProcess()
-  {
-    return _maxToProcess;
-  }
-
-  /**
-   * Sets the maximum number of days of input data to process in a single run.
-   * 
-   * @param maxToProcess maximum number of days to process
-   */
-  public void setMaxToProcess(Integer maxToProcess)
-  {
-    _maxToProcess = maxToProcess;
-  }
-
-  /**
-   * Gets the maximum number of iterations for the job.  Multiple iterations will only occur
-   * when there is a maximum set for the number of days to process in a single run.
-   * An error should be thrown if this number will be exceeded.
-   * 
-   * @return maximum number of iterations
-   */
-  public Integer getMaxIterations()
-  {
-    return _maxIterations;
-  }
-
-  /**
-   * Sets the maximum number of iterations for the job.  Multiple iterations will only occur
-   * when there is a maximum set for the number of days to process in a single run.
-   * An error should be thrown if this number will be exceeded.
-   * 
-   * @param maxIterations maximum number of iterations
-   */
-  public void setMaxIterations(Integer maxIterations)
-  {
-    _maxIterations = maxIterations;
-  }
-
-  /**
-   * Gets whether the job should fail if input data within the desired range is missing. 
-   * 
-   * @return true if the job should fail on missing data
-   */
-  public boolean isFailOnMissing()
-  {
-    return _failOnMissing;
-  }
-
-  /**
-   * Sets whether the job should fail if input data within the desired range is missing. 
-   * 
-   * @param failOnMissing true if the job should fail on missing data
-   */
-  public void setFailOnMissing(boolean failOnMissing)
-  {
-    _failOnMissing = failOnMissing;
-  }
-}
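
Tying the recognized properties together, a hedged driver sketch (ExampleDriver is hypothetical, and ExampleCountJob refers to the illustrative subclass sketched earlier in this message; input.path, output.path, and the hadoop-conf. prefix come from the partition-preserving job's documentation above):

import java.util.Properties;

public class ExampleDriver
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    props.setProperty("input.path", "/data/event");         // hypothetical paths
    props.setProperty("output.path", "/data/event_count");
    props.setProperty("max.iterations", "20");
    props.setProperty("max.days.to.process", "30");
    props.setProperty("fail.on.missing", "true");
    // Hadoop settings pass through with the hadoop-conf. prefix:
    props.setProperty("hadoop-conf.mapred.min.split.size", "536870912");

    ExampleCountJob job = new ExampleCountJob("example-count", props);
    job.run();
  }
}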

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
deleted file mode 100644
index f991752..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package datafu.hourglass.jobs;
-
-public class MaxInputDataExceededException extends Throwable
-{
-  public MaxInputDataExceededException()
-  {    
-  }
-  
-  public MaxInputDataExceededException(String message)
-  {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
deleted file mode 100644
index 35e9294..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.SortedMap;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.avro.AvroDateRangeMetadata;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Execution planner used by {@link AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- * This creates a plan to process partitioned input data and collapse the partitions into a single output.
- * 
- * <p>
- * To use this class, the input and output paths must be specified.  In addition the desired input date
- * range can be specified through several methods.  Then {@link #createPlan()} can be called and the
- * execution plan will be created.  The inputs to process will be available from {@link #getInputsToProcess()},
- * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
- * will be available from {@link #getInputSchemas()}.
- * </p>
- * 
- * <p>
- * Previous output may be reused by using {@link #setReusePreviousOutput(boolean)}.  If previous output exists
- * and it is to be reused then it will be available from {@link #getPreviousOutputToProcess()}.  New input data
- * to process that is after the previous output time range is available from {@link #getNewInputsToProcess()}.
- * Old input data to process that is before the previous output time range and should be subtracted from the
- * previous output is available from {@link #getOldInputsToProcess()}.
- * </p>
- * 
- * <p>
- * Configuration properties are used to configure a {@link ReduceEstimator} instance.  This is used to 
- * calculate how many reducers should be used.  
- * The number of reducers to use is based on the input data size and the 
- * <em>num.reducers.bytes.per.reducer</em> property.  This setting can be controlled more granularly
- * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
- * Check {@link ReduceEstimator} for more details on how the properties are used.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
-{
-  private final Logger _log = Logger.getLogger(PartitionCollapsingExecutionPlanner.class);
-
-  private SortedMap<Date,DatePath> _outputPathsByDate;
-  private boolean _reusePreviousOutput;
-  
-  // the chosen execution plan
-  private Plan _plan;
-  
-  /**
-   * An execution plan.  Encapsulates what inputs will be processed.
-   * 
-   * @author mhayes
-   *
-   */
-  private class Plan
-  {
-    private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
-    private List<DatePath> _newInputsToProcess = new ArrayList<DatePath>();
-    private List<DatePath> _oldInputsToProcess = new ArrayList<DatePath>();
-    private Map<String,String> _latestInputByPath = new HashMap<String,String>();
-    private DatePath _previousOutputToProcess;
-    private List<Schema> _inputSchemas = new ArrayList<Schema>();
-    private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
-    private boolean _needAnotherPass;
-    private DateRange _currentDateRange;
-    private int _numReducers;
-    private Long _totalBytes;
-    
-    public void finalizePlan() throws IOException
-    {
-      determineInputSchemas();
-      determineNumReducers();
-      determineTotalBytes();
-    }
-    
-    /**
-     * Determines the number of bytes that will be consumed by this execution plan.
-     * This is used to compare alternative plans so the one with the least bytes
-     * consumed can be used.
-     * 
-     * @throws IOException
-     */
-    private void determineTotalBytes() throws IOException
-    {
-      _totalBytes = 0L;
-      for (DatePath dp : _inputsToProcess)
-      {
-        _totalBytes += PathUtils.countBytes(getFileSystem(), dp.getPath());
-      }
-      if (_previousOutputToProcess != null)
-      {
-        _totalBytes += PathUtils.countBytes(getFileSystem(), _previousOutputToProcess.getPath());
-      }
-      _log.info("Total bytes consumed: " + _totalBytes);
-    }
-    
-    /**
-     * Determines the input schemas.  There may be multiple input schemas because multiple inputs are allowed.
-     * The latest available inputs are used to determine the schema, the assumption being that schemas are
-     * backwards-compatible.
-     * 
-     * @throws IOException
-     */
-    private void determineInputSchemas() throws IOException
-    {
-      if (_latestInputByPath.size() > 0)
-      {
-        _log.info("Determining input schemas");
-        for (Entry<String,String> entry : _latestInputByPath.entrySet())
-        {
-          String root = entry.getKey();
-          String input = entry.getValue();
-          _log.info("Loading schema for " + input);
-          Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
-          _inputSchemas.add(schema);
-          _inputSchemasByPath.put(root, schema);
-        }
-      }
-    }
-    
-    /**
-     * Determines the number of reducers to use based on the input data size and the previous output,
-     * if it exists and is being reused.
-     * The number of reducers to use is based on the input data size and the 
-     * <em>num.reducers.bytes.per.reducer</em> property.  This setting can be controlled more granularly
-     * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
-     * See {@link ReduceEstimator} for details on reducer estimation.
-     * 
-     * @throws IOException
-     */
-    private void determineNumReducers() throws IOException
-    {
-      ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
-      for (DatePath input : _inputsToProcess)
-      {
-        estimator.addInputPath("input", input.getPath());
-      }
-      if (_previousOutputToProcess != null)
-      {
-        estimator.addInputPath("previous",_previousOutputToProcess.getPath());
-      }
-      _numReducers = estimator.getNumReducers();
-    }
-  }
-  
-  /**
-   * Initializes the execution planner.
-   * 
-   * @param fs file system
-   * @param props configuration properties
-   */
-  public PartitionCollapsingExecutionPlanner(FileSystem fs, Properties props)
-  {
-    super(fs, props);
-  }
-
-  /**
-   * Create the execution plan.
-   * 
-   * @throws IOException
-   */
-  public void createPlan() throws IOException
-  {
-    if (_plan != null) throw new RuntimeException("Plan already exists");
-    
-    _log.info("Creating execution plan");
-    
-    loadInputData();
-    loadOutputData();    
-    determineAvailableInputDates();
-    determineDateRange();
-    
-    List<Plan> plans = new ArrayList<Plan>();
-    Plan plan;
-    
-    if (_reusePreviousOutput)
-    {
-      _log.info("Output may be reused, will create alternative plan that does not reuse output");
-      plan = new Plan();
-      try
-      {
-        determineInputsToProcess(false,plan);
-        plan.finalizePlan();
-        plans.add(plan);
-      }
-      catch (MaxInputDataExceededException e)
-      {
-        _log.info(e.getMessage());
-      }
-    }
-    
-    _log.info(String.format("Creating plan that %s previous output",(_reusePreviousOutput ? "reuses" : "does not reuse")));
-    plan = new Plan();
-    try
-    {
-      determineInputsToProcess(_reusePreviousOutput,plan);
-    }
-    catch (MaxInputDataExceededException e)
-    {
-      throw new RuntimeException(e);
-    }
-    plan.finalizePlan();
-    plans.add(plan);
-    
-    if (plans.size() > 1)
-    { 
-      _log.info(String.format("There are %d alternative execution plans:",plans.size()));
-      
-      for (Plan option : plans)
-      {
-        _log.info(String.format("* Consume %d new inputs, %d old inputs, %s previous output (%d bytes)",
-                                option._newInputsToProcess.size(),
-                                option._oldInputsToProcess.size(),
-                                option._previousOutputToProcess != null ? "reuse" : "no",
-                                option._totalBytes));
-      }
-      
-      // choose plan with least bytes consumed
-      Collections.sort(plans, new Comparator<Plan>() {
-        @Override
-        public int compare(Plan o1, Plan o2)
-        {
-          return o1._totalBytes.compareTo(o2._totalBytes);
-        }      
-      });
-      _plan = plans.get(0);
-      
-      _log.info(String.format("Choosing plan consuming %d bytes",_plan._totalBytes));
-    }
-    else
-    {
-      _plan = plans.get(0);
-    }
-  } 
-
-  /**
-   * Gets whether previous output should be reused, if it exists.
-   * 
-   * @return true if previous output should be reused
-   */
-  public boolean getReusePreviousOutput()
-  {
-    return _reusePreviousOutput;
-  }
-  
-  /**
-   * Sets whether previous output should be reused, if it exists.
-   * 
-   * @param reuse true if previous output should be reused
-   */
-  public void setReusePreviousOutput(boolean reuse)
-  {
-    _reusePreviousOutput = reuse;
-  }
-  
-  /**
-   * Get the number of reducers to use based on the input and previous output data size.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return number of reducers to use
-   */
-  public int getNumReducers()
-  {
-    checkPlanExists();
-    return getPlan()._numReducers;
-  }
-  
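-  /**
-   * Gets the date range covered by the current execution plan.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return date range for the current plan
-   */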
-  public DateRange getCurrentDateRange()
-  {
-    checkPlanExists();
-    return getPlan()._currentDateRange;
-  }
-  
-  /**
-   * Gets the previous output to reuse, or null if no output is being reused.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return previous output to reuse, or null
-   */
-  public DatePath getPreviousOutputToProcess()
-  {
-    return getPlan()._previousOutputToProcess;
-  }
-  
-  /**
-   * Gets all inputs that will be processed.  This includes both old and new data.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return inputs to process
-   */
-  public List<DatePath> getInputsToProcess()
-  {
-    return getPlan()._inputsToProcess;
-  }
-  
-  /**
-   * Gets only the new data that will be processed.  New data is data that falls within the 
-   * desired date range.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return new inputs to process
-   */
-  public List<DatePath> getNewInputsToProcess()
-  {
-    return getPlan()._newInputsToProcess;
-  }
-  
-  /**
-   * Gets only the old data that will be processed.  Old data is data that falls before the
-   * desired date range.  It will be subtracted from the previous output.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return old inputs to process
-   */
-  public List<DatePath> getOldInputsToProcess()
-  {
-    return getPlan()._oldInputsToProcess;
-  }
-  
-  /**
-   * Gets whether another pass will be required.  Because there may be a limit on the number of inputs processed 
-   * in a single run, multiple runs may be required to process all data in the desired date range.  
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return true if another pass is required
-   */
-  public boolean getNeedsAnotherPass()
-  {
-    return getPlan()._needAnotherPass;
-  }
-  
-  /**
-   * Gets the input schemas.  Because multiple inputs are allowed, there may be multiple schemas.
-   * Must call {@link #createPlan()} first.
-   * 
-   * <p>
-   * This does not include the output schema, even though previous output may be fed back as input.
-   * The reason is that the output schema is determined based on the input schema.
-   * </p>
-   * 
-   * @return input schemas
-   */
-  public List<Schema> getInputSchemas()
-  {
-    return getPlan()._inputSchemas;
-  }
-  
-  /**
-   * Gets a map from input path to schema.  Because multiple inputs are allowed, there may be multiple schemas.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return map from path to input schema
-   */
-  public Map<String,Schema> getInputSchemasByPath()
-  {
-    return getPlan()._inputSchemasByPath;
-  }
-  
-  /**
-   * Determines what output data already exists.  Previous output may be reused.
-   * 
-   * @throws IOException
-   */
-  private void loadOutputData() throws IOException
-  {
-    if (getOutputPath() == null)
-    {
-      throw new RuntimeException("No output path specified");
-    }
-    _log.info(String.format("Searching for existing output data in " + getOutputPath()));
-    _outputPathsByDate = getDatedData(getOutputPath());
-    _log.info(String.format("Found %d output paths",_outputPathsByDate.size()));
-  }
-  
-  /**
-   * Determines what input data to process.
-   * 
-   * <p>
-   * The input data to consume is determined by the desired date range.  If previous output is not reused, the input data to process
-   * coincides with the date range.  If previous output may be reused and exists, the input data to process
-   * consists of new data and potentially old data.  New input data is dated after the previous output's date range
-   * and is added to the previous output.
-   * Old input data is dated before the previous output's date range and is subtracted from the previous output.
-   * </p>
-   * 
-   * <p>
-   * If there is a limit on how many days of input data can be processed then it may be the case that not all input data will be processed in
-   * a single run.
-   * </p>
-   * 
-   * @throws IOException
-   * @throws MaxInputDataExceededException 
-   */
-  private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
-  {
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);    
-        
-    DateRange outputDateRange = null;
-    
-    if (reusePreviousOutput)
-    {
-      if (_outputPathsByDate.size() > 0)
-      {
-        DatePath latestPriorOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
-        _log.info("Have previous output, determining what previous incremental data to difference out");
-        outputDateRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(),latestPriorOutput.getPath());
-        _log.info(String.format("Previous output has date range %s to %s",
-                  PathUtils.datedPathFormat.format(outputDateRange.getBeginDate()),
-                  PathUtils.datedPathFormat.format(outputDateRange.getEndDate())));
-        
-        for (Date currentDate=outputDateRange.getBeginDate(); 
-             currentDate.compareTo(getDateRange().getBeginDate()) < 0
-             && currentDate.compareTo(outputDateRange.getEndDate()) <= 0;)
-        {
-          if (!getAvailableInputsByDate().containsKey(currentDate))
-          {  
-            throw new RuntimeException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(currentDate)));
-          }
-          
-          List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
-          
-          for (DatePath input : inputs)
-          {
-            _log.info(String.format("Old Input: %s",input.getPath()));
-            plan._inputsToProcess.add(input);
-            plan._oldInputsToProcess.add(input);
-            
-            Path root = PathUtils.getNestedPathRoot(input.getPath());
-            plan._latestInputByPath.put(root.toString(), input.getPath().toString());
-          }
-                                  
-          cal.setTime(currentDate);
-          cal.add(Calendar.DAY_OF_MONTH, 1);
-          currentDate = cal.getTime();
-        }
-          
-        plan._previousOutputToProcess = latestPriorOutput;
-        _log.info("Previous Output: " + plan._previousOutputToProcess.getPath());
-      }
-      else
-      {
-        _log.info("No previous output to reuse");
-      }
-    }
-    
-    // consume the incremental data and produce the final output
-    
-    int newDataCount = 0;
-    Date startDate = getDateRange().getBeginDate();
-    Date endDate = startDate;
-    for (Date currentDate=startDate; currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
-    { 
-      if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
-      {
-        if (!reusePreviousOutput)
-        {
-          throw new MaxInputDataExceededException(String.format("Input data exceeds the maximum of %d days; output is not being reused, so the work cannot be split across multiple passes", getMaxToProcess()));
-        }
-        
-        // too much data to process in a single run, will require another pass
-        plan._needAnotherPass = true;
-        break;
-      }
-      
-      if (outputDateRange == null || currentDate.compareTo(outputDateRange.getEndDate()) > 0)
-      {
-        if (!getAvailableInputsByDate().containsKey(currentDate))
-        {
-          if (isFailOnMissing())
-          {
-            throw new RuntimeException("missing " + PathUtils.datedPathFormat.format(currentDate));            
-          }
-          else
-          {
-            _log.info("No input data found for " + PathUtils.datedPathFormat.format(currentDate));
-          }
-        }
-        else
-        {
-          List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
-          
-          for (DatePath input : inputs)
-          {
-            _log.info(String.format("New Input: %s",input.getPath()));
-            plan._inputsToProcess.add(input);
-            plan._newInputsToProcess.add(input);
-            
-            Path root = PathUtils.getNestedPathRoot(input.getPath());
-            plan._latestInputByPath.put(root.toString(), input.getPath().toString());
-          }
-                    
-          newDataCount++;
-        }
-      }
-      
-      cal.setTime(currentDate);
-      endDate = cal.getTime();
-      cal.add(Calendar.DAY_OF_MONTH, 1);
-      currentDate = cal.getTime();
-    }
-    
-    plan._currentDateRange = new DateRange(startDate,endDate);
-  } 
-  
-  /**
-   * Throws an exception if the plan hasn't been created.
-   */
-  private void checkPlanExists()
-  {
-    if (_plan == null) throw new RuntimeException("Must call createPlan first");
-  }
-  
-  private Plan getPlan()
-  {
-    checkPlanExists();
-    return _plan;
-  }
-}
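
A hedged usage sketch of the planner, following its class Javadoc. The property
value is illustrative, and the input/output path and date range setters live on
the ExecutionPlanner base class, which is not shown in this diff.

    Properties props = new Properties();
    props.setProperty("num.reducers.bytes.per.reducer", "268435456"); // ~256 MB per reducer
    FileSystem fs = FileSystem.get(new Configuration());

    PartitionCollapsingExecutionPlanner planner =
        new PartitionCollapsingExecutionPlanner(fs, props);
    planner.setReusePreviousOutput(true);
    // set input/output paths and the desired date range via the base class here
    planner.createPlan();

    int numReducers = planner.getNumReducers();            // reducer estimate
    List<DatePath> inputs = planner.getInputsToProcess();   // old and new inputs
    List<Schema> schemas = planner.getInputSchemas();       // one per input root
    if (planner.getNeedsAnotherPass())
    {
      // a max-days limit was hit; run again after consuming this plan
    }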

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
deleted file mode 100644
index f92b76b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.model.Merger;
-
-/**
- * A concrete version of {@link AbstractPartitionCollapsingIncrementalJob}.
- * 
- * This provides an alternative to extending {@link AbstractPartitionCollapsingIncrementalJob}.
- * Rather than extending that class and implementing its abstract methods, this concrete
- * version can be used directly.  Getters and setters are provided in place of the abstract methods.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingIncrementalJob extends AbstractPartitionCollapsingIncrementalJob
-{
-  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
-  private Accumulator<GenericRecord,GenericRecord> _combiner;
-  private Accumulator<GenericRecord,GenericRecord> _reducer;
-  private Schema _keySchema;
-  private Schema _intermediateValueSchema;
-  private Schema _outputValueSchema;
-  private Merger<GenericRecord> _merger;
-  private Merger<GenericRecord> _oldMerger;
-  private Setup _setup;
-
-  /**
-   * Initializes the job.  The job name is derived from the name of a provided class.
-   * 
-   * @param cls class to base job name on
-   * @throws IOException
-   */
-  public PartitionCollapsingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
-  {
-    setName(cls.getName());
-  }
-
-  @Override
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return _mapper;
-  }
-
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return _combiner;
-  }
-  
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
-  {
-    return _reducer;
-  }
-
-  @Override
-  protected Schema getKeySchema()
-  {
-    return _keySchema;
-  }
-
-  @Override
-  protected Schema getIntermediateValueSchema()
-  {
-    return _intermediateValueSchema;
-  }
-
-  @Override
-  protected Schema getOutputValueSchema()
-  {
-    return _outputValueSchema;
-  }
-
-  @Override
-  public Merger<GenericRecord> getRecordMerger()
-  {
-    return _merger;
-  }
-
-  @Override
-  public Merger<GenericRecord> getOldRecordMerger()
-  {
-    return _oldMerger;
-  }
-
-  /**
-   * Set the mapper.
-   * 
-   * @param mapper the mapper
-   */
-  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
-  {
-    this._mapper = mapper;
-  }
-
-  /**
-   * Set the accumulator for the combiner.
-   * 
-   * @param combiner accumulator for the combiner
-   */
-  public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
-  {
-    this._combiner = combiner;
-  }
-
-  /**
-   * Set the accumulator for the reducer.
-   * 
-   * @param reducer accumulator for the reducer
-   */
-  public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
-  {
-    this._reducer = reducer;
-  }
-
-  /**
-   * Sets the Avro schema for the key.
-   * <p>
-   * This is also used as the schema for the map output key.
-   * 
-   * @param keySchema key schema
-   */
-  public void setKeySchema(Schema keySchema)
-  {
-    this._keySchema = keySchema;
-  }
-  
-  /**
-   * Sets the Avro schema for the intermediate value.
-   * <p>
-   * This is also used as the schema for the map output value.
-   * 
-   * @param intermediateValueSchema intermediate value schema
-   */
-  public void setIntermediateValueSchema(Schema intermediateValueSchema)
-  {
-    this._intermediateValueSchema = intermediateValueSchema;
-  }
-  
-  /**
-   * Sets the Avro schema for the output data.
-   *  
-   * @param outputValueSchema output value schema
-   */
-  public void setOutputValueSchema(Schema outputValueSchema)
-  {
-    this._outputValueSchema = outputValueSchema;
-  }
-
-  /**
-   * Sets the record merger that is capable of merging previous output with a new partial output.
-   * This is only needed when reusing previous output where the intermediate and output schemas are different.
-   * New partial output is produced by the reducer from new input that is after the previous output.
-   * 
-   * @param merger the merger for new partial output
-   */
-  public void setMerger(Merger<GenericRecord> merger)
-  {
-    this._merger = merger;
-  }
-
-  /**
-   * Sets the record merger that is capable of unmerging old partial output from the new output.
-   * This is only needed when reusing previous output for a fixed-length sliding window.
-   * The new output is the result of merging the previous output with the new partial output.
-   * The old partial output is produced by the reducer from old input data before the time range of
-   * the previous output. 
-   * 
-   * @param oldMerger the merger for old partial output
-   */
-  public void setOldMerger(Merger<GenericRecord> oldMerger)
-  {
-    this._oldMerger = oldMerger;
-  } 
-  
-  /**
-   * Set callback to provide custom configuration before job begins execution.
-   * 
-   * @param setup object with callback method
-   */
-  public void setOnSetup(Setup setup)
-  {
-    _setup = setup;
-  }
-  
-  @Override
-  public void config(Configuration conf)
-  {    
-    super.config(conf);
-    if (_setup != null)
-    {
-      _setup.setup(conf);
-    }
-  } 
-}
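
Finally, a hedged sketch of wiring up this concrete job through its setters.
The mapper, accumulators, and schemas are placeholders assumed to be built
elsewhere; only methods that appear in this file are used.

    PartitionCollapsingIncrementalJob job =
        new PartitionCollapsingIncrementalJob(MyAggregation.class); // name derived from class
    job.setKeySchema(keySchema);                        // also the map output key schema
    job.setIntermediateValueSchema(intermediateSchema); // also the map output value schema
    job.setOutputValueSchema(outputSchema);
    job.setMapper(mapper);
    job.setCombinerAccumulator(combiner);
    job.setReducerAccumulator(reducer);
    // only needed when reusing previous output:
    // job.setMerger(merger);       // merge previous output with new partial output
    // job.setOldMerger(oldMerger); // subtract old partial output (sliding windows)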