Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:42 UTC
[04/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
new file mode 100644
index 0000000..5178133
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A derivation of {@link Job} that stages its output in another location and only
+ * moves it to the final destination if the job completes successfully.
+ * It can also write a counters file to the file system containing counters fetched from
+ * Hadoop along with other task statistics.
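+ *
+ * <p>
+ * A minimal usage sketch (the job name and paths here are illustrative, not part of this API):
+ * </p>
+ *
+ * <pre>
+ * StagedOutputJob job = StagedOutputJob.createStagedJob(
+ *     new Configuration(),
+ *     "my-job",
+ *     Arrays.asList("/data/input"),
+ *     "/tmp/staging",
+ *     "/data/output",
+ *     Logger.getLogger("my-job"));
+ * job.setWriteCounters(true);
+ * boolean success = job.waitForCompletion(false);
+ * </pre>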
+ */
+public class StagedOutputJob extends Job implements Callable<Boolean>
+{
+ private final String _stagingPrefix;
+ private final Logger _log;
+ private Path _countersPath;
+ private Path _countersParentPath;
+ private boolean _writeCounters = false;
+
+ /**
+   * Creates a job that uses a temporary staging location for the output data.
+ * The data is only copied to the final output directory on successful completion
+ * of the job. This prevents existing output data from being overwritten unless
+ * the job completes successfully.
+ *
+ * @param conf configuration
+ * @param jobName job name
+ * @param inputPaths input paths
+ * @param stagingLocation where to stage output temporarily
+ * @param outputPath output path
+ * @param log logger
+ * @return job
+ */
+ public static StagedOutputJob createStagedJob(
+ Configuration conf,
+ String jobName,
+ List<String> inputPaths,
+ String stagingLocation,
+ String outputPath,
+ final Logger log)
+ {
+ final StagedOutputJob retVal;
+ try
+ {
+ retVal = new StagedOutputJob(conf, stagingLocation, log);
+ retVal.setJobName(jobName);
+ retVal.setJarByClass(getCallersClass());
+ FileInputFormat.setInputPathFilter(retVal, HiddenFilePathFilter.class);
+ }
+ catch (IOException e)
+ {
+ log.error("IOException when making a job", e);
+ throw new RuntimeException(e);
+ }
+
+ if (inputPaths != null)
+ {
+ try
+ {
+ FileInputFormat.setInputPaths(
+ retVal,
+ StringUtils.join(inputPaths.iterator(),",")
+ );
+ }
+ catch (IOException e)
+ {
+ log.error("Unable to set up input paths.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ FileOutputFormat.setOutputPath(retVal, new Path(outputPath));
+
+ return retVal;
+ }
+
+ /**
+ * Initializes the job.
+ *
+ * @param conf configuration
+ * @param stagingPrefix where to stage output temporarily
+ * @param log logger
+   * @throws IOException if an error occurs initializing the job
+ */
+ public StagedOutputJob(Configuration conf, String stagingPrefix, Logger log) throws IOException
+ {
+ super(conf);
+ this._stagingPrefix = stagingPrefix;
+ this._log = log;
+ }
+
+ /**
+   * Gets the path where the counters will be stored. If this is not set then by default
+   * the counters will be stored in the output directory.
+   *
+   * @return parent path for counters
+ */
+ public Path getCountersParentPath()
+ {
+ return _countersParentPath;
+ }
+
+ /**
+   * Sets the path where the counters will be stored. If this is not set then by default
+   * the counters will be stored in the output directory.
+ *
+ * @param path parent path for counters
+ */
+ public void setCountersParentPath(Path path)
+ {
+ _countersParentPath = path;
+ }
+
+ /**
+   * Gets the path where the counters were written.
+ *
+ * @return counters path
+ */
+ public Path getCountersPath()
+ {
+ return _countersPath;
+ }
+
+ /**
+ * Get whether counters should be written.
+ *
+ * @return true if counters should be written
+ */
+ public boolean getWriteCounters()
+ {
+ return _writeCounters;
+ }
+
+ /**
+ * Sets whether counters should be written.
+ *
+ * @param writeCounters true if counters should be written
+ */
+ public void setWriteCounters(boolean writeCounters)
+ {
+ this._writeCounters = writeCounters;
+ }
+
+ /**
+   * Runs the job.
+   *
+   * @return true if the job succeeded
+ */
+ @Override
+ public Boolean call() throws Exception
+ {
+ try
+ {
+      boolean success = waitForCompletion(false);
+ String jobId = "?";
+
+ if (getJobID() != null)
+ {
+ jobId = String.format("job_%s_%d",getJobID().getJtIdentifier(), getJobID().getId());
+ }
+
+ if (success)
+ {
+ _log.info(String.format("Job %s with ID %s succeeded! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
+ }
+ else
+ {
+ _log.error(String.format("Job %s with ID %s failed! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
+ }
+
+ return success;
+ }
+    catch (Exception e)
+    {
+      _log.error("Job failed with exception", e);
+      throw e;
+    }
+ }
+
+ /**
+ * Run the job and wait for it to complete. Output will be temporarily stored under the staging path.
+ * If the job is successful it will be moved to the final location.
+ */
+ @Override
+ public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException
+ {
+ final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
+ final Path stagedPath = new Path(String.format("%s/%s/staged", _stagingPrefix, System.currentTimeMillis()));
+
+ FileOutputFormat.setOutputPath(
+ this,
+ stagedPath
+ );
+
+ final Thread hook = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ killJob();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ Runtime.getRuntime().addShutdownHook(hook);
+
+ final boolean retVal = super.waitForCompletion(verbose);
+ Runtime.getRuntime().removeShutdownHook(hook);
+
+ if (retVal)
+ {
+ FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
+
+      // create the output path so that its parent directories exist, then delete it
+      // so that the staged data can be renamed into its place
+      fs.mkdirs(actualOutputPath);
+
+ _log.info(String.format("Deleting data at old path[%s]", actualOutputPath));
+ fs.delete(actualOutputPath, true);
+
+ _log.info(String.format("Moving from staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
+ boolean renamed = fs.rename(stagedPath, actualOutputPath);
+
+ if (renamed && _writeCounters)
+ {
+ writeCounters(fs);
+ }
+
+ return renamed;
+ }
+ else
+ {
+ FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
+ _log.info(String.format("Job failed, deleting staged path[%s]", stagedPath));
+ try
+ {
+ fs.delete(stagedPath, true);
+ }
+      catch (IOException e)
+      {
+        // deleting the staged path is best-effort cleanup; ignore failures
+      }
+ }
+
+    _log.warn("Job did not complete successfully; staged output was not moved");
+ return retVal;
+ }
+
+ /**
+   * Gets the class of the caller by inspecting the stack trace.
+ *
+ * @return caller class
+ */
+ private static Class<?> getCallersClass()
+ {
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ boolean foundSelf = false;
+ for (StackTraceElement element : stack)
+ {
+ if (foundSelf &&
+ !StagedOutputJob.class.getName().equals(element.getClassName()))
+ {
+ try
+ {
+ return Class.forName(element.getClassName());
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ else if (StagedOutputJob.class.getName().equals(element.getClassName())
+ && "getCallersClass".equals(element.getMethodName()))
+ {
+ foundSelf = true;
+ }
+ }
+ return StagedOutputJob.class;
+ }
+
+ /**
+ * Writes Hadoop counters and other task statistics to a file in the file system.
+ *
+   * @param fs file system to write the counters to
+   * @throws IOException if an error occurs writing the counters file
+ */
+ private void writeCounters(final FileSystem fs) throws IOException
+ {
+ final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
+
+ SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+
+ String suffix = timestampFormat.format(new Date());
+
+ if (_countersParentPath != null)
+ {
+ if (!fs.exists(_countersParentPath))
+ {
+ _log.info("Creating counter parent path " + _countersParentPath);
+ fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
+ }
+ // make the name as unique as possible in this case because this may be a directory
+ // where other counter files will be dropped
+ _countersPath = new Path(_countersParentPath,".counters." + suffix);
+ }
+ else
+ {
+ _countersPath = new Path(actualOutputPath,".counters." + suffix);
+ }
+
+ _log.info(String.format("Writing counters to %s", _countersPath));
+ FSDataOutputStream counterStream = fs.create(_countersPath);
+ BufferedOutputStream buffer = new BufferedOutputStream(counterStream,256*1024);
+ OutputStreamWriter writer = new OutputStreamWriter(buffer);
+ for (String groupName : getCounters().getGroupNames())
+ {
+ for (Counter counter : getCounters().getGroup(groupName))
+ {
+ writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
+ }
+ }
+
+ JobID jobID = this.getJobID();
+
+ org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());
+
+ long minStart = Long.MAX_VALUE;
+ long maxFinish = 0;
+ long setupStart = Long.MAX_VALUE;
+ long cleanupFinish = 0;
+ DescriptiveStatistics mapStats = new DescriptiveStatistics();
+ DescriptiveStatistics reduceStats = new DescriptiveStatistics();
+ boolean success = true;
+
+ JobClient jobClient = new JobClient(this.conf);
+
+ Map<String,String> taskIdToType = new HashMap<String,String>();
+
+ TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
+ if (setupReports.length > 0)
+ {
+ _log.info("Processing setup reports");
+ for (TaskReport report : jobClient.getSetupTaskReports(oldJobId))
+ {
+ taskIdToType.put(report.getTaskID().toString(),"SETUP");
+ if (report.getStartTime() == 0)
+ {
+ _log.warn("Skipping report with zero start time");
+ continue;
+ }
+ setupStart = Math.min(setupStart, report.getStartTime());
+ }
+ }
+ else
+ {
+ _log.error("No setup reports");
+ }
+
+ TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
+ if (mapReports.length > 0)
+ {
+ _log.info("Processing map reports");
+ for (TaskReport report : mapReports)
+ {
+ taskIdToType.put(report.getTaskID().toString(),"MAP");
+ if (report.getFinishTime() == 0 || report.getStartTime() == 0)
+ {
+ _log.warn("Skipping report with zero start or finish time");
+ continue;
+ }
+ minStart = Math.min(minStart, report.getStartTime());
+ mapStats.addValue(report.getFinishTime() - report.getStartTime());
+ }
+ }
+ else
+ {
+ _log.error("No map reports");
+ }
+
+ TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
+ if (reduceReports.length > 0)
+ {
+ _log.info("Processing reduce reports");
+ for (TaskReport report : reduceReports)
+ {
+ taskIdToType.put(report.getTaskID().toString(),"REDUCE");
+ if (report.getFinishTime() == 0 || report.getStartTime() == 0)
+ {
+ _log.warn("Skipping report with zero start or finish time");
+ continue;
+ }
+ maxFinish = Math.max(maxFinish, report.getFinishTime());
+ reduceStats.addValue(report.getFinishTime() - report.getStartTime());
+ }
+ }
+ else
+ {
+ _log.error("No reduce reports");
+ }
+
+ TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
+ if (cleanupReports.length > 0)
+ {
+ _log.info("Processing cleanup reports");
+ for (TaskReport report : cleanupReports)
+ {
+ taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
+ if (report.getFinishTime() == 0)
+ {
+ _log.warn("Skipping report with finish time of zero");
+ continue;
+ }
+ cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
+ }
+ }
+ else
+ {
+ _log.error("No cleanup reports");
+ }
+
+ if (minStart == Long.MAX_VALUE)
+ {
+ _log.error("Could not determine map-reduce start time");
+ success = false;
+ }
+ if (maxFinish == 0)
+ {
+ _log.error("Could not determine map-reduce finish time");
+ success = false;
+ }
+
+ if (setupStart == Long.MAX_VALUE)
+ {
+ _log.error("Could not determine setup start time");
+ success = false;
+ }
+ if (cleanupFinish == 0)
+ {
+ _log.error("Could not determine cleanup finish time");
+ success = false;
+ }
+
+ // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
+ // Unfortunately the job client doesn't have an easier way to get these statistics.
+ Map<String,Integer> attemptStats = new HashMap<String,Integer>();
+ _log.info("Processing task attempts");
+ for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
+ {
+ String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
+ String status = event.getTaskStatus().toString();
+
+ String key = String.format("%s_%s_ATTEMPTS",status,type);
+ if (!attemptStats.containsKey(key))
+ {
+ attemptStats.put(key, 0);
+ }
+ attemptStats.put(key, attemptStats.get(key) + 1);
+ }
+
+ if (success)
+ {
+ writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
+ writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
+ writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
+
+ writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
+ writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
+ writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
+
+ writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
+ writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
+ writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
+ writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
+ writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
+ writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
+
+ writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
+ writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
+ writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
+ writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
+ writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
+ writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
+
+ writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
+
+ for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
+ {
+ writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
+ }
+ }
+
+ writer.close();
+ buffer.close();
+ counterStream.close();
+ }
+
+ /**
+   * Gets all task completion events for a particular job.
+   *
+   * @param jobClient job client
+   * @param jobId job ID
+   * @return task completion events
+   * @throws IOException if an error occurs fetching the events
+ */
+ private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
+ {
+ List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();
+
+    // Try to use reflection to access the getTaskCompletionEvents method on the private jobSubmitClient field.
+    // That method accepts a batch size, whereas the public methods fetch events in fixed batches of 10,
+    // which can be extremely slow when the goal is to fetch all events.
+
+ Method getTaskCompletionEventsMethod = null;
+ Object jobSubmitClient = null;
+
+ try
+ {
+ Field f = JobClient.class.getDeclaredField("jobSubmitClient");
+ f.setAccessible(true);
+ jobSubmitClient = f.get(jobClient);
+
+ if (jobSubmitClient != null)
+ {
+ getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
+ getTaskCompletionEventsMethod.setAccessible(true);
+ }
+ }
+    catch (NoSuchMethodException e)
+    {
+      // fall back to the slow public API
+    }
+    catch (SecurityException e)
+    {
+      // fall back to the slow public API
+    }
+    catch (NoSuchFieldException e)
+    {
+      // fall back to the slow public API
+    }
+    catch (IllegalArgumentException e)
+    {
+      // fall back to the slow public API
+    }
+    catch (IllegalAccessException e)
+    {
+      // fall back to the slow public API
+    }
+
+ if (getTaskCompletionEventsMethod != null)
+ {
+ _log.info("Will call getTaskCompletionEvents via reflection since it's faster");
+ }
+ else
+ {
+ _log.info("Will call getTaskCompletionEvents via the slow method");
+ }
+
+ int index = 0;
+ while(true)
+ {
+ TaskCompletionEvent[] currentEvents;
+ if (getTaskCompletionEventsMethod != null)
+ {
+ try
+ {
+          // grab events 250 at a time; the public method fetches only 10 at a time with no way to override the batch size
+ currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, 250);
+ }
+ catch (IllegalArgumentException e)
+ {
+ _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+ getTaskCompletionEventsMethod = null;
+ continue;
+ }
+ catch (IllegalAccessException e)
+ {
+ _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+ getTaskCompletionEventsMethod = null;
+ continue;
+ }
+ catch (InvocationTargetException e)
+ {
+ _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
+ getTaskCompletionEventsMethod = null;
+ continue;
+ }
+ }
+ else
+ {
+ currentEvents = this.getTaskCompletionEvents(index);
+ }
+ if (currentEvents.length == 0) break;
+ for (TaskCompletionEvent event : currentEvents)
+ {
+ events.add(event);
+ }
+ index += currentEvents.length;
+ }
+
+ return events;
+ }
+
+ private void writeAndLog(OutputStreamWriter writer, String line) throws IOException
+ {
+ writer.append(line);
+ writer.append("\n");
+ _log.info(line);
+ }
+
+ static class HiddenFilePathFilter implements PathFilter
+ {
+ @Override
+ public boolean accept(Path path)
+ {
+ String name = path.getName();
+ return ! name.startsWith("_") &&
+ ! name.startsWith(".");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
new file mode 100644
index 0000000..81112da
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimeBasedJob.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.text.ParseException;
+import java.util.Date;
+import java.util.Properties;
+
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Base class for Hadoop jobs that consume time-partitioned data.
+ *
+ * <p>
+ * This class has the same configuration and methods as {@link AbstractJob}.
+ * In addition it also recognizes the following properties:
+ * </p>
+ *
+ * <ul>
+ * <li><em>num.days</em> - Number of consecutive days of input data to consume</li>
+ * <li><em>days.ago</em> - Number of days to subtract off the end of the consumption window</li>
+ * <li><em>start.date</em> - Start date for window in yyyyMMdd format</li>
+ * <li><em>end.date</em> - End date for window in yyyyMMdd format</li>
+ * </ul>
+ *
+ * <p>
+ * Methods are available as well for setting these configuration parameters.
+ * </p>
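+ *
+ * <p>
+ * For example, a thirty-day window ending yesterday could be configured through properties as
+ * follows (a sketch; <em>MyTimeBasedJob</em> stands for a hypothetical concrete subclass):
+ * </p>
+ *
+ * <pre>
+ * Properties props = new Properties();
+ * props.setProperty("num.days", "30");
+ * props.setProperty("days.ago", "1");
+ * TimeBasedJob job = new MyTimeBasedJob("my-job", props);
+ * </pre>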
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class TimeBasedJob extends AbstractJob
+{
+ private Integer _numDays;
+ private Integer _daysAgo;
+ private Date _startDate;
+ private Date _endDate;
+
+ /**
+ * Initializes the job.
+ */
+ public TimeBasedJob()
+ {
+ }
+
+ /**
+ * Initializes the job with a job name and properties.
+ *
+ * @param name Job name
+ * @param props Configuration properties
+ */
+ public TimeBasedJob(String name, Properties props)
+ {
+ super(name,props);
+ }
+
+ @Override
+ public void setProperties(Properties props)
+ {
+ super.setProperties(props);
+
+ if (getProperties().get("num.days") != null)
+ {
+ setNumDays(Integer.parseInt((String)getProperties().get("num.days")));
+ }
+
+ if (getProperties().get("days.ago") != null)
+ {
+ setDaysAgo(Integer.parseInt((String)getProperties().get("days.ago")));
+ }
+
+ if (getProperties().get("start.date") != null)
+ {
+ try
+ {
+ // start date treated as inclusive lower bound
+ setStartDate(PathUtils.datedPathFormat.parse((String)getProperties().get("start.date")));
+ }
+ catch (ParseException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ if (getProperties().get("end.date") != null)
+ {
+ try
+ {
+ setEndDate(PathUtils.datedPathFormat.parse((String)getProperties().get("end.date")));
+ }
+ catch (ParseException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+
+ /**
+ * Gets the number of consecutive days to process.
+ *
+ * @return number of days to process
+ */
+ public Integer getNumDays()
+ {
+ return _numDays;
+ }
+
+ /**
+ * Sets the number of consecutive days to process.
+ *
+ * @param numDays number of days to process
+ */
+ public void setNumDays(Integer numDays)
+ {
+ this._numDays = numDays;
+ }
+
+ /**
+ * Gets the number of days to subtract off the end of the consumption window.
+ *
+ * @return Days ago
+ */
+ public Integer getDaysAgo()
+ {
+ return _daysAgo;
+ }
+
+ /**
+ * Sets the number of days to subtract off the end of the consumption window.
+ *
+ * @param daysAgo Days ago
+ */
+ public void setDaysAgo(Integer daysAgo)
+ {
+ this._daysAgo = daysAgo;
+ }
+
+ /**
+ * Gets the start date.
+ *
+ * @return Start date
+ */
+ public Date getStartDate()
+ {
+ return _startDate;
+ }
+
+ /**
+ * Sets the start date.
+ *
+ * @param startDate start date
+ */
+ public void setStartDate(Date startDate)
+ {
+ this._startDate = startDate;
+ }
+
+ /**
+ * Gets the end date.
+ *
+ * @return end date
+ */
+ public Date getEndDate()
+ {
+ return _endDate;
+ }
+
+ /**
+ * Sets the end date.
+ *
+ * @param endDate end date
+ */
+ public void setEndDate(Date endDate)
+ {
+ this._endDate = endDate;
+ }
+
+ @Override
+ protected void validate()
+ {
+ super.validate();
+
+ if (_daysAgo != null && _endDate != null)
+ {
+ throw new IllegalArgumentException("Cannot specify both end date and days ago");
+ }
+
+ if (_numDays != null && _startDate != null && _endDate != null)
+ {
+ throw new IllegalArgumentException("Cannot specify num days when both start date and end date are set");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
new file mode 100644
index 0000000..17c460d
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/TimePartitioner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partitioner used by {@link AbstractPartitionPreservingIncrementalJob} to limit the number of named outputs
+ * used by each reducer.
+ *
+ * <p>
+ * The purpose of this partitioner is to prevent a proliferation of small files created by {@link AbstractPartitionPreservingIncrementalJob}.
+ * This job writes multiple outputs. Each output corresponds to a day of input data. By default records will be distributed across all
+ * the reducers. This means that if many input days are consumed, then each reducer will write many outputs. These outputs will typically
+ * be small. The problem gets worse as more input data is consumed, as this will cause more reducers to be required.
+ * </p>
+ *
+ * <p>
+ * This partitioner solves the problem by limiting how many days of input data will be mapped to each reducer. At the extreme each day of
+ * input data could be mapped to only one reducer. This is controlled through the configuration setting <em>incremental.reducers.per.input</em>,
+ * which should be set in the Hadoop configuration. Input days are assigned to reducers in a round-robin fashion.
+ * </p>
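+ *
+ * <p>
+ * For example, the following sketch (with illustrative values) assigns each of two input days,
+ * identified by their epoch timestamps in milliseconds, to two of ten reducers:
+ * </p>
+ *
+ * <pre>
+ * conf.set("mapred.reduce.tasks", "10");
+ * conf.set(TimePartitioner.REDUCERS_PER_INPUT, "2");
+ * conf.set(TimePartitioner.INPUT_TIMES, "1383868800000,1383955200000");
+ * </pre>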
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class TimePartitioner extends Partitioner<AvroKey<GenericRecord>,AvroValue<GenericRecord>> implements Configurable
+{
+  public static final String INPUT_TIMES = "incremental.input.times";
+  public static final String REDUCERS_PER_INPUT = "incremental.reducers.per.input";
+  private static final String REDUCE_TASKS = "mapred.reduce.tasks";
+
+ private int numReducers;
+ private Map<Long,List<Integer>> partitionMapping;
+ private Configuration conf;
+
+ @Override
+ public Configuration getConf()
+ {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf)
+ {
+ this.conf = conf;
+
+ if (conf.get(REDUCE_TASKS) == null)
+ {
+ throw new RuntimeException(REDUCE_TASKS + " is required");
+ }
+
+ this.numReducers = Integer.parseInt(conf.get(REDUCE_TASKS));
+
+ if (conf.get(REDUCERS_PER_INPUT) == null)
+ {
+ throw new RuntimeException(REDUCERS_PER_INPUT + " is required");
+ }
+
+ int reducersPerInput = Integer.parseInt(conf.get(REDUCERS_PER_INPUT));
+
+    if (conf.get(INPUT_TIMES) == null)
+    {
+      throw new RuntimeException(INPUT_TIMES + " is required");
+    }
+
+    this.partitionMapping = new HashMap<Long,List<Integer>>();
+    int partition = 0;
+    for (String part : conf.get(INPUT_TIMES).split(","))
+ {
+ Long day = Long.parseLong(part);
+
+ List<Integer> partitions = new ArrayList<Integer>();
+ for (int r=0; r<reducersPerInput; r++)
+ {
+ partitions.add(partition);
+ partition = (partition + 1) % this.numReducers;
+ }
+
+ partitionMapping.put(day,partitions);
+ }
+ }
+
+ @Override
+ public int getPartition(AvroKey<GenericRecord> key, AvroValue<GenericRecord> value, int numReduceTasks)
+ {
+ if (numReduceTasks != this.numReducers)
+ {
+ throw new RuntimeException("numReduceTasks " + numReduceTasks + " does not match expected " + this.numReducers);
+ }
+
+ Long time = (Long)key.datum().get("time");
+ if (time == null)
+ {
+ throw new RuntimeException("time is null");
+ }
+
+ List<Integer> partitions = this.partitionMapping.get(time);
+
+ if (partitions == null)
+ {
+ throw new RuntimeException("Couldn't find partition for " + time);
+ }
+
+ GenericRecord extractedKey = (GenericRecord)key.datum().get("value");
+
+ if (extractedKey == null)
+ {
+ throw new RuntimeException("extracted key is null");
+ }
+
+ int partitionIndex = (extractedKey.hashCode() & Integer.MAX_VALUE) % partitions.size();
+
+ return partitions.get(partitionIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
new file mode 100644
index 0000000..c0cac13
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/package-info.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Incremental Hadoop jobs and some supporting classes.
+ *
+ * <p>
+ * Jobs within this package form the core of the incremental framework implementation.
+ * There are two types of incremental jobs: <em>partition-preserving</em> and
+ * <em>partition-collapsing</em>.
+ * </p>
+ *
+ * <p>
+ * A partition-preserving job consumes input data partitioned by day and produces output data partitioned by day.
+ * This is equivalent to running a MapReduce job for each individual day of input data,
+ * but much more efficient. It compares the input data against the existing output data and only processes
+ * input data with no corresponding output.
+ * </p>
+ *
+ * <p>
+ * A partition-collapsing job consumes input data partitioned by day and produces a single output.
+ * What distinguishes this job from a standard MapReduce job is that it can reuse the previous output.
+ * This enables it to process data much more efficiently. Rather than consuming all input data on each
+ * run, it can consume only the new data since the previous run and merges it with the previous output.
+ * </p>
+ *
+ * <p>
+ * Partition-preserving and partition-collapsing jobs can be created by extending {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}, respectively, and implementing the necessary methods.
+ * Alternatively, there are concrete versions of these classes, {@link datafu.hourglass.jobs.PartitionPreservingIncrementalJob} and
+ * {@link datafu.hourglass.jobs.PartitionCollapsingIncrementalJob}, which can be used instead. With these classes, the implementations are provided
+ * through setters.
+ * </p>
+ *
+ * <p>
+ * Incremental jobs use Avro for input, intermediate, and output data. To implement an incremental job, one must define schemas for each.
+ * A <em>key schema</em> and <em>intermediate value schema</em> specify the output of the mapper and combiner, which output key-value pairs.
+ * The <em>key schema</em> and an <em>output value schema</em> specify the output of the reducer, which outputs a record having key and value
+ * fields.
+ * </p>
+ *
+ * <p>
+ * An incremental job also requires implementations of map and reduce, and optionally combine. The map implementation must
+ * implement the {@link datafu.hourglass.model.Mapper} interface, which is very similar to the standard map interface in Hadoop.
+ * The combine and reduce operations are implemented through the {@link datafu.hourglass.model.Accumulator} interface.
+ * This is similar to the standard reduce in Hadoop, except that values are provided one at a time rather than as an enumerable list.
+ * An accumulator returns either a single value or, by returning null, no value at all. That is, the accumulator may not return
+ * an arbitrary number of values for the output. This restriction precludes the implementation of certain operations, such as
+ * flatten, which do not fit well within the incremental programming model.
+ * </p>
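+ *
+ * <p>
+ * A minimal accumulator sketch for a count aggregate (the "count" field and the schema handling
+ * are illustrative; accumulators may be serialized by the framework, so implementations should be
+ * serializable):
+ * </p>
+ *
+ * <pre>
+ * public class CountAccumulator implements Accumulator&lt;GenericRecord,GenericRecord&gt;
+ * {
+ *   private Schema valueSchema; // the intermediate value schema, assumed to be provided elsewhere
+ *   private long count;
+ *
+ *   public void accumulate(GenericRecord value)
+ *   {
+ *     count += (Long)value.get("count");
+ *   }
+ *
+ *   public GenericRecord getFinal()
+ *   {
+ *     GenericRecord output = new GenericData.Record(valueSchema);
+ *     output.put("count", count);
+ *     return output;
+ *   }
+ *
+ *   public void cleanup()
+ *   {
+ *     count = 0L;
+ *   }
+ * }
+ * </pre>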
+ */
+package datafu.hourglass.jobs;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
new file mode 100644
index 0000000..ff317ca
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A mapper which outputs key-value pairs as-is.
+ *
+ * It assumes the input is an Avro record having "key" and "value" fields,
+ * and it outputs these same fields unchanged.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroKeyValueIdentityMapper extends Mapper<Object, Object, Object, Object>
+{
+ @Override
+ protected void map(Object keyObj, Object valueObj, Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ @SuppressWarnings("unchecked")
+ GenericRecord input = ((AvroKey<GenericRecord>)keyObj).datum();
+ GenericRecord key = (GenericRecord)input.get("key");
+ GenericRecord value = (GenericRecord)input.get("value");
+ context.write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(value));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
new file mode 100644
index 0000000..60ce4fe
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.jobs.DateRangeConfigurable;
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
+ * intermediate value.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingCombiner extends ObjectReducer implements DateRangeConfigurable, Serializable
+{
+ private Accumulator<GenericRecord,GenericRecord> _accumulator;
+ private boolean _reusePreviousOutput;
+ private PartitionCollapsingSchemas _schemas;
+ private long _beginTime;
+ private long _endTime;
+
+ @SuppressWarnings("unchecked")
+ public void reduce(Object keyObj,
+ Iterable<Object> values,
+ ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+ {
+ Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
+
+ if (acc == null)
+ {
+      throw new RuntimeException("No accumulator set for the combiner");
+ }
+
+ long accumulatedCount = 0;
+
+ acc.cleanup();
+
+ for (Object valueObj : values)
+ {
+ GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
+ if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
+ {
+ acc.accumulate(value);
+ accumulatedCount++;
+ }
+ else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
+ {
+ if (!_reusePreviousOutput)
+ {
+ throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
+ }
+
+ Long time = (Long)value.get("time");
+ GenericRecord data = (GenericData.Record)value.get("value");
+
+ if (time == null)
+ {
+ throw new RuntimeException("time is null");
+ }
+
+ if (data == null)
+ {
+ throw new RuntimeException("value is null");
+ }
+
+ if (time >= _beginTime && time <= _endTime)
+ {
+ acc.accumulate(data);
+ accumulatedCount++;
+ }
+ else if (time < _beginTime)
+ {
+ // pass through unchanged, reducer will handle it
+ context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
+ }
+ else
+ {
+ throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
+ }
+ }
+ else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
+ {
+ if (!_reusePreviousOutput)
+ {
+ throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
+ }
+
+ // pass through unchanged, reducer will handle it
+ context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
+ }
+ }
+
+ if (accumulatedCount > 0)
+ {
+ GenericRecord intermediateValue = acc.getFinal();
+ if (intermediateValue != null)
+ {
+ context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
+ }
+ }
+ }
+
+ /**
+ * Sets the schemas.
+ *
+   * @param schemas the Avro schemas
+ */
+ public void setSchemas(PartitionCollapsingSchemas schemas)
+ {
+ _schemas = schemas;
+ }
+
+ /**
+ * Gets the schemas.
+ *
+ * @return schemas
+ */
+ public PartitionCollapsingSchemas getSchemas()
+ {
+ return _schemas;
+ }
+
+ /**
+ * Gets whether previous output is being reused.
+ *
+ * @return true if previous output is reused
+ */
+ public boolean getReuseOutput()
+ {
+ return _reusePreviousOutput;
+ }
+
+ /**
+ * Sets whether previous output is being reused.
+ *
+ * @param reuseOutput true if previous output is reused
+ */
+ public void setReuseOutput(boolean reuseOutput)
+ {
+ _reusePreviousOutput = reuseOutput;
+ }
+
+ /**
+ * Gets the accumulator used to perform aggregation.
+ *
+ * @return The accumulator
+ */
+ public Accumulator<GenericRecord,GenericRecord> getAccumulator()
+ {
+ return _accumulator;
+ }
+
+ /**
+ * Sets the accumulator used to perform aggregation.
+ *
+ * @param acc The accumulator
+ */
+ public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+ {
+ _accumulator = cloneAccumulator(acc);
+ }
+
+ public void setOutputDateRange(DateRange dateRange)
+ {
+ _beginTime = dateRange.getBeginDate().getTime();
+ _endTime = dateRange.getEndDate().getTime();
+ }
+
+ /**
+   * Clones an {@link Accumulator} by serializing and deserializing it.
+   *
+   * @param acc The accumulator to clone
+   * @return The cloned accumulator
+ */
+ private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+ {
+ try
+ {
+ // clone by serializing
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objStream;
+ objStream = new ObjectOutputStream(outputStream);
+ objStream.writeObject(acc);
+ objStream.close();
+ outputStream.close();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
+ @SuppressWarnings("unchecked")
+ Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
+ objInputStream.close();
+ inputStream.close();
+ return result;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
new file mode 100644
index 0000000..7c6c5a2
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.avro.UnresolvedUnionException;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+
+
+import datafu.hourglass.fs.PathUtils;
+import datafu.hourglass.model.KeyValueCollector;
+import datafu.hourglass.model.Mapper;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
+ * map operation, which produces key and intermediate value pairs from the input.
+ * </p>
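+ *
+ * <p>
+ * A minimal mapper sketch (the field names are illustrative; the signature follows how the
+ * mapper is invoked by this class):
+ * </p>
+ *
+ * <pre>
+ * public class MyMapper implements Mapper&lt;GenericRecord,GenericRecord,GenericRecord&gt;
+ * {
+ *   public void map(GenericRecord input, KeyValueCollector&lt;GenericRecord,GenericRecord&gt; collector)
+ *     throws IOException, InterruptedException
+ *   {
+ *     // derive a key and an intermediate value from the input record, then collect the pair
+ *     collector.collect((GenericRecord)input.get("key"), (GenericRecord)input.get("value"));
+ *   }
+ * }
+ * </pre>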
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingMapper extends ObjectMapper implements Serializable
+{
+ private static Logger _log = Logger.getLogger(CollapsingMapper.class);
+
+ private transient IdentityMapCollector _mapCollector;
+ private transient TimeMapCollector _timeMapCollector;
+
+ private boolean _reusePreviousOutput;
+ private PartitionCollapsingSchemas _schemas;
+ private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+
+ @Override
+ public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+ {
+ @SuppressWarnings("unchecked")
+ GenericRecord input = ((AvroKey<GenericRecord>)inputObj).datum();
+ try
+ {
+ getMapCollector().setContext(context);
+ getMapper().map(input, getMapCollector());
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ catch (UnresolvedUnionException e)
+ {
+ GenericRecord record = (GenericRecord)e.getUnresolvedDatum();
+ _log.error("UnresolvedUnionException on schema: " + record.getSchema());
+ throw e;
+ }
+ }
+
+ /**
+ * Gets whether previous output is being reused.
+ *
+ * @return true if previous output is reused
+ */
+ public boolean getReuseOutput()
+ {
+ return _reusePreviousOutput;
+ }
+
+ /**
+ * Sets whether previous output is being reused.
+ *
+ * @param reuseOutput true if previous output is reused
+ */
+ public void setReuseOutput(boolean reuseOutput)
+ {
+ _reusePreviousOutput = reuseOutput;
+ }
+
+ @Override
+ public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
+ {
+ super.setContext(context);
+
+ if (_mapper instanceof Configurable)
+ {
+ ((Configurable)_mapper).setConf(context.getConfiguration());
+ }
+ }
+
+ /**
+ * Gets the mapper.
+ *
+ * @return mapper
+ */
+ public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+ {
+ return _mapper;
+ }
+
+ /**
+ * Sets the mapper.
+ *
+   * @param mapper the mapper
+ */
+ public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+ {
+ _mapper = mapper;
+ }
+
+ /**
+ * Sets the Avro schemas.
+ *
+   * @param schemas the Avro schemas
+ */
+ public void setSchemas(PartitionCollapsingSchemas schemas)
+ {
+ _schemas = schemas;
+ }
+
+ /**
+ * Gets the Avro schemas.
+ *
+ * @return schemas
+ */
+ public PartitionCollapsingSchemas getSchemas()
+ {
+ return _schemas;
+ }
+
+ /**
+ * Gets the collector used to collect key-value pairs.
+ *
+ * @return The collector
+ */
+ private MapCollector getMapCollector()
+ {
+ if (getReuseOutput())
+ {
+ return getTimeMapCollector();
+ }
+ else
+ {
+ return getIdentityMapCollector();
+ }
+ }
+
+ /**
+ * Gets a collector that maps key-value pairs, where each value
+ * is tagged with the partition from which it was derived.
+ *
+ * @return collector
+ */
+ private TimeMapCollector getTimeMapCollector()
+ {
+ if (_timeMapCollector == null)
+ {
+ _timeMapCollector = new TimeMapCollector(getSchemas());
+ }
+
+ return _timeMapCollector;
+ }
+
+ /**
+ * Gets a collector that maps key-value pairs as-is.
+ *
+ * @return collector
+ */
+ private IdentityMapCollector getIdentityMapCollector()
+ {
+ if (_mapCollector == null)
+ {
+ _mapCollector = new IdentityMapCollector(getSchemas());
+ }
+
+ return _mapCollector;
+ }
+
+ private abstract class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
+ {
+ private MapContext<Object,Object,Object,Object> context;
+
+ public void setContext(MapContext<Object,Object,Object,Object> context)
+ {
+ this.context = context;
+ }
+
+ public MapContext<Object,Object,Object,Object> getContext()
+ {
+ return context;
+ }
+ }
+
+ /**
+   * A {@link KeyValueCollector} that outputs key-value pairs to {@link MapContext}
+ * and tags each mapped value with the time for the partition it was derived from.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+ private class TimeMapCollector extends MapCollector
+ {
+ private GenericRecord wrappedValue;
+ private InputSplit lastSplit;
+ private long lastTime;
+
+ public TimeMapCollector(PartitionCollapsingSchemas schemas)
+ {
+ this.wrappedValue = new GenericData.Record(schemas.getDatedIntermediateValueSchema());
+ }
+
+ public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
+ {
+ if (key == null)
+ {
+ throw new RuntimeException("key is null");
+ }
+ if (value == null)
+ {
+ throw new RuntimeException("value is null");
+ }
+
+ // wrap the value with the time so we know what to merge and what to unmerge
+ long time;
+ if (lastSplit == getContext().getInputSplit())
+ {
+ time = lastTime;
+ }
+ else
+ {
+ FileSplit currentSplit;
+ lastSplit = getContext().getInputSplit();
+ try
+ {
+ Method m = getContext().getInputSplit().getClass().getMethod("getInputSplit");
+ m.setAccessible(true);
+ currentSplit = (FileSplit)m.invoke(getContext().getInputSplit());
+ }
+ catch (SecurityException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException(e);
+ }
+ time = PathUtils.getDateForNestedDatedPath((currentSplit).getPath().getParent()).getTime();
+ lastTime = time;
+ }
+
+ wrappedValue.put("time", time);
+ wrappedValue.put("value", value);
+
+ getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
+ }
+ }
+
+ /**
+   * A {@link KeyValueCollector} that outputs key-value pairs to {@link MapContext} as-is.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+ private class IdentityMapCollector extends MapCollector
+ {
+ public IdentityMapCollector(PartitionCollapsingSchemas schemas)
+ {
+ }
+
+ public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
+ {
+ if (key == null)
+ {
+ throw new RuntimeException("key is null");
+ }
+ if (value == null)
+ {
+ throw new RuntimeException("value is null");
+ }
+ getContext().write(new AvroKey<GenericRecord>(key), new AvroValue<GenericRecord>(value));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
new file mode 100644
index 0000000..86aa66c
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.jobs.DateRangeConfigurable;
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Merger;
+import datafu.hourglass.schemas.PartitionCollapsingSchemas;
+
+/**
+ * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ *
+ * <p>
+ * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
+ * output value.
+ * </p>
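+ *
+ * <p>
+ * When previous output is reused, {@link datafu.hourglass.model.Merger} implementations combine the
+ * previous output with newly accumulated values. A minimal merger sketch (assuming a numeric "count"
+ * field, which is illustrative, and ignoring null handling for brevity):
+ * </p>
+ *
+ * <pre>
+ * public class CountMerger implements Merger&lt;GenericRecord&gt;
+ * {
+ *   public GenericRecord merge(GenericRecord previous, GenericRecord current)
+ *   {
+ *     // combine the two partial results by summing their counts
+ *     previous.put("count", (Long)previous.get("count") + (Long)current.get("count"));
+ *     return previous;
+ *   }
+ * }
+ * </pre>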
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class CollapsingReducer extends ObjectReducer implements DateRangeConfigurable, Serializable
+{
+ protected long _beginTime;
+ protected long _endTime;
+ private Accumulator<GenericRecord,GenericRecord> _newAccumulator;
+ private Accumulator<GenericRecord,GenericRecord> _oldAccumulator;
+ private Merger<GenericRecord> _merger;
+ private Merger<GenericRecord> _oldMerger;
+ private boolean _reusePreviousOutput;
+ private PartitionCollapsingSchemas _schemas;
+
+ @SuppressWarnings("unchecked")
+ public void reduce(Object keyObj,
+ Iterable<Object> values,
+ ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
+ {
+ if (_newAccumulator == null)
+ {
+      throw new RuntimeException("No accumulator set for the reducer");
+ }
+
+ GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
+
+ // used when processing all data (i.e. no window size)
+ Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
+ acc.cleanup();
+ long accumulatedCount = 0;
+
+ Accumulator<GenericRecord,GenericRecord> accOld = null;
+ long oldAccumulatedCount = 0;
+ if (getReuseOutput())
+ {
+ accOld = getOldAccumulator();
+ accOld.cleanup();
+ }
+
+ GenericRecord previous = null;
+
+ for (Object valueObj : values)
+ {
+ GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
+ if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
+ {
+ acc.accumulate(value);
+ accumulatedCount++;
+ }
+ else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
+ {
+ if (!_reusePreviousOutput)
+ {
+ throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
+ }
+
+ Long time = (Long)value.get("time");
+ GenericRecord data = (GenericData.Record)value.get("value");
+
+ if (time == null)
+ {
+ throw new RuntimeException("time is null");
+ }
+
+ if (data == null)
+ {
+ throw new RuntimeException("value is null");
+ }
+
+ if (time >= _beginTime && time <= _endTime)
+ {
+ acc.accumulate(data);
+ accumulatedCount++;
+ }
+ else if (time < _beginTime)
+ {
+ accOld.accumulate(data);
+ oldAccumulatedCount++;
+ }
+ else
+ {
+ throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
+ }
+ }
+ else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
+ {
+ if (!_reusePreviousOutput)
+ {
+ throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
+ }
+
+ // deep clone the previous output fed back in
+ previous = new GenericData.Record((Record)value,true);
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
+ }
+ }
+
+ GenericRecord newOutputValue = null;
+ GenericRecord oldOutputValue = null;
+
+ if (accumulatedCount > 0)
+ {
+ newOutputValue = acc.getFinal();
+ }
+
+ if (oldAccumulatedCount > 0)
+ {
+ oldOutputValue = accOld.getFinal();
+ }
+
+ GenericRecord outputValue = null;
+
+ if (previous == null)
+ {
+ outputValue = newOutputValue;
+
+ if (oldOutputValue != null)
+ {
+ if (_oldMerger == null)
+ {
+ throw new RuntimeException("No old record merger set");
+ }
+
+ outputValue = _oldMerger.merge(outputValue, oldOutputValue);
+ }
+ }
+ else
+ {
+ outputValue = previous;
+
+ if (oldOutputValue != null)
+ {
+ if (_oldMerger == null)
+ {
+ throw new RuntimeException("No old record merger set");
+ }
+
+ outputValue = _oldMerger.merge(outputValue, oldOutputValue);
+ }
+
+ if (newOutputValue != null)
+ {
+ if (_merger == null)
+ {
+ throw new RuntimeException("No new record merger set");
+ }
+
+ outputValue = _merger.merge(outputValue, newOutputValue);
+ }
+ }
+
+ if (outputValue != null)
+ {
+ GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
+ output.put("key", key);
+ output.put("value", outputValue);
+ context.write(new AvroKey<GenericRecord>(output),null);
+ }
+ }
+
+ /**
+ * Sets the Avro schemas.
+ *
+   * @param schemas the Avro schemas
+ */
+ public void setSchemas(PartitionCollapsingSchemas schemas)
+ {
+ _schemas = schemas;
+ }
+
+ /**
+ * Gets the Avro schemas.
+ *
+   * @return the Avro schemas
+ */
+ private PartitionCollapsingSchemas getSchemas()
+ {
+ return _schemas;
+ }
+
+ /**
+ * Gets whether previous output is being reused.
+ *
+ * @return true if previous output is reused
+ */
+ public boolean getReuseOutput()
+ {
+ return _reusePreviousOutput;
+ }
+
+ /**
+ * Sets whether previous output is being reused.
+ *
+ * @param reuseOutput true if previous output is reused
+ */
+ public void setReuseOutput(boolean reuseOutput)
+ {
+ _reusePreviousOutput = reuseOutput;
+ }
+
+ public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+ {
+ _newAccumulator = cloneAccumulator(acc);
+ _oldAccumulator = cloneAccumulator(acc);
+ }
+
+ public Accumulator<GenericRecord,GenericRecord> getNewAccumulator()
+ {
+ return _newAccumulator;
+ }
+
+ public Accumulator<GenericRecord,GenericRecord> getOldAccumulator()
+ {
+ return _oldAccumulator;
+ }
+
+ public void setRecordMerger(Merger<GenericRecord> merger)
+ {
+ _merger = merger;
+ }
+
+ public void setOldRecordMerger(Merger<GenericRecord> merger)
+ {
+ _oldMerger = merger;
+ }
+
+ public void setOutputDateRange(DateRange dateRange)
+ {
+ _beginTime = dateRange.getBeginDate().getTime();
+ _endTime = dateRange.getEndDate().getTime();
+ }
+
+ /**
+   * Clones an {@link Accumulator} by serializing and deserializing it.
+   *
+   * @param acc The accumulator to clone
+   * @return The cloned accumulator
+ */
+ private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
+ {
+ try
+ {
+ // clone by serializing
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objStream;
+ objStream = new ObjectOutputStream(outputStream);
+ objStream.writeObject(acc);
+ objStream.close();
+ outputStream.close();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
+ @SuppressWarnings("unchecked")
+ Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
+ objInputStream.close();
+ inputStream.close();
+ return result;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
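
For orientation, here is a minimal sketch of an accumulator that could be handed
to setAccumulator() above. The CountAccumulator name, the "count" field, and the
inline output schema are illustrative assumptions; in practice the schemas come
from the job's PartitionCollapsingSchemas. The implementation must be
serializable, since cloneAccumulator() above copies it through Java serialization.

    import java.io.Serializable;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import datafu.hourglass.model.Accumulator;

    // hypothetical accumulator that sums a long "count" field
    public class CountAccumulator implements Accumulator<GenericRecord,GenericRecord>, Serializable
    {
      // assumed output schema with a single long "count" field
      private static final Schema OUTPUT_SCHEMA = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Count\",\"fields\":"
          + "[{\"name\":\"count\",\"type\":\"long\"}]}");

      private long _total;

      @Override
      public void accumulate(GenericRecord value)
      {
        // assumes the intermediate value schema also carries a long "count" field
        _total += (Long)value.get("count");
      }

      @Override
      public GenericRecord getFinal()
      {
        GenericRecord output = new GenericData.Record(OUTPUT_SCHEMA);
        output.put("count", _total);
        return output;
      }

      @Override
      public void cleanup()
      {
        // the reducer calls this before each key, so reset the running total
        _total = 0L;
      }
    }
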
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
new file mode 100644
index 0000000..8b7d574
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A Hadoop combiner which delegates to an implementation read from the distributed cache.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class DelegatingCombiner extends Reducer<Object, Object, Object, Object>
+{
+ private ObjectReducer processor;
+
+ @Override
+ protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ super.setup(context);
+
+ if (context != null)
+ {
+ Configuration conf = context.getConfiguration();
+
+ String path = conf.get(Parameters.COMBINER_IMPL_PATH);
+
+ this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
+ this.processor.setContext(context);
+ }
+ }
+
+ @Override
+ protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ this.processor.reduce(key, values, context);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException
+ {
+ this.processor.close();
+ }
+}
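
As a wiring sketch: a job can serialize its ObjectReducer, register it in the
distributed cache via the DistributedCacheHelper introduced later in this patch,
and record the path under Parameters.COMBINER_IMPL_PATH so the setup() method
above can find it. The CombinerWiring class name is hypothetical; the staging
path is whatever location the job chooses.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    import datafu.hourglass.mapreduce.DelegatingCombiner;
    import datafu.hourglass.mapreduce.DistributedCacheHelper;
    import datafu.hourglass.mapreduce.ObjectReducer;
    import datafu.hourglass.mapreduce.Parameters;

    public class CombinerWiring
    {
      // Serializes the combiner implementation to the staging path, adds the
      // file to the distributed cache, and records its location so that
      // DelegatingCombiner.setup() can deserialize it in each task.
      public static void configureCombiner(Job job, ObjectReducer combiner, Path stagingPath)
          throws IOException
      {
        Configuration conf = job.getConfiguration();
        DistributedCacheHelper.writeObject(conf, combiner, stagingPath);
        conf.set(Parameters.COMBINER_IMPL_PATH, stagingPath.toString());
        job.setCombinerClass(DelegatingCombiner.class);
      }
    }
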
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
new file mode 100644
index 0000000..ea7a8ae
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A Hadoop mapper which delegates to an implementation read from the distributed cache.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class DelegatingMapper extends Mapper<Object, Object, Object, Object>
+{
+ private ObjectMapper processor;
+
+ @Override
+ protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ super.setup(context);
+
+ if (context != null)
+ {
+ Configuration conf = context.getConfiguration();
+
+ String path = conf.get(Parameters.MAPPER_IMPL_PATH);
+
+ this.processor = (ObjectMapper)DistributedCacheHelper.readObject(conf, new Path(path));
+ this.processor.setContext(context);
+ }
+ }
+
+ @Override
+ protected void map(Object key, Object value, Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
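+    // the input record is carried in the Hadoop key here, so only the key
+    // is handed to the delegate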
+ this.processor.map(key, context);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException
+ {
+ this.processor.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
new file mode 100644
index 0000000..6596a5f
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DelegatingReducer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A Hadoop reducer which delegates to an implementation read from the distributed cache.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class DelegatingReducer extends Reducer<Object, Object, Object, Object>
+{
+ private ObjectReducer processor;
+
+ @Override
+ protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ super.setup(context);
+
+ if (context != null)
+ {
+ Configuration conf = context.getConfiguration();
+
+ String path = conf.get(Parameters.REDUCER_IMPL_PATH);
+
+ this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
+ this.processor.setContext(context);
+ }
+ }
+
+ @Override
+ protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
+ {
+ this.processor.reduce(key, values, context);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException
+ {
+ this.processor.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
new file mode 100644
index 0000000..78c8911
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Methods for working with the Hadoop distributed cache.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class DistributedCacheHelper
+{
+ /**
+   * Deserializes an object from a file in the distributed cache, given the
+   * path it was originally written to in HDFS.  The local copy is located by
+   * matching that path's suffix against the task's local cache files.
+   *
+   * @param conf Hadoop configuration
+   * @param path Path the object was serialized to
+   * @return Deserialized object
+   * @throws IOException if the object cannot be read
+ */
+ public static Object readObject(Configuration conf, org.apache.hadoop.fs.Path path) throws IOException
+ {
+ String localPath = null;
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
+    if (localCacheFiles == null)
+    {
+      throw new RuntimeException("No local cache files found for " + path);
+    }
+    for (Path localCacheFile : localCacheFiles)
+    {
+      if (localCacheFile.toString().endsWith(path.toString()))
+      {
+        localPath = localCacheFile.toString();
+        break;
+      }
+    }
+ if (localPath == null)
+ {
+ throw new RuntimeException("Could not find " + path + " in local cache");
+ }
+ FileInputStream inputStream = new FileInputStream(new File(localPath));
+ ObjectInputStream objStream = new ObjectInputStream(inputStream);
+
+ try
+ {
+ try {
+ return objStream.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ finally
+ {
+ objStream.close();
+ inputStream.close();
+ }
+ }
+
+ /**
+ * Serializes an object to a path in HDFS and adds the file to the distributed cache.
+ *
+ * @param conf Hadoop configuration
+ * @param obj Object to serialize
+ * @param path Path to serialize object to
+   * @throws IOException if the object cannot be written
+ */
+ public static void writeObject(Configuration conf, Object obj, org.apache.hadoop.fs.Path path) throws IOException
+ {
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream outputStream = fs.create(path, true);
+ ObjectOutputStream objStream = new ObjectOutputStream(outputStream);
+ objStream.writeObject(obj);
+ objStream.close();
+ outputStream.close();
+ DistributedCache.addCacheFile(path.toUri(), conf);
+ }
+}
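
A small usage sketch of the round trip, under the assumption that writeObject
runs on the client and readObject runs inside a task, where the local cache
copy exists. The CacheRoundTrip name and the staging path are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import datafu.hourglass.mapreduce.DistributedCacheHelper;

    public class CacheRoundTrip
    {
      // hypothetical staging location; any HDFS path the job controls works
      private static final Path IMPL_PATH = new Path("/tmp/myjob/processor.impl");

      // client side: serialize the object and register it in the cache
      public static void ship(Configuration conf, Object processor) throws IOException
      {
        DistributedCacheHelper.writeObject(conf, processor, IMPL_PATH);
      }

      // task side: locate the local copy (matched by path suffix) and read it back
      public static Object fetch(Configuration conf) throws IOException
      {
        return DistributedCacheHelper.readObject(conf, IMPL_PATH);
      }
    }
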
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
new file mode 100644
index 0000000..9723b32
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectMapper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.MapContext;
+
+/**
+ * Defines the interface for a mapper implementation that {@link DelegatingMapper} delegates to.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ObjectMapper extends ObjectProcessor {
+ public abstract void map(Object input,
+ MapContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
+}
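
For illustration, a minimal concrete subclass; the IdentityObjectMapper name and
its pass-through behavior are hypothetical. Implementations are shipped to the
tasks by Java serialization, so they should be serializable.

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.hadoop.mapreduce.MapContext;

    import datafu.hourglass.mapreduce.ObjectMapper;

    public class IdentityObjectMapper extends ObjectMapper implements Serializable
    {
      @Override
      public void map(Object input, MapContext<Object,Object,Object,Object> context)
          throws IOException, InterruptedException
      {
        // pass the record through unchanged; a real implementation would
        // transform it into the intermediate records expected by the reducer
        context.write(input, null);
      }
    }
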
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
new file mode 100644
index 0000000..b4383c7
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectProcessor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base class for {@link ObjectMapper} and {@link ObjectReducer}.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ObjectProcessor
+{
+ private TaskInputOutputContext<Object,Object,Object,Object> context;
+
+ public TaskInputOutputContext<Object,Object,Object,Object> getContext()
+ {
+ return this.context;
+ }
+
+ public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
+ {
+ this.context = context;
+ }
+
+ public void close() throws IOException, InterruptedException
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
new file mode 100644
index 0000000..786ca31
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/ObjectReducer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.ReduceContext;
+
+/**
+ * Defines the interface for combiner and reducer implementations that {@link DelegatingCombiner} and
+ * {@link DelegatingReducer} delegate to.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ObjectReducer extends ObjectProcessor {
+ public abstract void reduce(Object key,
+ Iterable<Object> values,
+ ReduceContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
new file mode 100644
index 0000000..2fc690e
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/Parameters.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.mapreduce;
+
+/**
+ * Parameters used by the jobs to pass configuration settings
+ * to the mappers, combiners, and reducers.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class Parameters
+{
+ public static final String MAPPER_IMPL_PATH = "hourglass.mapper.impl.path";
+ public static final String REDUCER_IMPL_PATH = "hourglass.reducer.impl.path";
+ public static final String COMBINER_IMPL_PATH = "hourglass.combiner.impl.path";
+}
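
As a minimal sketch (class name and paths hypothetical): each key records where
the corresponding serialized implementation was staged, and the matching
Delegating* class reads the key back in setup().

    import org.apache.hadoop.conf.Configuration;

    import datafu.hourglass.mapreduce.Parameters;

    public class DelegationSetup
    {
      public static void setImplPaths(Configuration conf)
      {
        // each path names a serialized implementation written ahead of time
        conf.set(Parameters.MAPPER_IMPL_PATH, "/tmp/myjob/mapper.impl");     // DelegatingMapper
        conf.set(Parameters.COMBINER_IMPL_PATH, "/tmp/myjob/combiner.impl"); // DelegatingCombiner
        conf.set(Parameters.REDUCER_IMPL_PATH, "/tmp/myjob/reducer.impl");   // DelegatingReducer
      }
    }
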