You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:49 UTC

[11/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
deleted file mode 100644
index a3fc77d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeSet;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Execution planner used by {@link AbstractPartitionPreservingIncrementalJob} and its derived classes.
- * This creates a plan to process partitioned input data and produce partitioned output data.
- * 
- * <p>
- * To use this class, the input and output paths must be specified.  In addition the desired input date
- * range can be specified through several methods.  Then {@link #createPlan()} can be called and the
- * execution plan will be created.  The inputs to process will be available from {@link #getInputsToProcess()},
- * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
- * will be available from {@link #getInputSchemas()}.
- * </p>
- * 
- * <p>
- * Configuration properties are used to configure a {@link ReduceEstimator} instance.  This is used to 
- * calculate how many reducers should be used.  
- * The number of reducers to use is based on the input data size and the 
- * <em>num.reducers.bytes.per.reducer</em> property.
- * Check {@link ReduceEstimator} for more details on how the properties are used.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingExecutionPlanner extends ExecutionPlanner
-{
-  private final Logger _log = Logger.getLogger(PartitionPreservingExecutionPlanner.class);
-  
-  private SortedMap<Date,DatePath> _outputPathsByDate;
-  private Map<String,String> _latestInputByPath = new HashMap<String,String>();
-  private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
-  private List<Schema> _inputSchemas = new ArrayList<Schema>();
-  private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
-  private boolean _needAnotherPass;
-  private int _numReducers;
-  private boolean _planExists;
-  
-  /**
-   * Initializes the execution planner.
-   * 
-   * @param fs file system
-   * @param props configuration properties
-   */
-  public PartitionPreservingExecutionPlanner(FileSystem fs, Properties props)
-  {
-    super(fs,props);
-  }
-
-  /**
-   * Create the execution plan.
-   * 
-   * @throws IOException
-   */
-  public void createPlan() throws IOException
-  {
-    if (_planExists) throw new RuntimeException("Plan already exists");
-    _planExists = true;
-    loadInputData();
-    loadOutputData();
-    determineAvailableInputDates();
-    determineDateRange();
-    determineInputsToProcess();
-    determineInputSchemas();
-    determineNumReducers();
-  }
-  
-  /**
-   * Get the number of reducers to use based on the input data size.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return number of reducers to use
-   */
-  public int getNumReducers()
-  {
-    checkPlanExists();
-    return _numReducers;
-  }
-  
-  /**
-   * Gets the input schemas.  Because multiple inputs are allowed, there may be multiple schemas.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return input schemas
-   */
-  public List<Schema> getInputSchemas()
-  {
-    checkPlanExists();
-    return _inputSchemas;
-  }
-  
-  /**
-   * Gets a map from input path to schema.  Because multiple inputs are allowed, there may be multiple schemas.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return map from path to input schema
-   */
-  public Map<String,Schema> getInputSchemasByPath()
-  {
-    checkPlanExists();
-    return _inputSchemasByPath;
-  }
-    
-  /**
-   * Gets whether another pass will be required.  Because there may be a limit on the number of inputs processed 
-   * in a single run, multiple runs may be required to process all data in the desired date range.  
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return true if another pass is required
-   */
-  public boolean getNeedsAnotherPass()
-  {
-    checkPlanExists();
-    return _needAnotherPass;
-  }
-  
-  /**
-   * Gets the inputs which are to be processed.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return inputs to process
-   */
-  public List<DatePath> getInputsToProcess()
-  {
-    checkPlanExists();
-    return _inputsToProcess;
-  }
-  
-  /**
-   * Gets the input dates which are to be processed.
-   * Must call {@link #createPlan()} first.
-   * 
-   * @return dates to process
-   */
-  public List<Date> getDatesToProcess()
-  {
-    checkPlanExists();
-    Set<Date> dates = new TreeSet<Date>();
-    for (DatePath dp : _inputsToProcess)
-    {
-      dates.add(dp.getDate());
-    }
-    return new ArrayList<Date>(dates);
-  }
-  
-  /**
-   * Determines the number of reducers to use based on the input data size.
-   * The number of reducers to use is based on the input data size and the 
-   * <em>num.reducers.bytes.per.reducer</em> property.  See {@link ReduceEstimator}
-   * for details on reducer estimation.
-   * 
-   * @throws IOException
-   */
-  private void determineNumReducers() throws IOException
-  {
-    ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
-    List<String> inputPaths = new ArrayList<String>();
-    for (DatePath input : getInputsToProcess())
-    {
-      inputPaths.add(input.getPath().toString());
-      estimator.addInputPath("input",input.getPath());
-    }
-    _numReducers = estimator.getNumReducers();
-  }
-  
-  /**
-   * Determines the input schemas.  There may be multiple input schemas because multiple inputs are allowed.
-   * The latest available inputs are used to determine the schema, the assumption being that schemas are
-   * backwards-compatible.
-   * 
-   * @throws IOException
-   */
-  private void determineInputSchemas() throws IOException
-  {
-    if (_latestInputByPath.size() > 0)
-    {
-      _log.info("Determining input schemas");
-      for (Entry<String,String> entry : _latestInputByPath.entrySet())
-      {
-        String root = entry.getKey();
-        String input = entry.getValue();
-        _log.info("Loading schema for " + input);
-        Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
-        _inputSchemas.add(schema);
-        _inputSchemasByPath.put(root, schema);
-      }
-    }
-  }
-  
-  /**
-   * Determines which input data should be processed.  This checks the availability of input data within
-   * the desired date range and also checks whether the output already exists.  Only inputs with no 
-   * corresponding output are processed. 
-   */
-  private void determineInputsToProcess()
-  {
-    _log.info("Determining inputs to process");
-    _latestInputByPath.clear();
-    int newDataCount = 0;
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-    for (Date currentDate=getDateRange().getBeginDate(); currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
-    { 
-      if (!_outputPathsByDate.containsKey(currentDate))
-      {      
-        List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);  
-        if (inputs != null)
-        { 
-          if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
-          {          
-            // too much data to process in a single run, will require another pass
-            _needAnotherPass = true;
-            break;
-          }
-          
-          for (DatePath input : inputs)
-          {
-            _log.info(String.format("Input: %s",input.getPath()));
-            _inputsToProcess.add(input);
-            
-            Path root = PathUtils.getNestedPathRoot(input.getPath());
-            _latestInputByPath.put(root.toString(), input.getPath().toString());
-          }
-                    
-          newDataCount++;
-        }
-        else if (isFailOnMissing())
-        {
-          throw new RuntimeException("missing input data for " + currentDate);
-        }
-      }
-      
-      cal.setTime(currentDate);
-      cal.add(Calendar.DAY_OF_MONTH, 1);
-      currentDate = cal.getTime();
-    }
-  }
-    
-  /**
-   * Determines what output data already exists.  Inputs will not be consumed if the output already exists.
-   * 
-   * @throws IOException
-   */
-  private void loadOutputData() throws IOException
-  {
-    _log.info(String.format("Checking output data in " + getOutputPath()));
-    _outputPathsByDate = getDailyData(getOutputPath());
-  }
-  
-  /**
-   * Throws an exception if the plan hasn't been created.
-   */
-  private void checkPlanExists()
-  {
-    if (!_planExists) throw new RuntimeException("Must call createPlan first");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
deleted file mode 100644
index 488fe7a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-
-/**
- * A concrete version of {@link AbstractPartitionPreservingIncrementalJob}.
- * 
- * This provides an alternative to extending {@link AbstractPartitionPreservingIncrementalJob}.
- * Instead of extending this class and implementing the abstract methods, this concrete version
- * can be used instead.  Getters and setters have been provided for the abstract methods. 
- * 
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingIncrementalJob extends AbstractPartitionPreservingIncrementalJob
-{
-  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
-  private Accumulator<GenericRecord,GenericRecord> _combiner;
-  private Accumulator<GenericRecord,GenericRecord> _reducer;
-  private Schema _keySchema;
-  private Schema _intermediateValueSchema;
-  private Schema _outputValueSchema;
-  private Setup _setup;
-  
-  /**
-   * Initializes the job.  The job name is derived from the name of a provided class.
-   * 
-   * @param cls class to base job name on
-   * @throws IOException
-   */
-  public PartitionPreservingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
-  {
-    setName(cls.getName());
-  }
-  
-  @Override
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return _mapper;
-  }
-
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return _combiner;
-  }
-  
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
-  {
-    return _reducer;
-  }
-
-  @Override
-  protected Schema getKeySchema()
-  {
-    return _keySchema;
-  }
-
-  @Override
-  protected Schema getIntermediateValueSchema()
-  {
-    return _intermediateValueSchema;
-  }
-
-  @Override
-  protected Schema getOutputValueSchema()
-  {
-    return _outputValueSchema;
-  }
-
-  /**
-   * Set the mapper.
-   * 
-   * @param mapper
-   */
-  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
-  {
-    this._mapper = mapper;
-  }
-
-  /**
-   * Set the accumulator for the combiner
-   * 
-   * @param combiner accumulator for the combiner
-   */
-  public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
-  {
-    this._combiner = combiner;
-  }
-
-  /**
-   * Set the accumulator for the reducer.
-   * 
-   * @param reducer accumulator for the reducer
-   */
-  public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
-  {
-    this._reducer = reducer;
-  }
-
-  /**
-   * Sets the Avro schema for the key.
-   * <p>
-   * This is also used as the key for the map output.
-   * 
-   * @param keySchema key schema
-   */
-  public void setKeySchema(Schema keySchema)
-  {
-    this._keySchema = keySchema;
-  }
-
-  /**
-   * Sets the Avro schema for the intermediate value.
-   * <p>
-   * This is also used for the value for the map output.
-   * 
-   * @param intermediateValueSchema intermediate value schema
-   */
-  public void setIntermediateValueSchema(Schema intermediateValueSchema)
-  {
-    this._intermediateValueSchema = intermediateValueSchema;
-  }
-
-  /**
-   * Sets the Avro schema for the output data.
-   *  
-   * @param outputValueSchema output value schema
-   */
-  public void setOutputValueSchema(Schema outputValueSchema)
-  {
-    this._outputValueSchema = outputValueSchema;
-  }
-  
-  /**
-   * Set callback to provide custom configuration before job begins execution.
-   * 
-   * @param setup object with callback method
-   */
-  public void setOnSetup(Setup setup)
-  {
-    _setup = setup;
-  }
-  
-  @Override
-  public void config(Configuration conf)
-  {    
-    super.config(conf);
-    if (_setup != null)
-    {
-      _setup.setup(conf);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
deleted file mode 100644
index 9f2f068..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Estimates the number of reducers needed based on input size.
- * 
- * <p>
- * This sums the size of the inputs and uses bytes-per-reducer
- * settings to compute the number of reducers.  By default,
- * the bytes-per-reducer is 256 MB.  This means that if the
- * total input size is 1 GB, the total number of reducers
- * computed will be 4.
- * </p>
- * 
- * <p>
- * The bytes-per-reducer can be configured through properties
- * provided in the constructor.  The default bytes-per-reducer
- * can be overriden by setting <em>num.reducers.bytes.per.reducer</em>.
- * For example, if 536870912 (512 MB) is used for this setting,
- * then 2 reducers would be used for 1 GB.
- * </p>
- * 
- * <p>
- * The bytes-per-reducer can also be configured separately for
- * different types of inputs.  Inputs can be identified by a tag.
- * For example, if an input is tagged with <em>mydata</em>, then
- * the reducers for this input data can be configured with
- * <em>num.reducers.mydata.bytes.per.reducer</em>.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class ReduceEstimator
-{
-  private final Logger _log = Logger.getLogger(ReduceEstimator.class);
-  
-  private final Set<Path> inputPaths = new HashSet<Path>(); 
-  private final Map<Path,String> pathToTag = new HashMap<Path,String>();
-  private final Map<String, Long> tagToBytesPerReducer = new HashMap<String,Long>();
-  private final FileSystem fs;
-  
-  private final static String DEFAULT = "default";
-  private final static Long DEFAULT_BYTES_PER_REDUCER = 256L*1024L*1024L; // 256 MB
-  
-  public ReduceEstimator(FileSystem fs, Properties props)
-  {
-    this.fs = fs;
-    
-    if (props != null)
-    {
-      for (Object o : props.keySet())
-      {
-        String key = (String)o;
-        if (key.startsWith("num.reducers."))
-        {
-          if (key.equals("num.reducers.bytes.per.reducer"))
-          {
-            tagToBytesPerReducer.put(DEFAULT, Long.parseLong(props.getProperty(key)));
-          }
-          else
-          {
-            Pattern p = Pattern.compile("num\\.reducers\\.([a-z]+)\\.bytes\\.per\\.reducer");
-            Matcher m = p.matcher(key);
-            if (m.matches())
-            {
-              String tag = m.group(1);
-              tagToBytesPerReducer.put(tag, Long.parseLong(props.getProperty(key)));
-            }
-            else
-            {
-              throw new RuntimeException("Property not recognized: " + key);
-            }
-          }
-        }
-      }
-    }
-    
-    if (!tagToBytesPerReducer.containsKey(DEFAULT))
-    {
-      long defaultValue = DEFAULT_BYTES_PER_REDUCER;
-      _log.info(String.format("No default bytes per reducer set, using %.2f MB",toMB(defaultValue)));
-      tagToBytesPerReducer.put(DEFAULT, defaultValue);
-    }
-  }
-  
-  public void addInputPath(Path input)
-  {
-    addInputPath(DEFAULT,input);
-  }
-  
-  public void addInputPath(String tag, Path input)
-  {
-    if (!inputPaths.contains(input))
-    {
-      inputPaths.add(input);
-      pathToTag.put(input, tag);
-    }
-    else
-    {
-      throw new RuntimeException("Already added input: " + input);
-    }
-  }
-  
-  public int getNumReducers() throws IOException
-  {
-    Map<String,Long> bytesPerTag = getTagToInputBytes();
-    
-    double numReducers = 0.0;
-    for (String tag : bytesPerTag.keySet())
-    {
-      long bytes = bytesPerTag.get(tag);
-      _log.info(String.format("Found %d bytes (%.2f GB) for inputs tagged with '%s'",bytes,toGB(bytes),tag));
-      Long bytesPerReducer = tagToBytesPerReducer.get(tag);
-      if (bytesPerReducer == null) 
-      {
-        bytesPerReducer = tagToBytesPerReducer.get(DEFAULT);
-        
-        if (bytesPerReducer == null) 
-        {
-          throw new RuntimeException("Could not determine bytes per reducer");
-        }
-        
-        _log.info(String.format("No configured bytes per reducer for '%s', using default value of %.2f MB",tag,toMB(bytesPerReducer)));        
-      }
-      else
-      {
-        _log.info(String.format("Using configured bytes per reducer for '%s' of %.2f MB",tag,toMB(bytesPerReducer)));
-      }
-      
-      double partialNumReducers = bytes/(double)bytesPerReducer;
-      
-      _log.info(String.format("Reducers computed for '%s' is %.2f",tag,partialNumReducers));
-      
-      numReducers += bytes/(double)bytesPerReducer;
-    }
-    
-    int finalNumReducers = Math.max(1, (int)Math.ceil(numReducers));
-    
-    _log.info(String.format("Final computed reducers is: %d",finalNumReducers));
-    
-    return finalNumReducers;
-  }
-  
-  private static double toGB(long bytes)
-  {
-    return bytes/(1024.0*1024.0*1024.0);
-  }
-  
-  private static double toMB(long bytes)
-  {
-    return bytes/(1024.0*1024.0);
-  }
-  
-  /**
-   * Gets the total number of bytes per tag.
-   * 
-   * @return Map from tag to total bytes
-   * @throws IOException
-   */
-  private Map<String,Long> getTagToInputBytes() throws IOException
-  {
-    Map<String,Long> result = new HashMap<String,Long>();
-    for (Path input : inputPaths)
-    {
-      long bytes = PathUtils.countBytes(fs, input);
-      String tag = pathToTag.get(input);
-      if (tag == null) throw new RuntimeException("Could not find tag for input: " + input);
-      Long current = result.get(tag);
-      if (current == null) current = 0L;
-      current += bytes;
-      result.put(tag, current);
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
deleted file mode 100644
index 96ee07d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Used as a callback by {@link PartitionCollapsingIncrementalJob} and {@link PartitionPreservingIncrementalJob}
- * to provide configuration settings for the Hadoop job.
- * 
- * @author "Matthew Hayes"
- *
- */
-public interface Setup
-{
-  /**
-   * Set custom configuration.
-   * 
-   * @param conf configuration
-   */
-  void setup(Configuration conf);
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
deleted file mode 100644
index 0445322..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-/**
- * A derivation of {@link Job} that stages its output in another location and only
- * moves it to the final destination if the job completes successfully.
- * It also outputs a counters file to the file system that contains counters fetched from Hadoop
- * and other task statistics.
- */
-public class StagedOutputJob extends Job implements Callable<Boolean>
-{
-  private final String _stagingPrefix;
-  private final Logger _log;
-  private Path _countersPath;
-  private Path _countersParentPath;
-  private boolean _writeCounters = false;
-  
-  /**
-   * Creates a job which using a temporary staging location for the output data.
-   * The data is only copied to the final output directory on successful completion
-   * of the job.  This prevents existing output data from being overwritten unless
-   * the job completes successfully.
-   * 
-   * @param conf configuration
-   * @param jobName job name
-   * @param inputPaths input paths
-   * @param stagingLocation where to stage output temporarily
-   * @param outputPath output path
-   * @param log logger 
-   * @return job
-   */
-  public static StagedOutputJob createStagedJob(
-    Configuration conf,
-    String jobName,
-    List<String> inputPaths,
-    String stagingLocation,
-    String outputPath,
-    final Logger log)
-  {
-    final StagedOutputJob retVal;
-    try 
-    {
-      retVal = new StagedOutputJob(conf, stagingLocation, log);
-      retVal.setJobName(jobName);
-      retVal.setJarByClass(getCallersClass());
-      FileInputFormat.setInputPathFilter(retVal, HiddenFilePathFilter.class);
-    }
-    catch (IOException e) 
-    {
-      log.error("IOException when making a job", e);
-      throw new RuntimeException(e);
-    }
-
-    if (inputPaths != null)
-    {
-      try 
-      {
-        FileInputFormat.setInputPaths(
-          retVal,
-          StringUtils.join(inputPaths.iterator(),",")
-        );
-      }
-      catch (IOException e) 
-      {
-          log.error("Unable to set up input paths.", e);
-          throw new RuntimeException(e);
-      }
-    }
-            
-    FileOutputFormat.setOutputPath(retVal, new Path(outputPath));
-
-    return retVal;
-  }
-
-  /**
-   * Initializes the job.
-   * 
-   * @param conf configuration
-   * @param stagingPrefix where to stage output temporarily
-   * @param log logger
-   * @throws IOException
-   */
-  public StagedOutputJob(Configuration conf, String stagingPrefix, Logger log) throws IOException
-  {
-    super(conf);
-    this._stagingPrefix = stagingPrefix;
-    this._log = log;
-  }
-  
-  /**
-   * Gets path to store the counters.  If this is not set then by default the counters will be
-   * stored in the output directory.
-   * 
-   * @return path parent path for counters
-   */
-  public Path getCountersParentPath()
-  {
-    return _countersParentPath;
-  }
-   
-  /**
-   * Sets path to store the counters.  If this is not set then by default the counters will be
-   * stored in the output directory.
-   * 
-   * @param path parent path for counters
-   */
-  public void setCountersParentPath(Path path)
-  {
-    _countersParentPath = path;
-  }
-  
-  /**
-   * Path to written counters.
-   * 
-   * @return counters path
-   */
-  public Path getCountersPath()
-  {
-    return _countersPath;
-  }
-  
-  /**
-   * Get whether counters should be written.
-   * 
-   * @return true if counters should be written
-   */
-  public boolean getWriteCounters()
-  {
-    return _writeCounters;
-  }
-  
-  /**
-   * Sets whether counters should be written.
-   * 
-   * @param writeCounters true if counters should be written
-   */
-  public void setWriteCounters(boolean writeCounters)
-  {
-    this._writeCounters = writeCounters;
-  }
-
-  /**
-   * Run the job.
-   */
-  @Override
-  public Boolean call() throws Exception
-  {
-    try
-    {
-      boolean success = false;    
-      success = waitForCompletion(false);      
-      String jobId = "?";
-      
-      if (getJobID() != null)
-      {
-        jobId = String.format("job_%s_%d",getJobID().getJtIdentifier(), getJobID().getId());
-      }
-      
-      if (success)
-      {
-        _log.info(String.format("Job %s with ID %s succeeded! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
-      }
-      else
-      {
-        _log.error(String.format("Job %s with ID %s failed! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
-      }
-      
-      return success;
-    }
-    catch (Exception e)
-    {
-      _log.error("Exception: " + e.toString());
-      throw new Exception(e);
-    }
-  }
-    
-  /**
-   * Run the job and wait for it to complete.  Output will be temporarily stored under the staging path.
-   * If the job is successful it will be moved to the final location.
-   */
-  @Override
-  public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException
-  {
-    final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
-    final Path stagedPath = new Path(String.format("%s/%s/staged", _stagingPrefix, System.currentTimeMillis()));
-
-    FileOutputFormat.setOutputPath(
-      this,
-      stagedPath
-    );
-
-    final Thread hook = new Thread(new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        try 
-        {
-          killJob();
-        }
-        catch (IOException e) 
-        {
-          e.printStackTrace();
-        }
-      }
-    });
-
-    Runtime.getRuntime().addShutdownHook(hook);
-    
-    final boolean retVal = super.waitForCompletion(verbose);
-    Runtime.getRuntime().removeShutdownHook(hook);
-
-    if (retVal) 
-    {
-      FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
-
-      fs.mkdirs(actualOutputPath);
-
-      _log.info(String.format("Deleting data at old path[%s]", actualOutputPath));
-      fs.delete(actualOutputPath, true);
-
-      _log.info(String.format("Moving from staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
-      boolean renamed = fs.rename(stagedPath, actualOutputPath);
-      
-      if (renamed && _writeCounters)
-      {
-        writeCounters(fs);              
-      }
-      
-      return renamed;
-    }
-    else 
-    {
-      FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
-      _log.info(String.format("Job failed, deleting staged path[%s]", stagedPath));
-      try
-      {
-        fs.delete(stagedPath, true);
-      }
-      catch (IOException e)
-      {            
-      }
-    }
-
-    _log.warn("retVal was false for some reason...");
-    return retVal;
-  }
-    
-  /**
-   * Gets the class for the caller.
-   * 
-   * @return caller class
-   */
-  private static Class<?> getCallersClass()
-  {
-    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-    boolean foundSelf = false;
-    for (StackTraceElement element : stack) 
-    {
-      if (foundSelf &&
-          !StagedOutputJob.class.getName().equals(element.getClassName())) 
-      {
-        try 
-        {
-          return Class.forName(element.getClassName());
-        }
-        catch (ClassNotFoundException e) 
-        {
-          throw new RuntimeException(e);
-        }
-      }
-      else if (StagedOutputJob.class.getName().equals(element.getClassName()) 
-               && "getCallersClass".equals(element.getMethodName())) 
-      {
-        foundSelf = true;
-      }
-    }
-    return StagedOutputJob.class;
-  }
-    
-  /**
-   * Writes Hadoop counters and other task statistics to a file in the file system.
-   * 
-   * @param fs
-   * @throws IOException
-   */
-  private void writeCounters(final FileSystem fs) throws IOException
-  {
-    final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
-    
-    SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-    
-    String suffix = timestampFormat.format(new Date());
-    
-    if (_countersParentPath != null)
-    {
-      if (!fs.exists(_countersParentPath))
-      {
-        _log.info("Creating counter parent path " + _countersParentPath);
-        fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
-      }
-      // make the name as unique as possible in this case because this may be a directory
-      // where other counter files will be dropped
-      _countersPath = new Path(_countersParentPath,".counters." + suffix);
-    }
-    else
-    {
-      _countersPath = new Path(actualOutputPath,".counters." + suffix);
-    }
-    
-    _log.info(String.format("Writing counters to %s", _countersPath));
-    FSDataOutputStream counterStream = fs.create(_countersPath);
-    BufferedOutputStream buffer = new BufferedOutputStream(counterStream,256*1024);
-    OutputStreamWriter writer = new OutputStreamWriter(buffer);      
-    for (String groupName : getCounters().getGroupNames())
-    {
-      for (Counter counter : getCounters().getGroup(groupName))
-      {
-        writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
-      }
-    }
-          
-    JobID jobID = this.getJobID();
-    
-    org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());
-    
-    long minStart = Long.MAX_VALUE;      
-    long maxFinish = 0;
-    long setupStart = Long.MAX_VALUE;
-    long cleanupFinish = 0;
-    DescriptiveStatistics mapStats = new DescriptiveStatistics();
-    DescriptiveStatistics reduceStats = new DescriptiveStatistics();
-    boolean success = true;
-    
-    JobClient jobClient = new JobClient(this.conf);
-    
-    Map<String,String> taskIdToType = new HashMap<String,String>();
-               
-    TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
-    if (setupReports.length > 0)
-    {
-      _log.info("Processing setup reports");
-      for (TaskReport report : jobClient.getSetupTaskReports(oldJobId))
-      {
-        taskIdToType.put(report.getTaskID().toString(),"SETUP");
-        if (report.getStartTime() == 0)
-        {
-          _log.warn("Skipping report with zero start time");
-          continue;
-        }
-        setupStart = Math.min(setupStart, report.getStartTime());
-      }
-    }
-    else
-    {
-      _log.error("No setup reports");
-    }
-    
-    TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
-    if (mapReports.length > 0)
-    {
-      _log.info("Processing map reports");
-      for (TaskReport report : mapReports)
-      {
-        taskIdToType.put(report.getTaskID().toString(),"MAP");
-        if (report.getFinishTime() == 0 || report.getStartTime() == 0)
-        {
-          _log.warn("Skipping report with zero start or finish time");
-          continue;
-        }
-        minStart = Math.min(minStart, report.getStartTime());
-        mapStats.addValue(report.getFinishTime() - report.getStartTime());
-      }
-    }
-    else
-    {
-      _log.error("No map reports");
-    }
-    
-    TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
-    if (reduceReports.length > 0)
-    {
-      _log.info("Processing reduce reports");
-      for (TaskReport report : reduceReports)
-      {      
-        taskIdToType.put(report.getTaskID().toString(),"REDUCE");
-        if (report.getFinishTime() == 0 || report.getStartTime() == 0)
-        {
-          _log.warn("Skipping report with zero start or finish time");
-          continue;
-        }
-        maxFinish = Math.max(maxFinish, report.getFinishTime());
-        reduceStats.addValue(report.getFinishTime() - report.getStartTime());
-      }
-    }
-    else
-    {
-      _log.error("No reduce reports");
-    }
-    
-    TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
-    if (cleanupReports.length > 0)
-    {
-      _log.info("Processing cleanup reports");
-      for (TaskReport report : cleanupReports)
-      {
-        taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
-        if (report.getFinishTime() == 0)
-        {
-          _log.warn("Skipping report with finish time of zero");
-          continue;
-        }
-        cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
-      }
-    }
-    else
-    {
-      _log.error("No cleanup reports");
-    }
-      
-    if (minStart == Long.MAX_VALUE)
-    {
-      _log.error("Could not determine map-reduce start time");
-      success = false;
-    }      
-    if (maxFinish == 0)
-    {
-      _log.error("Could not determine map-reduce finish time");
-      success = false;
-    }
-    
-    if (setupStart == Long.MAX_VALUE)
-    {
-      _log.error("Could not determine setup start time");
-      success = false;
-    }      
-    if (cleanupFinish == 0)
-    {
-      _log.error("Could not determine cleanup finish time");
-      success = false;
-    }     
-    
-    // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
-    // Unfortunately the job client doesn't have an easier way to get these statistics.
-    Map<String,Integer> attemptStats = new HashMap<String,Integer>();
-    _log.info("Processing task attempts");            
-    for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
-    {
-      String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
-      String status = event.getTaskStatus().toString();
-      
-      String key = String.format("%s_%s_ATTEMPTS",status,type);
-      if (!attemptStats.containsKey(key))
-      {
-        attemptStats.put(key, 0);
-      }
-      attemptStats.put(key, attemptStats.get(key) + 1);
-    }
-              
-    if (success)
-    {
-      writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
-      writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
-      writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
-      
-      writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
-      writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
-      writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
-      
-      writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
-      writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
-      writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
-      writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
-      writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
-      writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
-      
-      writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
-      writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
-      writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
-      writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
-      writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
-      writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
-      
-      writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
-      
-      for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
-      {
-        writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
-      }
-    }
-    
-    writer.close();
-    buffer.close();
-    counterStream.close();
-  }
-    
-  /**
-   * Get all task completion events for a particular job.
-   * 
-   * @param jobClient job client
-   * @param jobId job ID
-   * @return task completion events
-   * @throws IOException
-   */
-  private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
-  {
-    List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();
-    
-    // Tries to use reflection to get access to the getTaskCompletionEvents method from the private jobSubmitClient field.
-    // This method has a parameter for the size, which defaults to 10 for the top level methods and can therefore be extremely slow
-    // if the goal is to get all events.
-    
-    Method getTaskCompletionEventsMethod = null;
-    Object jobSubmitClient = null;
-    
-    try
-    {
-      Field f = JobClient.class.getDeclaredField("jobSubmitClient");
-      f.setAccessible(true);
-      jobSubmitClient = f.get(jobClient);       
-      
-      if (jobSubmitClient != null)
-      { 
-        getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
-        getTaskCompletionEventsMethod.setAccessible(true);
-      }
-    }
-    catch (NoSuchMethodException e)
-    {
-    }
-    catch (SecurityException e)
-    {
-    }
-    catch (NoSuchFieldException e)
-    {
-    }
-    catch (IllegalArgumentException e)
-    {
-    }
-    catch (IllegalAccessException e)
-    {       
-    }
-    
-    if (getTaskCompletionEventsMethod != null)
-    {
-      _log.info("Will call getTaskCompletionEvents via reflection since it's faster");
-    }
-    else
-    {
-      _log.info("Will call getTaskCompletionEvents via the slow method");
-    }
-    
-    int index = 0;
-    while(true)
-    {
-      TaskCompletionEvent[] currentEvents;
-      if (getTaskCompletionEventsMethod != null)
-      {
-        try
-        {
-          // grab events, 250 at a time, which is faster than the other method which defaults to 10 at a time (with no override ability)
-          currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, 250);
-        }
-        catch (IllegalArgumentException e)
-        {
-          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
-          getTaskCompletionEventsMethod = null;
-          continue;
-        }
-        catch (IllegalAccessException e)
-        {
-          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
-          getTaskCompletionEventsMethod = null;
-          continue;
-        }
-        catch (InvocationTargetException e)
-        {
-          _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
-          getTaskCompletionEventsMethod = null;
-          continue;
-        }
-      }
-      else
-      {
-        currentEvents = this.getTaskCompletionEvents(index);
-      }        
-      if (currentEvents.length == 0) break;
-      for (TaskCompletionEvent event : currentEvents)
-      {
-        events.add(event);
-      }
-      index += currentEvents.length;
-    }
-    
-    return events;
-  }
-  
-  private void writeAndLog(OutputStreamWriter writer, String line) throws IOException
-  {
-    writer.append(line);
-    writer.append("\n");
-    _log.info(line);
-  }
-  
-  static class HiddenFilePathFilter implements PathFilter
-  {
-    @Override
-    public boolean accept(Path path)
-    {
-      String name = path.getName();
-      return ! name.startsWith("_") &&
-             ! name.startsWith(".");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
deleted file mode 100644
index f8708c8..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.text.ParseException;
-import java.util.Date;
-import java.util.Properties;
-
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for Hadoop jobs that consume time-partitioned data.
- * 
- * <p>
- * This class has the same configuration and methods as {@link AbstractJob}.
- * In addition it also recognizes the following properties:
- * </p>
- * 
- * <ul>
- *   <li><em>num.days</em> - Number of consecutive days of input data to consume</li>
- *   <li><em>days.ago</em> - Number of days to subtract off the end of the consumption window</li>
- *   <li><em>start.date</em> - Start date for window in yyyyMMdd format</li>
- *   <li><em>end.date</em> - End date for window in yyyyMMdd format</li>
- * </ul>
- * 
- * <p>
- * Methods are available as well for setting these configuration parameters.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public abstract class TimeBasedJob extends AbstractJob
-{ 
-  private Integer _numDays;
-  private Integer _daysAgo;
-  private Date _startDate;
-  private Date _endDate;  
-  
-  /**
-   * Initializes the job.
-   */
-  public TimeBasedJob()
-  {    
-  }
-  
-  /**
-   * Initializes the job with a job name and properties.
-   * 
-   * @param name Job name
-   * @param props Configuration properties
-   */
-  public TimeBasedJob(String name, Properties props)
-  {        
-    super(name,props);
-  }
-  
-  @Override
-  public void setProperties(Properties props)
-  {
-    super.setProperties(props);
-    
-    if (getProperties().get("num.days") != null)
-    {
-      setNumDays(Integer.parseInt((String)getProperties().get("num.days")));
-    }
-    
-    if (getProperties().get("days.ago") != null)
-    {
-      setDaysAgo(Integer.parseInt((String)getProperties().get("days.ago")));
-    }
-    
-    if (getProperties().get("start.date") != null)
-    {
-      try
-      {
-        // start date treated as inclusive lower bound
-        setStartDate(PathUtils.datedPathFormat.parse((String)getProperties().get("start.date")));
-      }
-      catch (ParseException e)
-      {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    
-    if (getProperties().get("end.date") != null)
-    {
-      try
-      {
-        setEndDate(PathUtils.datedPathFormat.parse((String)getProperties().get("end.date")));
-      }
-      catch (ParseException e)
-      {
-        throw new IllegalArgumentException(e);
-      }
-    }
-  }
-  
-  /**
-   * Gets the number of consecutive days to process.
-   * 
-   * @return number of days to process
-   */
-  public Integer getNumDays()
-  {
-    return _numDays;
-  }
-
-  /**
-   * Sets the number of consecutive days to process.
-   * 
-   * @param numDays number of days to process
-   */
-  public void setNumDays(Integer numDays)
-  {
-    this._numDays = numDays;
-  }
-
-  /**
-   * Gets the number of days to subtract off the end of the consumption window.
-   * 
-   * @return Days ago
-   */
-  public Integer getDaysAgo()
-  {
-    return _daysAgo;
-  }
-
-  /**
-   * Sets the number of days to subtract off the end of the consumption window.
-   * 
-   * @param daysAgo Days ago
-   */
-  public void setDaysAgo(Integer daysAgo)
-  {
-    this._daysAgo = daysAgo;
-  }
-
-  /**
-   * Gets the start date.
-   * 
-   * @return Start date
-   */
-  public Date getStartDate()
-  {
-    return _startDate;
-  }
-
-  /**
-   * Sets the start date.
-   * 
-   * @param startDate start date
-   */
-  public void setStartDate(Date startDate)
-  {
-    this._startDate = startDate;
-  }
-
-  /**
-   * Gets the end date.
-   * 
-   * @return end date
-   */
-  public Date getEndDate()
-  {
-    return _endDate;
-  }
-
-  /**
-   * Sets the end date.
-   * 
-   * @param endDate end date
-   */
-  public void setEndDate(Date endDate)
-  {
-    this._endDate = endDate;
-  }  
-  
-  @Override
-  protected void validate()
-  {
-    super.validate();
-    
-    if (_daysAgo != null && _endDate != null)
-    {
-      throw new IllegalArgumentException("Cannot specify both end date and days ago");
-    }
-    
-    if (_numDays != null && _startDate != null && _endDate != null)
-    {
-      throw new IllegalArgumentException("Cannot specify num days when both start date and end date are set");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
deleted file mode 100644
index 3a9e917..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partitioner used by {@link AbstractPartitionPreservingIncrementalJob} to limit the number of named outputs
- * used by each reducer.
- * 
- * <p>
- * The purpose of this partitioner is to prevent a proliferation of small files created by {@link AbstractPartitionPreservingIncrementalJob}.
- * This job writes multiple outputs.  Each output corresponds to a day of input data.  By default records will be distributed across all
- * the reducers.  This means that if many input days are consumed, then each reducer will write many outputs.  These outputs will typically
- * be small.  The problem gets worse as more input data is consumed, as this will cause more reducers to be required. 
- * </p>
- * 
- * <p>
- * This partitioner solves the problem by limiting how many days of input data will be mapped to each reducer.  At the extreme each day of
- * input data could be mapped to only one reducer.  This is controlled through the configuration setting <em>incremental.reducers.per.input</em>,
- * which should be set in the Hadoop configuration.  Input days are assigned to reducers in a round-robin fashion.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class TimePartitioner extends Partitioner<AvroKey<GenericRecord>,AvroValue<GenericRecord>> implements Configurable
-{
-  public static String INPUT_TIMES = "incremental.input.times";  
-  public static String REDUCERS_PER_INPUT = "incremental.reducers.per.input";
-  private static String REDUCE_TASKS = "mapred.reduce.tasks";
-    
-  private int numReducers;
-  private Map<Long,List<Integer>> partitionMapping;  
-  private Configuration conf;
-  
-  @Override
-  public Configuration getConf()
-  {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf)
-  {
-    this.conf = conf;
-    
-    if (conf.get(REDUCE_TASKS) == null)
-    {
-      throw new RuntimeException(REDUCE_TASKS + " is required");
-    }
-    
-    this.numReducers = Integer.parseInt(conf.get(REDUCE_TASKS));
-    
-    if (conf.get(REDUCERS_PER_INPUT) == null)
-    {
-      throw new RuntimeException(REDUCERS_PER_INPUT + " is required");
-    }
-    
-    int reducersPerInput = Integer.parseInt(conf.get(REDUCERS_PER_INPUT));
-    
-    this.partitionMapping = new HashMap<Long,List<Integer>>();
-    int partition = 0;
-    for (String part : conf.get(INPUT_TIMES).split(","))
-    {      
-      Long day = Long.parseLong(part);
-      
-      List<Integer> partitions = new ArrayList<Integer>();
-      for (int r=0; r<reducersPerInput; r++)
-      {
-        partitions.add(partition);
-        partition = (partition + 1) % this.numReducers;
-      }
-      
-      partitionMapping.put(day,partitions);
-    }  
-  }
-  
-  @Override
-  public int getPartition(AvroKey<GenericRecord> key, AvroValue<GenericRecord> value, int numReduceTasks)
-  {
-    if (numReduceTasks != this.numReducers)
-    {
-      throw new RuntimeException("numReduceTasks " + numReduceTasks + " does not match expected " + this.numReducers);
-    }
-    
-    Long time = (Long)key.datum().get("time");
-    if (time == null)
-    {
-      throw new RuntimeException("time is null");
-    }
-    
-    List<Integer> partitions = this.partitionMapping.get(time);
-  
-    if (partitions == null)
-    {
-      throw new RuntimeException("Couldn't find partition for " + time);
-    }
-    
-    GenericRecord extractedKey = (GenericRecord)key.datum().get("value");
-    
-    if (extractedKey == null)
-    {
-      throw new RuntimeException("extracted key is null");
-    }
-    
-    int partitionIndex = (extractedKey.hashCode() & Integer.MAX_VALUE) % partitions.size();
-    
-    return partitions.get(partitionIndex);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
deleted file mode 100644
index 010bfd0..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Incremental Hadoop jobs and some supporting classes.  
- * 
- * <p>
- * Jobs within this package form the core of the incremental framework implementation.
- * There are two types of incremental jobs: <em>partition-preserving</em> and 
- * <em>partition-collapsing</em>.
- * </p>
- * 
- * <p>
- * A partition-preserving job consumes input data partitioned by day and produces output data partitioned by day.
- * This is equivalent to running a MapReduce job for each individual day of input data,
- * but much more efficient.  It compares the input data against the existing output data and only processes
- * input data with no corresponding output.
- * </p>
- * 
- * <p>
- * A partition-collapsing job consumes input data partitioned by day and produces a single output.
- * What distinguishes this job from a standard MapReduce job is that it can reuse the previous output.
- * This enables it to process data much more efficiently.  Rather than consuming all input data on each
- * run, it can consume only the new data since the previous run and merges it with the previous output.
- * </p>
- * 
- * <p>
- * Partition-preserving and partition-collapsing jobs can be created by extending {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}, respectively, and implementing the necessary methods.
- * Alternatively, there are concrete versions of these classes, {@link datafu.hourglass.jobs.PartitionPreservingIncrementalJob} and 
- * {@link datafu.hourglass.jobs.PartitionCollapsingIncrementalJob}, which can be used instead.  With these classes, the implementations are provided
- * through setters.  
- * </p>
- * 
- * <p>
- * Incremental jobs use Avro for input, intermediate, and output data.  To implement an incremental job, one must define their schemas.
- * A <em>key schema</em> and <em>intermediate value schema</em> specify the output of the mapper and combiner, which output key-value pairs.
- * The <em>key schema</em> and an <em>output value schema</em> specify the output of the reducer, which outputs a record having key and value
- * fields.
- * </p>
- * 
- * <p>
- * An incremental job also requires that implementations of map and reduce be defined, and optionally combine.  The map implementation must 
- * implement a {@link datafu.hourglass.model.Mapper} interface, which is very similar to the standard map interface in Hadoop.
- * The combine and reduce operations are implemented through an {@link datafu.hourglass.model.Accumulator} interface.
- * This is similar to the standard reduce in Hadoop, however values are provided one-at-a-time rather than by an enumerable list.
- * Also an accumulator returns either one value or no value at all by returning null.  That is, the accumulator may not return an arbitrary number of values
- * for the output.  This restriction precludes the implementation of certain operations, like flatten, which do not fit well within the 
- * incremental programming model.
- * </p>
- */
-package datafu.hourglass.jobs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
deleted file mode 100644
index b8571a0..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * A mapper which outputs key-value pairs as-is.
- * 
- * It assumes the input is an Avro record having "key" and "value" fields.
- * The output is these exact same fields.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class AvroKeyValueIdentityMapper extends Mapper<Object, Object, Object, Object> 
-{
-  @Override
-  protected void map(Object keyObj, Object valueObj, Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    @SuppressWarnings("unchecked")
-    GenericRecord input = ((AvroKey<GenericRecord>)keyObj).datum();
-    GenericRecord key = (GenericRecord)input.get("key");
-    GenericRecord value = (GenericRecord)input.get("value");
-    context.write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(value));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
deleted file mode 100644
index aa8d64c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.jobs.DateRangeConfigurable;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * intermediate value.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingCombiner extends ObjectReducer implements DateRangeConfigurable, Serializable
-{  
-  private Accumulator<GenericRecord,GenericRecord> _accumulator;
-  private boolean _reusePreviousOutput;
-  private PartitionCollapsingSchemas _schemas;
-  private long _beginTime;
-  private long _endTime;
-  
-  @SuppressWarnings("unchecked")
-  public void reduce(Object keyObj,
-                      Iterable<Object> values,
-                      ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {
-    Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-    
-    if (acc == null)
-    {
-      throw new RuntimeException("No combiner factory set");
-    }
-    
-    long accumulatedCount = 0;
-    
-    acc.cleanup();
-    
-    for (Object valueObj : values)
-    {       
-      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
-      if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
-      {        
-        acc.accumulate(value);
-        accumulatedCount++;
-      }
-      else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
-      {          
-        if (!_reusePreviousOutput)
-        {
-          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName()); 
-        }
-        
-        Long time = (Long)value.get("time");
-        GenericRecord data = (GenericData.Record)value.get("value");
-        
-        if (time == null)
-        {
-          throw new RuntimeException("time is null");
-        }
-        
-        if (data == null)
-        {
-          throw new RuntimeException("value is null");
-        }
-        
-        if (time >= _beginTime && time <= _endTime)
-        {
-          acc.accumulate(data);
-          accumulatedCount++;
-        }
-        else if (time < _beginTime)
-        {
-          // pass through unchanged, reducer will handle it
-          context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
-        }
-        else
-        {
-          throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
-        }
-      }
-      else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
-      {   
-        if (!_reusePreviousOutput)
-        {
-          throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName()); 
-        }
-                
-        // pass through unchanged, reducer will handle it
-        context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
-      }
-      else
-      {
-        throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
-      }      
-    }
-    
-    if (accumulatedCount > 0)
-    {
-      GenericRecord intermediateValue = acc.getFinal();
-      if (intermediateValue != null)
-      {
-        context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
-      }
-    }
-  }
-  
-  /**
-   * Sets the schemas.
-   * 
-   * @param schemas
-   */
-  public void setSchemas(PartitionCollapsingSchemas schemas)
-  {
-    _schemas = schemas;
-  }
-  
-  /**
-   * Gets the schemas.
-   * 
-   * @return schemas
-   */
-  public PartitionCollapsingSchemas getSchemas()
-  {
-    return _schemas;
-  }
-  
-  /**
-   * Gets whether previous output is being reused.
-   * 
-   * @return true if previous output is reused
-   */
-  public boolean getReuseOutput()
-  {
-    return _reusePreviousOutput;
-  }
-  
-  /**
-   * Sets whether previous output is being reused.
-   * 
-   * @param reuseOutput true if previous output is reused
-   */
-  public void setReuseOutput(boolean reuseOutput)
-  {
-    _reusePreviousOutput = reuseOutput;
-  }
-  
-  /**
-   * Gets the accumulator used to perform aggregation. 
-   * 
-   * @return The accumulator
-   */
-  public Accumulator<GenericRecord,GenericRecord> getAccumulator()
-  {
-    return _accumulator;
-  }
-  
-  /**
-   * Sets the accumulator used to perform aggregation. 
-   * 
-   * @param acc The accumulator
-   */
-  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    _accumulator = cloneAccumulator(acc);
-  }
-  
-  public void setOutputDateRange(DateRange dateRange)
-  {
-    _beginTime = dateRange.getBeginDate().getTime();
-    _endTime = dateRange.getEndDate().getTime();
-  }
-  
-  /**
-   * Clone a {@link Accumulator} by serializing and deserializing it.
-   * 
-   * @param acc The accumulator to clone
-   * @return The clone accumulator
-   */
-  private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    try
-    {
-      // clone by serializing
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-      ObjectOutputStream objStream;
-      objStream = new ObjectOutputStream(outputStream);    
-      objStream.writeObject(acc);
-      objStream.close();
-      outputStream.close();
-      ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-      ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
-      @SuppressWarnings("unchecked")
-      Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
-      objInputStream.close();
-      inputStream.close();
-      return result;
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-    catch (ClassNotFoundException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
deleted file mode 100644
index 805ed1c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import org.apache.avro.UnresolvedUnionException;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
- * map operation, which produces key and intermediate value pairs from the input.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingMapper extends ObjectMapper implements Serializable
-{
-  private static Logger _log = Logger.getLogger(CollapsingMapper.class);
-  
-  private transient IdentityMapCollector _mapCollector;
-  private transient TimeMapCollector _timeMapCollector;
- 
-  private boolean _reusePreviousOutput;
-  private PartitionCollapsingSchemas _schemas;  
-  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
-    
-  @Override
-  public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {
-    @SuppressWarnings("unchecked")
-    GenericRecord input = ((AvroKey<GenericRecord>)inputObj).datum();   
-    try
-    {
-      getMapCollector().setContext(context);
-      getMapper().map(input, getMapCollector());
-    }
-    catch (InterruptedException e)
-    {
-      throw new IOException(e);
-    }
-    catch (UnresolvedUnionException e)
-    {
-      GenericRecord record = (GenericRecord)e.getUnresolvedDatum();
-      _log.error("UnresolvedUnionException on schema: " + record.getSchema());
-      throw e;
-    } 
-  }
-  
-  /**
-   * Gets whether previous output is being reused.
-   * 
-   * @return true if previous output is reused
-   */
-  public boolean getReuseOutput()
-  {
-    return _reusePreviousOutput;
-  }
-  
-  /**
-   * Sets whether previous output is being reused.
-   * 
-   * @param reuseOutput true if previous output is reused
-   */
-  public void setReuseOutput(boolean reuseOutput)
-  {
-    _reusePreviousOutput = reuseOutput;
-  }
-  
-  @Override
-  public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
-  {      
-    super.setContext(context);
-    
-    if (_mapper instanceof Configurable)
-    {
-      ((Configurable)_mapper).setConf(context.getConfiguration());
-    }
-  }
-  
-  /**
-   * Gets the mapper.
-   * 
-   * @return mapper
-   */
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return _mapper;
-  }
-  
-  /**
-   * Sets the mapper.
-   * 
-   * @param mapper
-   */
-  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
-  {
-    _mapper = mapper;
-  }
-  
-  /**
-   * Sets the Avro schemas.
-   * 
-   * @param schemas
-   */
-  public void setSchemas(PartitionCollapsingSchemas schemas)
-  {
-    _schemas = schemas;
-  }
-  
-  /**
-   * Gets the Avro schemas.
-   * 
-   * @return schemas
-   */
-  public PartitionCollapsingSchemas getSchemas()
-  {
-    return _schemas;
-  }
-  
-  /**
-   * Gets the collector used to collect key-value pairs.
-   * 
-   * @return The collector
-   */
-  private MapCollector getMapCollector()
-  {
-    if (getReuseOutput())
-    {
-      return getTimeMapCollector();
-    }
-    else
-    {
-      return getIdentityMapCollector();
-    }
-  }
-  
-  /**
-   * Gets a collector that maps key-value pairs, where each value
-   * is tagged with the partition from which it was derived. 
-   * 
-   * @return collector
-   */
-  private TimeMapCollector getTimeMapCollector()
-  {
-    if (_timeMapCollector == null)
-    {
-      _timeMapCollector = new TimeMapCollector(getSchemas());
-    }
-    
-    return _timeMapCollector;
-  }
-  
-  /**
-   * Gets a collector that maps key-value pairs as-is.
-   * 
-   * @return collector
-   */
-  private IdentityMapCollector getIdentityMapCollector()
-  {
-    if (_mapCollector == null)
-    {
-      _mapCollector = new IdentityMapCollector(getSchemas());
-    }
-    
-    return _mapCollector;
-  }
-    
-  private abstract class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
-  {
-    private MapContext<Object,Object,Object,Object> context;
-    
-    public void setContext(MapContext<Object,Object,Object,Object> context)
-    {
-      this.context = context;
-    }
-    
-    public MapContext<Object,Object,Object,Object> getContext()
-    {
-      return context;
-    }
-  }
-  
-  /**
-   * A {@see KeyValueCollector} that outputs key-value pairs to {@link MapContext} 
-   * and tags each mapped value with the time for the partition it was derived from.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  private class TimeMapCollector extends MapCollector
-  {
-    private GenericRecord wrappedValue;
-    private InputSplit lastSplit;
-    private long lastTime;
-    
-    public TimeMapCollector(PartitionCollapsingSchemas schemas)
-    {
-      this.wrappedValue = new GenericData.Record(schemas.getDatedIntermediateValueSchema());
-    }
-        
-    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
-    {                
-      if (key == null)
-      {
-        throw new RuntimeException("key is null");
-      }
-      if (value == null)
-      {
-        throw new RuntimeException("value is null");
-      }
-      
-      // wrap the value with the time so we know what to merge and what to unmerge        
-      long time;        
-      if (lastSplit == getContext().getInputSplit())
-      {
-        time = lastTime;
-      }
-      else
-      {
-        FileSplit currentSplit;
-        lastSplit = getContext().getInputSplit();
-        try
-        {
-          Method m = getContext().getInputSplit().getClass().getMethod("getInputSplit");
-          m.setAccessible(true);
-          currentSplit = (FileSplit)m.invoke(getContext().getInputSplit());
-        }
-        catch (SecurityException e)
-        {
-          throw new RuntimeException(e);
-        }
-        catch (NoSuchMethodException e)
-        {
-          throw new RuntimeException(e);
-        }
-        catch (IllegalArgumentException e)
-        {
-          throw new RuntimeException(e);
-        }
-        catch (IllegalAccessException e)
-        {
-          throw new RuntimeException(e);
-        }
-        catch (InvocationTargetException e)
-        {
-          throw new RuntimeException(e);
-        }
-        time = PathUtils.getDateForNestedDatedPath((currentSplit).getPath().getParent()).getTime();
-        lastTime = time;
-      }
-              
-      wrappedValue.put("time", time);
-      wrappedValue.put("value", value);
-      
-      getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
-    }
-  } 
-
-  /**
-   * A {@see KeyValueCollector} that outputs key-value pairs to {@link MapContext} as-is.
-   * 
-   * @author "Matthew Hayes"
-   *
-   */
-  private class IdentityMapCollector extends MapCollector
-  {      
-    public IdentityMapCollector(PartitionCollapsingSchemas schemas)
-    {
-    }
-    
-    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
-    {        
-      if (key == null)
-      {
-        throw new RuntimeException("key is null");
-      }
-      if (value == null)
-      {
-        throw new RuntimeException("value is null");
-      }
-      getContext().write(new AvroKey<GenericRecord>(key), new AvroValue<GenericRecord>(value));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
deleted file mode 100644
index 5424754..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.jobs.DateRangeConfigurable;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Merger;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * output value.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingReducer extends ObjectReducer implements DateRangeConfigurable, Serializable
-{  
-  protected long _beginTime;
-  protected long _endTime;
-  private Accumulator<GenericRecord,GenericRecord> _newAccumulator;
-  private Accumulator<GenericRecord,GenericRecord> _oldAccumulator;
-  private Merger<GenericRecord> _merger;
-  private Merger<GenericRecord> _oldMerger;
-  private boolean _reusePreviousOutput;
-  private PartitionCollapsingSchemas _schemas;
-  
-  @SuppressWarnings("unchecked")
-  public void reduce(Object keyObj,
-                     Iterable<Object> values,
-                     ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {
-    if (_newAccumulator == null)
-    {
-      throw new RuntimeException("No reducer set");
-    }
-    
-    GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
-    
-    // used when processing all data (i.e. no window size)      
-    Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
-    acc.cleanup();
-    long accumulatedCount = 0;
-    
-    Accumulator<GenericRecord,GenericRecord> accOld = null;
-    long oldAccumulatedCount = 0;
-    if (getReuseOutput())
-    {
-      accOld = getOldAccumulator();
-      accOld.cleanup();
-    }
-    
-    GenericRecord previous = null;
-    
-    for (Object valueObj : values)
-    {       
-      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
-      if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
-      {        
-        acc.accumulate(value);
-        accumulatedCount++;
-      }
-      else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
-      {          
-        if (!_reusePreviousOutput)
-        {
-          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName()); 
-        }
-        
-        Long time = (Long)value.get("time");
-        GenericRecord data = (GenericData.Record)value.get("value");
-        
-        if (time == null)
-        {
-          throw new RuntimeException("time is null");
-        }
-        
-        if (data == null)
-        {
-          throw new RuntimeException("value is null");
-        }
-        
-        if (time >= _beginTime && time <= _endTime)
-        {
-          acc.accumulate(data);
-          accumulatedCount++;
-        }
-        else if (time < _beginTime)
-        {
-          accOld.accumulate(data);
-          oldAccumulatedCount++;
-        }
-        else
-        {
-          throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
-        }
-      }
-      else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
-      {   
-        if (!_reusePreviousOutput)
-        {
-          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName()); 
-        }
-        
-        // deep clone the previous output fed back in
-        previous = new GenericData.Record((Record)value,true);
-      }
-      else
-      {
-        throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
-      }      
-    }
-            
-    GenericRecord newOutputValue = null;
-    GenericRecord oldOutputValue = null;
-    
-    if (accumulatedCount > 0)
-    {
-      newOutputValue = acc.getFinal();
-    }
-    
-    if (oldAccumulatedCount > 0)
-    {
-      oldOutputValue = accOld.getFinal();
-    }
-    
-    GenericRecord outputValue = null;
-    
-    if (previous == null)
-    {
-      outputValue = newOutputValue;
-      
-      if (oldOutputValue != null)
-      {
-        if (_oldMerger == null)
-        {
-          throw new RuntimeException("No old record merger set");
-        }
-        
-        outputValue = _oldMerger.merge(outputValue, oldOutputValue);
-      }
-    }
-    else
-    {
-      outputValue = previous;
-      
-      if (oldOutputValue != null)
-      {
-        if (_oldMerger == null)
-        {
-          throw new RuntimeException("No old record merger set");
-        }
-        
-        outputValue = _oldMerger.merge(outputValue, oldOutputValue);
-      }
-      
-      if (newOutputValue != null)
-      {
-        if (_merger == null)
-        {
-          throw new RuntimeException("No new record merger set");
-        }
-        
-        outputValue = _merger.merge(outputValue, newOutputValue);
-      }
-    }
-    
-    if (outputValue != null)
-    {
-      GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
-      output.put("key", key);
-      output.put("value", outputValue);
-      context.write(new AvroKey<GenericRecord>(output),null);
-    }
-  }
-  
-  /**
-   * Sets the Avro schemas.
-   * 
-   * @param schemas
-   */
-  public void setSchemas(PartitionCollapsingSchemas schemas)
-  {
-    _schemas = schemas;
-  }
-  
-  /**
-   * Gets the Avro schemas.
-   * 
-   * @return
-   */
-  private PartitionCollapsingSchemas getSchemas()
-  {
-    return _schemas;
-  }
-  
-  /**
-   * Gets whether previous output is being reused.
-   * 
-   * @return true if previous output is reused
-   */  
-  public boolean getReuseOutput()
-  {
-    return _reusePreviousOutput;
-  }
-  
-  /**
-   * Sets whether previous output is being reused.
-   * 
-   * @param reuseOutput true if previous output is reused
-   */
-  public void setReuseOutput(boolean reuseOutput)
-  {
-    _reusePreviousOutput = reuseOutput;
-  }
-  
-  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    _newAccumulator = cloneAccumulator(acc);
-    _oldAccumulator = cloneAccumulator(acc);
-  }
-  
-  public Accumulator<GenericRecord,GenericRecord> getNewAccumulator()
-  {
-    return _newAccumulator;
-  }
-  
-  public Accumulator<GenericRecord,GenericRecord> getOldAccumulator()
-  {
-    return _oldAccumulator;
-  }
-  
-  public void setRecordMerger(Merger<GenericRecord> merger)
-  {
-    _merger = merger;
-  }
-  
-  public void setOldRecordMerger(Merger<GenericRecord> merger)
-  {
-    _oldMerger = merger;
-  }
-  
-  public void setOutputDateRange(DateRange dateRange)
-  {
-    _beginTime = dateRange.getBeginDate().getTime();
-    _endTime = dateRange.getEndDate().getTime();
-  }
-  
-  /**
-   * Clone a {@link Accumulator} by serializing and deserializing it.
-   * 
-   * @param acc The accumulator to clone
-   * @return The clone accumulator
-   */
-  private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    try
-    {
-      // clone by serializing
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-      ObjectOutputStream objStream;
-      objStream = new ObjectOutputStream(outputStream);    
-      objStream.writeObject(acc);
-      objStream.close();
-      outputStream.close();
-      ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-      ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
-      @SuppressWarnings("unchecked")
-      Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
-      objInputStream.close();
-      inputStream.close();
-      return result;
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-    catch (ClassNotFoundException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-}