You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:43 UTC

[05/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
new file mode 100644
index 0000000..ce1c1d6
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Base class for execution planners.  An execution planner determines which files should be processed
+ * for a particular run.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ExecutionPlanner
+{
+  private final Logger _log = Logger.getLogger(ExecutionPlanner.class);
+  
+  private FileSystem _fileSystem;
+  private Properties _props;
+  private Date _startDate;
+  private Date _endDate;
+  private Integer _daysAgo;
+  private Integer _numDays;
+  private Integer _maxToProcess;
+  private DateRange _range;
+  private Path _outputPath;
+  private boolean _failOnMissing;
+  private List<Path> _inputPaths = new ArrayList<Path>();
+  private List<SortedMap<Date,DatePath>> _inputPathsByDate;
+  private Map<Date,List<DatePath>> _availableInputsByDate = new HashMap<Date,List<DatePath>>();
+  
+  /**
+   * Initializes the execution planner.
+   * 
+   * @param fs file system to use
+   * @param props configuration properties
+   */
+  public ExecutionPlanner(FileSystem fs, Properties props)
+  {
+    _props = props;
+    _fileSystem = fs;
+  }
+  
+  /**
+   * Gets the output path.
+   * 
+   * @return output path
+   */
+  public Path getOutputPath()
+  {
+    return _outputPath;
+  }
+
+  /**
+   * Gets the input paths.
+   * 
+   * @return input paths
+   */
+  public List<Path> getInputPaths()
+  {
+    return _inputPaths;
+  }
+
+  /**
+   * Sets the output path.
+   * 
+   * @param outputPath output path
+   */
+  public void setOutputPath(Path outputPath)
+  {
+    this._outputPath = outputPath;
+  }
+
+  /**
+   * Sets the input paths.
+   * 
+   * @param inputPaths input paths
+   */
+  public void setInputPaths(List<Path> inputPaths)
+  {
+    this._inputPaths = inputPaths;
+  }
+  
+  /**
+   * Sets the start date.
+   * 
+   * @param startDate start date
+   */
+  public void setStartDate(Date startDate)
+  {
+    this._startDate = startDate;
+  }
+  
+  /**
+   * Gets the start date
+   * 
+   * @return start date
+   */
+  public Date getStartDate()
+  {
+    return _startDate;
+  }
+
+  /**
+   * Sets the end date.
+   * 
+   * @param endDate end date
+   */
+  public void setEndDate(Date endDate)
+  {
+    this._endDate = endDate;
+  }
+  
+  /**
+   * Gets the end date
+   * 
+   * @return end date
+   */
+  public Date getEndDate()
+  {
+    return _endDate;
+  }
+  
+  /**
+   * Sets the number of days to subtract off the end date. 
+   * 
+   * @param daysAgo days ago
+   */
+  public void setDaysAgo(Integer daysAgo)
+  {
+    this._daysAgo = daysAgo;
+  }
+  
+  /**
+   * Gets the number of days to subtract off the end date. 
+   * 
+   * @return days ago
+   */
+  public Integer getDaysAgo()
+  {
+    return _daysAgo;
+  }
+
+  /**
+   * Sets the number of days to process.
+   * 
+   * @param numDays number of days to process
+   */
+  public void setNumDays(Integer numDays)
+  {
+    this._numDays = numDays;
+  }
+  
+  /**
+   * Gets the number of days to process.
+   * 
+   * @return number of days to process
+   */
+  public Integer getNumDays()
+  {
+    return _numDays;
+  }
+
+  /**
+   * Sets the maximum number of days to process at a time.
+   * 
+   * @param maxToProcess maximum number of days
+   */
+  public void setMaxToProcess(Integer maxToProcess)
+  {
+    this._maxToProcess = maxToProcess;
+  }
+  
+  /**
+   * Gets the maximum number of days to process at a time.
+   * 
+   * @return maximum number of days
+   */
+  public Integer getMaxToProcess()
+  {
+    return _maxToProcess;
+  }
+  
+  /**
+   * Gets whether the job should fail if data is missing within the desired date range.
+   * 
+   * @return true if the job should fail on missing data
+   */
+  public boolean isFailOnMissing()
+  {
+    return _failOnMissing;
+  }
+
+  /**
+   * Sets whether the job should fail if data is missing within the desired date range.
+   * 
+   * @param failOnMissing true if the job should fail on missing data
+   */
+  public void setFailOnMissing(boolean failOnMissing)
+  {
+    this._failOnMissing = failOnMissing;
+  }
+  
+  /**
+   * Gets the desired input date range to process based on the configuration and available inputs.
+   * 
+   * @return desired date range
+   */
+  public DateRange getDateRange()
+  {
+    return _range;
+  }
+
+  /**
+   * Gets the file system.
+   * 
+   * @return file system
+   */
+  protected FileSystem getFileSystem()
+  {
+    return _fileSystem;
+  }
+  
+  /**
+   * Gets the configuration properties.
+   * 
+   * @return properties
+   */
+  protected Properties getProps()
+  {
+    return _props;
+  }
+  
+  /**
+   * Gets a map from date to available input data.
+   * 
+   * @return map from date to available input data
+   */
+  protected Map<Date,List<DatePath>> getAvailableInputsByDate()
+  {
+    return _availableInputsByDate;
+  }
+  
+  /**
+   * Get a map from date to path for all paths matching yyyy/MM/dd under the given path.
+   * 
+   * @param path path to search under
+   * @return map of date to path
+   * @throws IOException
+   */
+  protected SortedMap<Date,DatePath> getDailyData(Path path) throws IOException
+  {
+    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
+    for (DatePath datePath : PathUtils.findNestedDatedPaths(getFileSystem(),path))
+    {
+      data.put(datePath.getDate(),datePath);
+    }
+    return data;
+  }
+  
+  /**
+   * Get a map from date to path for all paths matching yyyyMMdd under the given path.
+   * 
+   * @param path path to search under
+   * @return map of date to path
+   * @throws IOException
+   */
+  protected SortedMap<Date,DatePath> getDatedData(Path path) throws IOException
+  {
+    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
+    for (DatePath datePath : PathUtils.findDatedPaths(getFileSystem(),path))
+    {
+      data.put(datePath.getDate(),datePath);
+    }
+    return data;
+  }
+  
+  /**
+   * Determine what input data is available.
+   * 
+   * @throws IOException
+   */
+  protected void loadInputData() throws IOException
+  {
+    _inputPathsByDate = new ArrayList<SortedMap<Date,DatePath>>();
+    for (Path inputPath : getInputPaths())
+    {
+      _log.info(String.format("Searching for available input data in " + inputPath));
+      _inputPathsByDate.add(getDailyData(inputPath));
+    }
+  }
+  
+  /**
+   * Determines what input data is available.
+   */
+  protected void determineAvailableInputDates()
+  {
+    // first find the latest date available for all inputs
+    PriorityQueue<Date> dates = new PriorityQueue<Date>();
+    for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
+    {
+      for (Date date : pathMap.keySet())
+      {
+        dates.add(date);
+      }
+    }
+    if (dates.size() == 0)
+    {
+      throw new RuntimeException("No input data!");
+    }
+    List<Date> available = new ArrayList<Date>();
+    Date currentDate = dates.peek();
+    int found = 0;
+    int needed = getInputPaths().size();
+    while (currentDate != null)
+    {
+      Date date = dates.poll();
+      
+      if (date != null && date.equals(currentDate))
+      {
+        found++;
+      }
+      else
+      {
+        if (found == needed)
+        {
+          available.add(currentDate);
+        }
+        else if (available.size() > 0)
+        {
+          _log.info("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
+          _log.info("Paths found for " + PathUtils.datedPathFormat.format(currentDate) + ":");
+          // collect what's available for this date
+          for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
+          {
+            DatePath path = pathMap.get(currentDate);
+            if (path != null)
+            {
+              _log.info("=> " + path);
+            }
+          }
+          
+          if (_failOnMissing)
+          {
+            throw new RuntimeException("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
+          }
+          else
+          {
+            available.add(currentDate);
+          }
+        }
+        
+        found = 0;          
+        currentDate = date;
+        
+        if (currentDate != null)
+        {
+          found++;
+        }
+      }        
+      
+      if (found > needed)
+      {
+        throw new RuntimeException("found more than needed");
+      }        
+    }
+    
+    _availableInputsByDate.clear();
+    
+    for (Date date : available)
+    {
+      List<DatePath> paths = new ArrayList<DatePath>();
+      for (SortedMap<Date,DatePath> map : _inputPathsByDate)
+      {
+        DatePath path = map.get(date);
+        if (path != null)
+        {
+          paths.add(path);
+        }
+      }
+      _availableInputsByDate.put(date, paths);
+    }
+  }
+  
+  /**
+   * Determine the date range for inputs to process based on the configuration and available inputs.
+   */
+  protected void determineDateRange()
+  {
+    _log.info("Determining range of input data to consume");
+    _range = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), getAvailableInputsByDate().keySet(), getDaysAgo(), getNumDays(), isFailOnMissing());
+    if (_range.getBeginDate() == null || _range.getEndDate() == null)
+    {
+      throw new RuntimeException("Expected start and end date");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
new file mode 100644
index 0000000..7248969
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Used to remove files from the file system when they are no longer needed.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class FileCleaner
+{
+  private final Logger log = Logger.getLogger(FileCleaner.class);
+  
+  private final Set<Path> garbage = new HashSet<Path>();
+  private final FileSystem fs;
+  
+  public FileCleaner(FileSystem fs)
+  {
+    this.fs = fs;
+  }
+  
+  /**
+   * Add a path to be removed later.
+   * 
+   * @param path
+   * @return added path
+   */
+  public Path add(Path path)
+  {
+    garbage.add(path);
+    return path;
+  }
+  
+  /**
+   * Add a path to be removed later.
+   * 
+   * @param path
+   * @return added path
+   */
+  public String add(String path)
+  {
+    garbage.add(new Path(path));
+    return path;
+  }
+  
+  /**
+   * Removes added paths from the file system.
+   * 
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public void clean() throws IOException
+  {
+    List<Path> sorted = new ArrayList<Path>(garbage);
+    Collections.sort(sorted);
+    for (Path p : sorted)
+    {
+      if (fs.exists(p))
+      {
+        log.info(String.format("Removing %s",p));
+        fs.delete(p, true);
+      }
+    }
+    garbage.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
new file mode 100644
index 0000000..86880b9
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import datafu.hourglass.schemas.TaskSchemas;
+
+/**
+ * Base class for incremental jobs.  Incremental jobs consume day-partitioned input data.  
+ * 
+ * <p>
+ * Implementations of this class must provide key, intermediate value, and output value schemas.
+ * The key and intermediate value schemas define the output for the mapper and combiner.
+ * The key and output value schemas define the output for the reducer.
+ * </p>
+ * 
+ * <p>
+ * This class has the same configuration and methods as {@link TimeBasedJob}.
+ * In addition it also recognizes the following properties:
+ * </p>
+ * 
+ * <ul>
+ *   <li><em>max.iterations</em> - maximum number of iterations for the job</li>
+ *   <li><em>max.days.to.process</em> - maximum number of days of input data to process in a single run</li>
+ *   <li><em>fail.on.missing</em> - whether the job should fail if input data within the desired range is missing</li>
+ * </ul>
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class IncrementalJob extends TimeBasedJob
+{
+  private Integer _maxToProcess;
+  private Integer _maxIterations;
+  private boolean _failOnMissing;
+  private TaskSchemas _schemas;
+  
+  /**
+   * Initializes the job.
+   */
+  public IncrementalJob()
+  {    
+  }
+
+  /**
+   * Initializes the job with a job name and properties.
+   * 
+   * @param name job name
+   * @param props configuration properties
+   */
+  public IncrementalJob(String name, Properties props)
+  {        
+    super(name,props);
+  }
+  
+  public void setProperties(Properties props)
+  {
+    super.setProperties(props);
+        
+    if (getProperties().get("max.iterations") != null)
+    {
+      setMaxIterations(Integer.parseInt((String)getProperties().get("max.iterations")));
+    }
+    
+    if (getProperties().get("max.days.to.process") != null)
+    {
+      setMaxToProcess(Integer.parseInt((String)getProperties().get("max.days.to.process")));
+    }
+    
+    if (getProperties().get("fail.on.missing") != null)
+    {
+      setFailOnMissing(Boolean.parseBoolean((String)getProperties().get("max.days.to.process")));
+    }
+  }
+  
+  protected void initialize()
+  {
+    super.initialize();
+    
+    if (getKeySchema() == null)
+    {
+      throw new RuntimeException("Key schema not specified");
+    }
+
+    if (getIntermediateValueSchema() == null)
+    {
+      throw new RuntimeException("Intermediate schema not specified");
+    }
+
+    if (getOutputValueSchema() == null)
+    {
+      throw new RuntimeException("Output schema not specified");
+    }
+    
+    _schemas = new TaskSchemas.Builder()
+      .setKeySchema(getKeySchema())
+      .setIntermediateValueSchema(getIntermediateValueSchema())
+      .setOutputValueSchema(getOutputValueSchema())
+      .build();
+  }
+  
+  /**
+   * Gets the Avro schema for the key.
+   * <p>
+   * This is also used as the key for the map output.
+   * 
+   * @return key schema.
+   */
+  protected abstract Schema getKeySchema();
+  
+  /**
+   * Gets the Avro schema for the intermediate value.
+   * <p>
+   * This is also used for the value for the map output.
+   * 
+   * @return intermediate value schema
+   */
+  protected abstract Schema getIntermediateValueSchema();
+  
+  /**
+   * Gets the Avro schema for the output data.
+   * 
+   * @return output data schema
+   */
+  protected abstract Schema getOutputValueSchema();
+  
+  /**
+   * Gets the schemas.
+   * 
+   * @return schemas
+   */
+  protected TaskSchemas getSchemas()
+  {
+    return _schemas;
+  }
+  
+  /**
+   * Gets the maximum number of days of input data to process in a single run.
+   * 
+   * @return maximum number of days to process
+   */
+  public Integer getMaxToProcess()
+  {
+    return _maxToProcess;
+  }
+
+  /**
+   * Sets the maximum number of days of input data to process in a single run.
+   * 
+   * @param maxToProcess maximum number of days to process
+   */
+  public void setMaxToProcess(Integer maxToProcess)
+  {
+    _maxToProcess = maxToProcess;
+  }
+
+  /**
+   * Gets the maximum number of iterations for the job.  Multiple iterations will only occur
+   * when there is a maximum set for the number of days to process in a single run.
+   * An error should be thrown if this number will be exceeded.
+   * 
+   * @return maximum number of iterations
+   */
+  public Integer getMaxIterations()
+  {
+    return _maxIterations;
+  }
+
+  /**
+   * Sets the maximum number of iterations for the job.  Multiple iterations will only occur
+   * when there is a maximum set for the number of days to process in a single run.
+   * An error should be thrown if this number will be exceeded.
+   * 
+   * @param maxIterations maximum number of iterations
+   */
+  public void setMaxIterations(Integer maxIterations)
+  {
+    _maxIterations = maxIterations;
+  }
+
+  /**
+   * Gets whether the job should fail if input data within the desired range is missing. 
+   * 
+   * @return true if the job should fail on missing data
+   */
+  public boolean isFailOnMissing()
+  {
+    return _failOnMissing;
+  }
+
+  /**
+   * Sets whether the job should fail if input data within the desired range is missing. 
+   * 
+   * @param failOnMissing true if the job should fail on missing data
+   */
+  public void setFailOnMissing(boolean failOnMissing)
+  {
+    _failOnMissing = failOnMissing;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
new file mode 100644
index 0000000..e2024fb
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.hourglass.jobs;
+
+public class MaxInputDataExceededException extends Throwable
+{
+  public MaxInputDataExceededException()
+  {    
+  }
+  
+  public MaxInputDataExceededException(String message)
+  {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
new file mode 100644
index 0000000..34368f7
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.SortedMap;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.avro.AvroDateRangeMetadata;
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Execution planner used by {@link AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ * This creates a plan to process partitioned input data and collapse the partitions into a single output.
+ * 
+ * <p>
+ * To use this class, the input and output paths must be specified.  In addition the desired input date
+ * range can be specified through several methods.  Then {@link #createPlan()} can be called and the
+ * execution plan will be created.  The inputs to process will be available from {@link #getInputsToProcess()},
+ * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
+ * will be available from {@link #getInputSchemas()}.
+ * </p>
+ * 
+ * <p>
+ * Previous output may be reused by using {@link #setReusePreviousOutput(boolean)}.  If previous output exists
+ * and it is to be reused then it will be available from {@link #getPreviousOutputToProcess()}.  New input data
+ * to process that is after the previous output time range is available from {@link #getNewInputsToProcess()}.
+ * Old input data to process that is before the previous output time range and should be subtracted from the
+ * previous output is available from {@link #getOldInputsToProcess()}.
+ * </p>
+ * 
+ * <p>
+ * Configuration properties are used to configure a {@link ReduceEstimator} instance.  This is used to 
+ * calculate how many reducers should be used.  
+ * The number of reducers to use is based on the input data size and the 
+ * <em>num.reducers.bytes.per.reducer</em> property.  This setting can be controlled more granularly
+ * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
+ * Check {@link ReduceEstimator} for more details on how the properties are used.
+ * </p>
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
+{
+  private final Logger _log = Logger.getLogger(PartitionCollapsingExecutionPlanner.class);
+
+  private SortedMap<Date,DatePath> _outputPathsByDate;
+  private boolean _reusePreviousOutput;
+  
+  // the chosen execution plan
+  private Plan _plan;
+  
+  /**
+   * An execution plan.  Encapsulates what inputs will be processed.
+   * 
+   * @author mhayes
+   *
+   */
+  private class Plan
+  {
+    private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
+    private List<DatePath> _newInputsToProcess = new ArrayList<DatePath>();
+    private List<DatePath> _oldInputsToProcess = new ArrayList<DatePath>();
+    private Map<String,String> _latestInputByPath = new HashMap<String,String>();
+    private DatePath _previousOutputToProcess;
+    private List<Schema> _inputSchemas = new ArrayList<Schema>();
+    private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
+    private boolean _needAnotherPass;
+    private DateRange _currentDateRange;
+    private int _numReducers;
+    private Long _totalBytes;
+    
+    public void finalizePlan() throws IOException
+    {
+      determineInputSchemas();
+      determineNumReducers();
+      determineTotalBytes();
+    }
+    
+    /**
+     * Determines the number of bytes that will be consumed by this execution plan.
+     * This is used to compare alternative plans so the one with the least bytes
+     * consumed can be used.
+     * 
+     * @throws IOException
+     */
+    private void determineTotalBytes() throws IOException
+    {
+      _totalBytes = 0L;
+      for (DatePath dp : _inputsToProcess)
+      {
+        _totalBytes += PathUtils.countBytes(getFileSystem(), dp.getPath());
+      }
+      if (_previousOutputToProcess != null)
+      {
+        _totalBytes += PathUtils.countBytes(getFileSystem(), _previousOutputToProcess.getPath());
+      }
+      _log.info("Total bytes consumed: " + _totalBytes);
+    }
+    
+    /**
+     * Determines the input schemas.  There may be multiple input schemas because multiple inputs are allowed.
+     * The latest available inputs are used to determine the schema, the assumption being that schemas are
+     * backwards-compatible.
+     * 
+     * @throws IOException
+     */
+    private void determineInputSchemas() throws IOException
+    {
+      if (_latestInputByPath.size() > 0)
+      {
+        _log.info("Determining input schemas");
+        for (Entry<String,String> entry : _latestInputByPath.entrySet())
+        {
+          String root = entry.getKey();
+          String input = entry.getValue();
+          _log.info("Loading schema for " + input);
+          Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
+          _inputSchemas.add(schema);
+          _inputSchemasByPath.put(root, schema);
+        }
+      }
+    }
+    
+    /**
+     * Determines the number of reducers to use based on the input data size and the previous output,
+     * if it exists and is being reused.
+     * The number of reducers to use is based on the input data size and the 
+     * <em>num.reducers.bytes.per.reducer</em> property.  This setting can be controlled more granularly
+     * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
+     * See {@link ReduceEstimator} for details on reducer estimation.
+     * 
+     * @throws IOException
+     */
+    private void determineNumReducers() throws IOException
+    {
+      ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
+      List<String> inputPaths = new ArrayList<String>();
+      for (DatePath input : _inputsToProcess)
+      {
+        inputPaths.add(input.getPath().toString());
+        estimator.addInputPath("input",input.getPath());
+      }
+      if (_previousOutputToProcess != null)
+      {
+        estimator.addInputPath("previous",_previousOutputToProcess.getPath());
+      }
+      _numReducers = estimator.getNumReducers();
+    }
+  }
+  
+  /**
+   * Initializes the execution planner.
+   * 
+   * @param fs file system
+   * @param props configuration properties
+   */
+  public PartitionCollapsingExecutionPlanner(FileSystem fs, Properties props)
+  {
+    super(fs, props);
+  }
+
+  /**
+   * Create the execution plan.
+   * 
+   * @throws IOException
+   */
+  public void createPlan() throws IOException
+  {
+    if (_plan != null) throw new RuntimeException("Plan already exists");
+    
+    _log.info("Creating execution plan");
+    
+    loadInputData();
+    loadOutputData();    
+    determineAvailableInputDates();
+    determineDateRange();
+    
+    List<Plan> plans = new ArrayList<Plan>();
+    Plan plan;
+    
+    if (_reusePreviousOutput)
+    {
+      _log.info("Output may be reused, will create alternative plan that does not reuse output");
+      plan = new Plan();
+      try
+      {
+        determineInputsToProcess(false,plan);
+        plan.finalizePlan();
+        plans.add(plan);
+      }
+      catch (MaxInputDataExceededException e)
+      {
+        _log.info(e.getMessage());
+      }
+    }
+    
+    _log.info(String.format("Creating plan that %s previous output",(_reusePreviousOutput ? "reuses" : "does not reuse")));
+    plan = new Plan();
+    try
+    {
+      determineInputsToProcess(_reusePreviousOutput,plan);
+    }
+    catch (MaxInputDataExceededException e)
+    {
+      throw new RuntimeException(e);
+    }
+    plan.finalizePlan();
+    plans.add(plan);
+    
+    if (plans.size() > 1)
+    { 
+      _log.info(String.format("There are %d alternative execution plans:",plans.size()));
+      
+      for (Plan option : plans)
+      {
+        _log.info(String.format("* Consume %d new inputs, %d old inputs, %s previous output (%d bytes)",
+                                option._newInputsToProcess.size(),
+                                option._oldInputsToProcess.size(),
+                                option._previousOutputToProcess != null ? "reuse" : "no",
+                                option._totalBytes));
+      }
+      
+      // choose plan with least bytes consumed
+      Collections.sort(plans, new Comparator<Plan>() {
+        @Override
+        public int compare(Plan o1, Plan o2)
+        {
+          return o1._totalBytes.compareTo(o2._totalBytes);
+        }      
+      });
+      _plan = plans.get(0);
+      
+      _log.info(String.format("Choosing plan consuming %d bytes",_plan._totalBytes));
+    }
+    else
+    {
+      _plan = plans.get(0);
+    }
+  } 
+
+  /**
+   * Gets whether previous output should be reused, if it exists.
+   * 
+   * @return true if previous output should be reused
+   */
+  public boolean getReusePreviousOutput()
+  {
+    return _reusePreviousOutput;
+  }
+  
+  /**
+   * Sets whether previous output should be reused, if it exists.
+   * 
+   * @param reuse true if previous output should be reused
+   */
+  public void setReusePreviousOutput(boolean reuse)
+  {
+    _reusePreviousOutput = reuse;
+  }
+  
+  /**
+   * Get the number of reducers to use based on the input and previous output data size.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return number of reducers to use
+   */
+  public int getNumReducers()
+  {
+    checkPlanExists();
+    return getPlan()._numReducers;
+  }
+  
+  public DateRange getCurrentDateRange()
+  {
+    checkPlanExists();
+    return getPlan()._currentDateRange;
+  }
+  
+  /**
+   * Gets the previous output to reuse, or null if no output is being reused.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return previous output to reuse, or null
+   */
+  public DatePath getPreviousOutputToProcess()
+  {
+    return getPlan()._previousOutputToProcess;
+  }
+  
+  /**
+   * Gets all inputs that will be processed.  This includes both old and new data.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return inputs to process
+   */
+  public List<DatePath> getInputsToProcess()
+  {
+    return getPlan()._inputsToProcess;
+  }
+  
+  /**
+   * Gets only the new data that will be processed.  New data is data that falls within the 
+   * desired date range.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return new inputs to process
+   */
+  public List<DatePath> getNewInputsToProcess()
+  {
+    return getPlan()._newInputsToProcess;
+  }
+  
+  /**
+   * Gets only the old data that will be processed.  Old data is data that falls before the
+   * desired date range.  It will be subtracted out from the previous output.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return old inputs to process
+   */
+  public List<DatePath> getOldInputsToProcess()
+  {
+    return getPlan()._oldInputsToProcess;
+  }
+  
+  /**
+   * Gets whether another pass will be required.  Because there may be a limit on the number of inputs processed 
+   * in a single run, multiple runs may be required to process all data in the desired date range.  
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return true if another pass is required
+   */
+  public boolean getNeedsAnotherPass()
+  {
+    return getPlan()._needAnotherPass;
+  }
+  
+  /**
+   * Gets the input schemas.  Because multiple inputs are allowed, there may be multiple schemas.
+   * Must call {@link #createPlan()} first.
+   * 
+   * <p>
+   * This does not include the output schema, even though previous output may be fed back as input.
+   * The reason is that the ouput schema it determined based on the input schema.
+   * </p>
+   * 
+   * @return input schemas
+   */
+  public List<Schema> getInputSchemas()
+  {
+    return getPlan()._inputSchemas;
+  }
+  
+  /**
+   * Gets a map from input path to schema.  Because multiple inputs are allowed, there may be multiple schemas.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return map from path to input schema
+   */
+  public Map<String,Schema> getInputSchemasByPath()
+  {
+    return getPlan()._inputSchemasByPath;
+  }
+  
+  /**
+   * Determines what output data already exists.  Previous output may be reused.
+   * 
+   * @throws IOException
+   */
+  private void loadOutputData() throws IOException
+  {
+    if (getOutputPath() == null)
+    {
+      throw new RuntimeException("No output path specified");
+    }
+    _log.info(String.format("Searching for existing output data in " + getOutputPath()));
+    _outputPathsByDate = getDatedData(getOutputPath());
+    _log.info(String.format("Found %d output paths",_outputPathsByDate.size()));
+  }
+  
+  /**
+   * Determines what input data to process.
+   * 
+   * <p>
+   * The input data to consume is determined by the desired date range.  If previous output is not reused then the input data to process
+   * will coincide with the date range.  If previous output may be reused and previous output exists, then the input data to process 
+   * will consist of new data and potentially old data.  The new input data to process is data that has time after the previous output date range,
+   * so that it may be added to the previous output.
+   * The old data to process is data that has time before the previous output date range, so that it may be subtracted from the previous output.
+   * </p>
+   * 
+   * <p>
+   * If there is a limit on how many days of input data can be processed then it may be the case that not all input data will be processed in
+   * a single run.
+   * </p>
+   * 
+   * @throws IOException
+   * @throws MaxInputDataExceededException 
+   */
+  private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
+  {
+    Calendar cal = Calendar.getInstance(PathUtils.timeZone);    
+        
+    DateRange outputDateRange = null;
+    
+    if (reusePreviousOutput)
+    {
+      if (_outputPathsByDate.size() > 0)
+      {
+        DatePath latestPriorOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
+        _log.info("Have previous output, determining what previous incremental data to difference out");
+        outputDateRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(),latestPriorOutput.getPath());
+        _log.info(String.format("Previous output has date range %s to %s",
+                  PathUtils.datedPathFormat.format(outputDateRange.getBeginDate()),
+                  PathUtils.datedPathFormat.format(outputDateRange.getEndDate())));
+        
+        for (Date currentDate=outputDateRange.getBeginDate(); 
+             currentDate.compareTo(getDateRange().getBeginDate()) < 0
+             && currentDate.compareTo(outputDateRange.getEndDate()) <= 0;)
+        {
+          if (!getAvailableInputsByDate().containsKey(currentDate))
+          {  
+            throw new RuntimeException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(currentDate)));
+          }
+          
+          List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
+          
+          for (DatePath input : inputs)
+          {
+            _log.info(String.format("Old Input: %s",input.getPath()));
+            plan._inputsToProcess.add(input);
+            plan._oldInputsToProcess.add(input);
+            
+            Path root = PathUtils.getNestedPathRoot(input.getPath());
+            plan._latestInputByPath.put(root.toString(), input.getPath().toString());
+          }
+                                  
+          cal.setTime(currentDate);
+          cal.add(Calendar.DAY_OF_MONTH, 1);
+          currentDate = cal.getTime();
+        }
+          
+        plan._previousOutputToProcess = latestPriorOutput;
+        _log.info("Previous Output: " + plan._previousOutputToProcess.getPath());
+      }
+      else
+      {
+        _log.info("No previous output to reuse");
+      }
+    }
+    
+    // consume the incremental data and produce the final output
+    
+    int newDataCount = 0;
+    Date startDate = getDateRange().getBeginDate();
+    Date endDate = startDate;
+    for (Date currentDate=startDate; currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
+    { 
+      if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
+      {
+        if (!reusePreviousOutput)
+        {
+          throw new MaxInputDataExceededException(String.format("Amount of input data has exceeded max of %d however output is not being reused so cannot do in multiple passes", getMaxToProcess()));
+        }
+        
+        // too much data to process in a single run, will require another pass
+        plan._needAnotherPass = true;
+        break;
+      }
+      
+      if (outputDateRange == null || currentDate.compareTo(outputDateRange.getEndDate()) > 0)
+      {
+        if (!getAvailableInputsByDate().containsKey(currentDate))
+        {
+          if (isFailOnMissing())
+          {
+            throw new RuntimeException("missing " + PathUtils.datedPathFormat.format(currentDate));            
+          }
+          else
+          {
+            _log.info("No input data found for " + PathUtils.datedPathFormat.format(currentDate));
+          }
+        }
+        else
+        {
+          List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
+          
+          for (DatePath input : inputs)
+          {
+            _log.info(String.format("New Input: %s",input.getPath()));
+            plan._inputsToProcess.add(input);
+            plan._newInputsToProcess.add(input);
+            
+            Path root = PathUtils.getNestedPathRoot(input.getPath());
+            plan._latestInputByPath.put(root.toString(), input.getPath().toString());
+          }
+                    
+          newDataCount++;
+        }
+      }
+      
+      cal.setTime(currentDate);
+      endDate = cal.getTime();
+      cal.add(Calendar.DAY_OF_MONTH, 1);
+      currentDate = cal.getTime();
+    }
+    
+    plan._currentDateRange = new DateRange(startDate,endDate);
+  } 
+  
+  /**
+   * Throws an exception if the plan hasn't been created.
+   */
+  private void checkPlanExists()
+  {
+    if (_plan == null) throw new RuntimeException("Must call createPlan first");
+  }
+  
+  private Plan getPlan()
+  {
+    checkPlanExists();
+    return _plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
new file mode 100644
index 0000000..68e776a
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Mapper;
+import datafu.hourglass.model.Merger;
+
+/**
+ * A concrete version of {@link AbstractPartitionCollapsingIncrementalJob}.
+ * 
+ * This provides an alternative to extending {@link AbstractPartitionCollapsingIncrementalJob}.
+ * Instead of extending this class and implementing the abstract methods, this concrete version
+ * can be used instead.  Getters and setters have been provided for the abstract methods. 
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionCollapsingIncrementalJob extends AbstractPartitionCollapsingIncrementalJob
+{
+  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+  private Accumulator<GenericRecord,GenericRecord> _combiner;
+  private Accumulator<GenericRecord,GenericRecord> _reducer;
+  private Schema _keySchema;
+  private Schema _intermediateValueSchema;
+  private Schema _outputValueSchema;
+  private Merger<GenericRecord> _merger;
+  private Merger<GenericRecord> _oldMerger;
+  private Setup _setup;
+
+  /**
+   * Initializes the job.  The job name is derived from the name of a provided class.
+   * 
+   * @param cls class to base job name on
+   * @throws IOException
+   */
+  public PartitionCollapsingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
+  {
+    setName(cls.getName());
+  }
+
+  @Override
+  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+  {
+    return _mapper;
+  }
+
+  @Override
+  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
+  {
+    return _combiner;
+  }
+  
+  @Override
+  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
+  {
+    return _reducer;
+  }
+
+  @Override
+  protected Schema getKeySchema()
+  {
+    return _keySchema;
+  }
+
+  @Override
+  protected Schema getIntermediateValueSchema()
+  {
+    return _intermediateValueSchema;
+  }
+
+  @Override
+  protected Schema getOutputValueSchema()
+  {
+    return _outputValueSchema;
+  }
+
+  @Override
+  public Merger<GenericRecord> getRecordMerger()
+  {
+    return _merger;
+  }
+
+  @Override
+  public Merger<GenericRecord> getOldRecordMerger()
+  {
+    return _oldMerger;
+  }
+
+  /**
+   * Set the mapper.
+   * 
+   * @param mapper
+   */
+  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+  {
+    this._mapper = mapper;
+  }
+
+  /**
+   * Set the accumulator for the combiner
+   * 
+   * @param combiner accumulator for the combiner
+   */
+  public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
+  {
+    this._combiner = combiner;
+  }
+
+  /**
+   * Set the accumulator for the reducer.
+   * 
+   * @param reducer accumulator for the reducer
+   */
+  public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
+  {
+    this._reducer = reducer;
+  }
+
+  /**
+   * Sets the Avro schema for the key.
+   * <p>
+   * This is also used as the key for the map output.
+   * 
+   * @param keySchema key schema
+   */
+  public void setKeySchema(Schema keySchema)
+  {
+    this._keySchema = keySchema;
+  }
+  
+  /**
+   * Sets the Avro schema for the intermediate value.
+   * <p>
+   * This is also used for the value for the map output.
+   * 
+   * @param intermediateValueSchema intermediate value schema
+   */
+  public void setIntermediateValueSchema(Schema intermediateValueSchema)
+  {
+    this._intermediateValueSchema = intermediateValueSchema;
+  }
+  
+  /**
+   * Sets the Avro schema for the output data.
+   *  
+   * @param outputValueSchema output value schema
+   */
+  public void setOutputValueSchema(Schema outputValueSchema)
+  {
+    this._outputValueSchema = outputValueSchema;
+  }
+
+  /**
+   * Sets the record merger that is capable of merging previous output with a new partial output.
+   * This is only needed when reusing previous output where the intermediate and output schemas are different.
+   * New partial output is produced by the reducer from new input that is after the previous output.
+   * 
+   * @param merger
+   */
+  public void setMerger(Merger<GenericRecord> merger)
+  {
+    this._merger = merger;
+  }
+
+  /**
+   * Sets the record merger that is capable of unmerging old partial output from the new output.
+   * This is only needed when reusing previous output for a fixed-length sliding window.
+   * The new output is the result of merging the previous output with the new partial output.
+   * The old partial output is produced by the reducer from old input data before the time range of
+   * the previous output. 
+   * 
+   * @param oldMerger merger
+   */
+  public void setOldMerger(Merger<GenericRecord> oldMerger)
+  {
+    this._oldMerger = oldMerger;
+  } 
+  
+  /**
+   * Set callback to provide custom configuration before job begins execution.
+   * 
+   * @param setup object with callback method
+   */
+  public void setOnSetup(Setup setup)
+  {
+    _setup = setup;
+  }
+  
+  @Override
+  public void config(Configuration conf)
+  {    
+    super.config(conf);
+    if (_setup != null)
+    {
+      _setup.setup(conf);
+    }
+  } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
new file mode 100644
index 0000000..6ac55a8
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Execution planner used by {@link AbstractPartitionPreservingIncrementalJob} and its derived classes.
+ * This creates a plan to process partitioned input data and produce partitioned output data.
+ * 
+ * <p>
+ * To use this class, the input and output paths must be specified.  In addition the desired input date
+ * range can be specified through several methods.  Then {@link #createPlan()} can be called and the
+ * execution plan will be created.  The inputs to process will be available from {@link #getInputsToProcess()},
+ * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
+ * will be available from {@link #getInputSchemas()}.
+ * </p>
+ * 
+ * <p>
+ * Configuration properties are used to configure a {@link ReduceEstimator} instance.  This is used to 
+ * calculate how many reducers should be used.  
+ * The number of reducers to use is based on the input data size and the 
+ * <em>num.reducers.bytes.per.reducer</em> property.
+ * Check {@link ReduceEstimator} for more details on how the properties are used.
+ * </p>
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionPreservingExecutionPlanner extends ExecutionPlanner
+{
+  private final Logger _log = Logger.getLogger(PartitionPreservingExecutionPlanner.class);
+  
+  private SortedMap<Date,DatePath> _outputPathsByDate;
+  private Map<String,String> _latestInputByPath = new HashMap<String,String>();
+  private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
+  private List<Schema> _inputSchemas = new ArrayList<Schema>();
+  private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
+  private boolean _needAnotherPass;
+  private int _numReducers;
+  private boolean _planExists;
+  
+  /**
+   * Initializes the execution planner.
+   * 
+   * @param fs file system
+   * @param props configuration properties
+   */
+  public PartitionPreservingExecutionPlanner(FileSystem fs, Properties props)
+  {
+    super(fs,props);
+  }
+
+  /**
+   * Create the execution plan.
+   * 
+   * @throws IOException
+   */
+  public void createPlan() throws IOException
+  {
+    if (_planExists) throw new RuntimeException("Plan already exists");
+    _planExists = true;
+    loadInputData();
+    loadOutputData();
+    determineAvailableInputDates();
+    determineDateRange();
+    determineInputsToProcess();
+    determineInputSchemas();
+    determineNumReducers();
+  }
+  
+  /**
+   * Get the number of reducers to use based on the input data size.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return number of reducers to use
+   */
+  public int getNumReducers()
+  {
+    checkPlanExists();
+    return _numReducers;
+  }
+  
+  /**
+   * Gets the input schemas.  Because multiple inputs are allowed, there may be multiple schemas.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return input schemas
+   */
+  public List<Schema> getInputSchemas()
+  {
+    checkPlanExists();
+    return _inputSchemas;
+  }
+  
+  /**
+   * Gets a map from input path to schema.  Because multiple inputs are allowed, there may be multiple schemas.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return map from path to input schema
+   */
+  public Map<String,Schema> getInputSchemasByPath()
+  {
+    checkPlanExists();
+    return _inputSchemasByPath;
+  }
+    
+  /**
+   * Gets whether another pass will be required.  Because there may be a limit on the number of inputs processed 
+   * in a single run, multiple runs may be required to process all data in the desired date range.  
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return true if another pass is required
+   */
+  public boolean getNeedsAnotherPass()
+  {
+    checkPlanExists();
+    return _needAnotherPass;
+  }
+  
+  /**
+   * Gets the inputs which are to be processed.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return inputs to process
+   */
+  public List<DatePath> getInputsToProcess()
+  {
+    checkPlanExists();
+    return _inputsToProcess;
+  }
+  
+  /**
+   * Gets the input dates which are to be processed.
+   * Must call {@link #createPlan()} first.
+   * 
+   * @return dates to process
+   */
+  public List<Date> getDatesToProcess()
+  {
+    checkPlanExists();
+    Set<Date> dates = new TreeSet<Date>();
+    for (DatePath dp : _inputsToProcess)
+    {
+      dates.add(dp.getDate());
+    }
+    return new ArrayList<Date>(dates);
+  }
+  
+  /**
+   * Determines the number of reducers to use based on the input data size.
+   * The number of reducers to use is based on the input data size and the 
+   * <em>num.reducers.bytes.per.reducer</em> property.  See {@link ReduceEstimator}
+   * for details on reducer estimation.
+   * 
+   * @throws IOException
+   */
+  private void determineNumReducers() throws IOException
+  {
+    ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
+    List<String> inputPaths = new ArrayList<String>();
+    for (DatePath input : getInputsToProcess())
+    {
+      inputPaths.add(input.getPath().toString());
+      estimator.addInputPath("input",input.getPath());
+    }
+    _numReducers = estimator.getNumReducers();
+  }
+  
+  /**
+   * Determines the input schemas.  There may be multiple input schemas because multiple inputs are allowed.
+   * The latest available inputs are used to determine the schema, the assumption being that schemas are
+   * backwards-compatible.
+   * 
+   * @throws IOException
+   */
+  private void determineInputSchemas() throws IOException
+  {
+    if (_latestInputByPath.size() > 0)
+    {
+      _log.info("Determining input schemas");
+      for (Entry<String,String> entry : _latestInputByPath.entrySet())
+      {
+        String root = entry.getKey();
+        String input = entry.getValue();
+        _log.info("Loading schema for " + input);
+        Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
+        _inputSchemas.add(schema);
+        _inputSchemasByPath.put(root, schema);
+      }
+    }
+  }
+  
+  /**
+   * Determines which input data should be processed.  This checks the availability of input data within
+   * the desired date range and also checks whether the output already exists.  Only inputs with no 
+   * corresponding output are processed. 
+   */
+  private void determineInputsToProcess()
+  {
+    _log.info("Determining inputs to process");
+    _latestInputByPath.clear();
+    int newDataCount = 0;
+    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
+    for (Date currentDate=getDateRange().getBeginDate(); currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
+    { 
+      if (!_outputPathsByDate.containsKey(currentDate))
+      {      
+        List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);  
+        if (inputs != null)
+        { 
+          if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
+          {          
+            // too much data to process in a single run, will require another pass
+            _needAnotherPass = true;
+            break;
+          }
+          
+          for (DatePath input : inputs)
+          {
+            _log.info(String.format("Input: %s",input.getPath()));
+            _inputsToProcess.add(input);
+            
+            Path root = PathUtils.getNestedPathRoot(input.getPath());
+            _latestInputByPath.put(root.toString(), input.getPath().toString());
+          }
+                    
+          newDataCount++;
+        }
+        else if (isFailOnMissing())
+        {
+          throw new RuntimeException("missing input data for " + currentDate);
+        }
+      }
+      
+      cal.setTime(currentDate);
+      cal.add(Calendar.DAY_OF_MONTH, 1);
+      currentDate = cal.getTime();
+    }
+  }
+    
+  /**
+   * Determines what output data already exists.  Inputs will not be consumed if the output already exists.
+   * 
+   * @throws IOException
+   */
+  private void loadOutputData() throws IOException
+  {
+    _log.info(String.format("Checking output data in " + getOutputPath()));
+    _outputPathsByDate = getDailyData(getOutputPath());
+  }
+  
+  /**
+   * Throws an exception if the plan hasn't been created.
+   */
+  private void checkPlanExists()
+  {
+    if (!_planExists) throw new RuntimeException("Must call createPlan first");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
new file mode 100644
index 0000000..99aba47
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Mapper;
+
+/**
+ * A concrete version of {@link AbstractPartitionPreservingIncrementalJob}.
+ * 
+ * This provides an alternative to extending {@link AbstractPartitionPreservingIncrementalJob}.
+ * Instead of extending this class and implementing the abstract methods, this concrete version
+ * can be used instead.  Getters and setters have been provided for the abstract methods. 
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionPreservingIncrementalJob extends AbstractPartitionPreservingIncrementalJob
+{
+  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+  private Accumulator<GenericRecord,GenericRecord> _combiner;
+  private Accumulator<GenericRecord,GenericRecord> _reducer;
+  private Schema _keySchema;
+  private Schema _intermediateValueSchema;
+  private Schema _outputValueSchema;
+  private Setup _setup;
+  
+  /**
+   * Initializes the job.  The job name is derived from the name of a provided class.
+   * 
+   * @param cls class to base job name on
+   * @throws IOException
+   */
+  public PartitionPreservingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
+  {
+    setName(cls.getName());
+  }
+  
+  @Override
+  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+  {
+    return _mapper;
+  }
+
+  @Override
+  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
+  {
+    return _combiner;
+  }
+  
+  @Override
+  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
+  {
+    return _reducer;
+  }
+
+  @Override
+  protected Schema getKeySchema()
+  {
+    return _keySchema;
+  }
+
+  @Override
+  protected Schema getIntermediateValueSchema()
+  {
+    return _intermediateValueSchema;
+  }
+
+  @Override
+  protected Schema getOutputValueSchema()
+  {
+    return _outputValueSchema;
+  }
+
+  /**
+   * Set the mapper.
+   * 
+   * @param mapper
+   */
+  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+  {
+    this._mapper = mapper;
+  }
+
+  /**
+   * Set the accumulator for the combiner
+   * 
+   * @param combiner accumulator for the combiner
+   */
+  public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
+  {
+    this._combiner = combiner;
+  }
+
+  /**
+   * Set the accumulator for the reducer.
+   * 
+   * @param reducer accumulator for the reducer
+   */
+  public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
+  {
+    this._reducer = reducer;
+  }
+
+  /**
+   * Sets the Avro schema for the key.
+   * <p>
+   * This is also used as the key for the map output.
+   * 
+   * @param keySchema key schema
+   */
+  public void setKeySchema(Schema keySchema)
+  {
+    this._keySchema = keySchema;
+  }
+
+  /**
+   * Sets the Avro schema for the intermediate value.
+   * <p>
+   * This is also used for the value for the map output.
+   * 
+   * @param intermediateValueSchema intermediate value schema
+   */
+  public void setIntermediateValueSchema(Schema intermediateValueSchema)
+  {
+    this._intermediateValueSchema = intermediateValueSchema;
+  }
+
+  /**
+   * Sets the Avro schema for the output data.
+   *  
+   * @param outputValueSchema output value schema
+   */
+  public void setOutputValueSchema(Schema outputValueSchema)
+  {
+    this._outputValueSchema = outputValueSchema;
+  }
+  
+  /**
+   * Set callback to provide custom configuration before job begins execution.
+   * 
+   * @param setup object with callback method
+   */
+  public void setOnSetup(Setup setup)
+  {
+    _setup = setup;
+  }
+  
+  @Override
+  public void config(Configuration conf)
+  {    
+    super.config(conf);
+    if (_setup != null)
+    {
+      _setup.setup(conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
new file mode 100644
index 0000000..6931f7b
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Estimates the number of reducers needed based on input size.
+ * 
+ * <p>
+ * This sums the size of the inputs and uses bytes-per-reducer
+ * settings to compute the number of reducers.  By default,
+ * the bytes-per-reducer is 256 MB.  This means that if the
+ * total input size is 1 GB, the total number of reducers
+ * computed will be 4.
+ * </p>
+ * 
+ * <p>
+ * The bytes-per-reducer can be configured through properties
+ * provided in the constructor.  The default bytes-per-reducer
+ * can be overriden by setting <em>num.reducers.bytes.per.reducer</em>.
+ * For example, if 536870912 (512 MB) is used for this setting,
+ * then 2 reducers would be used for 1 GB.
+ * </p>
+ * 
+ * <p>
+ * The bytes-per-reducer can also be configured separately for
+ * different types of inputs.  Inputs can be identified by a tag.
+ * For example, if an input is tagged with <em>mydata</em>, then
+ * the reducers for this input data can be configured with
+ * <em>num.reducers.mydata.bytes.per.reducer</em>.
+ * </p>
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public class ReduceEstimator
+{
+  private final Logger _log = Logger.getLogger(ReduceEstimator.class);
+  
+  private final Set<Path> inputPaths = new HashSet<Path>(); 
+  private final Map<Path,String> pathToTag = new HashMap<Path,String>();
+  private final Map<String, Long> tagToBytesPerReducer = new HashMap<String,Long>();
+  private final FileSystem fs;
+  
+  private final static String DEFAULT = "default";
+  private final static Long DEFAULT_BYTES_PER_REDUCER = 256L*1024L*1024L; // 256 MB
+  
+  public ReduceEstimator(FileSystem fs, Properties props)
+  {
+    this.fs = fs;
+    
+    if (props != null)
+    {
+      for (Object o : props.keySet())
+      {
+        String key = (String)o;
+        if (key.startsWith("num.reducers."))
+        {
+          if (key.equals("num.reducers.bytes.per.reducer"))
+          {
+            tagToBytesPerReducer.put(DEFAULT, Long.parseLong(props.getProperty(key)));
+          }
+          else
+          {
+            Pattern p = Pattern.compile("num\\.reducers\\.([a-z]+)\\.bytes\\.per\\.reducer");
+            Matcher m = p.matcher(key);
+            if (m.matches())
+            {
+              String tag = m.group(1);
+              tagToBytesPerReducer.put(tag, Long.parseLong(props.getProperty(key)));
+            }
+            else
+            {
+              throw new RuntimeException("Property not recognized: " + key);
+            }
+          }
+        }
+      }
+    }
+    
+    if (!tagToBytesPerReducer.containsKey(DEFAULT))
+    {
+      long defaultValue = DEFAULT_BYTES_PER_REDUCER;
+      _log.info(String.format("No default bytes per reducer set, using %.2f MB",toMB(defaultValue)));
+      tagToBytesPerReducer.put(DEFAULT, defaultValue);
+    }
+  }
+  
+  public void addInputPath(Path input)
+  {
+    addInputPath(DEFAULT,input);
+  }
+  
+  public void addInputPath(String tag, Path input)
+  {
+    if (!inputPaths.contains(input))
+    {
+      inputPaths.add(input);
+      pathToTag.put(input, tag);
+    }
+    else
+    {
+      throw new RuntimeException("Already added input: " + input);
+    }
+  }
+  
+  public int getNumReducers() throws IOException
+  {
+    Map<String,Long> bytesPerTag = getTagToInputBytes();
+    
+    double numReducers = 0.0;
+    for (String tag : bytesPerTag.keySet())
+    {
+      long bytes = bytesPerTag.get(tag);
+      _log.info(String.format("Found %d bytes (%.2f GB) for inputs tagged with '%s'",bytes,toGB(bytes),tag));
+      Long bytesPerReducer = tagToBytesPerReducer.get(tag);
+      if (bytesPerReducer == null) 
+      {
+        bytesPerReducer = tagToBytesPerReducer.get(DEFAULT);
+        
+        if (bytesPerReducer == null) 
+        {
+          throw new RuntimeException("Could not determine bytes per reducer");
+        }
+        
+        _log.info(String.format("No configured bytes per reducer for '%s', using default value of %.2f MB",tag,toMB(bytesPerReducer)));        
+      }
+      else
+      {
+        _log.info(String.format("Using configured bytes per reducer for '%s' of %.2f MB",tag,toMB(bytesPerReducer)));
+      }
+      
+      double partialNumReducers = bytes/(double)bytesPerReducer;
+      
+      _log.info(String.format("Reducers computed for '%s' is %.2f",tag,partialNumReducers));
+      
+      numReducers += bytes/(double)bytesPerReducer;
+    }
+    
+    int finalNumReducers = Math.max(1, (int)Math.ceil(numReducers));
+    
+    _log.info(String.format("Final computed reducers is: %d",finalNumReducers));
+    
+    return finalNumReducers;
+  }
+  
+  private static double toGB(long bytes)
+  {
+    return bytes/(1024.0*1024.0*1024.0);
+  }
+  
+  private static double toMB(long bytes)
+  {
+    return bytes/(1024.0*1024.0);
+  }
+  
+  /**
+   * Gets the total number of bytes per tag.
+   * 
+   * @return Map from tag to total bytes
+   * @throws IOException
+   */
+  private Map<String,Long> getTagToInputBytes() throws IOException
+  {
+    Map<String,Long> result = new HashMap<String,Long>();
+    for (Path input : inputPaths)
+    {
+      long bytes = PathUtils.countBytes(fs, input);
+      String tag = pathToTag.get(input);
+      if (tag == null) throw new RuntimeException("Could not find tag for input: " + input);
+      Long current = result.get(tag);
+      if (current == null) current = 0L;
+      current += bytes;
+      result.put(tag, current);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
new file mode 100644
index 0000000..2e26900
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used as a callback by {@link PartitionCollapsingIncrementalJob} and {@link PartitionPreservingIncrementalJob}
+ * to provide configuration settings for the Hadoop job.
+ * 
+ * @author "Matthew Hayes"
+ *
+ */
+public interface Setup
+{
+  /**
+   * Set custom configuration.
+   * 
+   * @param conf configuration
+   */
+  void setup(Configuration conf);
+}