Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:43 UTC
[05/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
new file mode 100644
index 0000000..ce1c1d6
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ExecutionPlanner.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Base class for execution planners. An execution planner determines which files should be processed
+ * for a particular run.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class ExecutionPlanner
+{
+ private final Logger _log = Logger.getLogger(ExecutionPlanner.class);
+
+ private FileSystem _fileSystem;
+ private Properties _props;
+ private Date _startDate;
+ private Date _endDate;
+ private Integer _daysAgo;
+ private Integer _numDays;
+ private Integer _maxToProcess;
+ private DateRange _range;
+ private Path _outputPath;
+ private boolean _failOnMissing;
+ private List<Path> _inputPaths = new ArrayList<Path>();
+ private List<SortedMap<Date,DatePath>> _inputPathsByDate;
+ private Map<Date,List<DatePath>> _availableInputsByDate = new HashMap<Date,List<DatePath>>();
+
+ /**
+ * Initializes the execution planner.
+ *
+ * @param fs file system to use
+ * @param props configuration properties
+ */
+ public ExecutionPlanner(FileSystem fs, Properties props)
+ {
+ _props = props;
+ _fileSystem = fs;
+ }
+
+ /**
+ * Gets the output path.
+ *
+ * @return output path
+ */
+ public Path getOutputPath()
+ {
+ return _outputPath;
+ }
+
+ /**
+ * Gets the input paths.
+ *
+ * @return input paths
+ */
+ public List<Path> getInputPaths()
+ {
+ return _inputPaths;
+ }
+
+ /**
+ * Sets the output path.
+ *
+ * @param outputPath output path
+ */
+ public void setOutputPath(Path outputPath)
+ {
+ this._outputPath = outputPath;
+ }
+
+ /**
+ * Sets the input paths.
+ *
+ * @param inputPaths input paths
+ */
+ public void setInputPaths(List<Path> inputPaths)
+ {
+ this._inputPaths = inputPaths;
+ }
+
+ /**
+ * Sets the start date.
+ *
+ * @param startDate start date
+ */
+ public void setStartDate(Date startDate)
+ {
+ this._startDate = startDate;
+ }
+
+ /**
+ * Gets the start date.
+ *
+ * @return start date
+ */
+ public Date getStartDate()
+ {
+ return _startDate;
+ }
+
+ /**
+ * Sets the end date.
+ *
+ * @param endDate end date
+ */
+ public void setEndDate(Date endDate)
+ {
+ this._endDate = endDate;
+ }
+
+ /**
+ * Gets the end date.
+ *
+ * @return end date
+ */
+ public Date getEndDate()
+ {
+ return _endDate;
+ }
+
+ /**
+ * Sets the number of days to subtract off the end date.
+ *
+ * @param daysAgo days ago
+ */
+ public void setDaysAgo(Integer daysAgo)
+ {
+ this._daysAgo = daysAgo;
+ }
+
+ /**
+ * Gets the number of days to subtract off the end date.
+ *
+ * @return days ago
+ */
+ public Integer getDaysAgo()
+ {
+ return _daysAgo;
+ }
+
+ /**
+ * Sets the number of days to process.
+ *
+ * @param numDays number of days to process
+ */
+ public void setNumDays(Integer numDays)
+ {
+ this._numDays = numDays;
+ }
+
+ /**
+ * Gets the number of days to process.
+ *
+ * @return number of days to process
+ */
+ public Integer getNumDays()
+ {
+ return _numDays;
+ }
+
+ /**
+ * Sets the maximum number of days to process at a time.
+ *
+ * @param maxToProcess maximum number of days
+ */
+ public void setMaxToProcess(Integer maxToProcess)
+ {
+ this._maxToProcess = maxToProcess;
+ }
+
+ /**
+ * Gets the maximum number of days to process at a time.
+ *
+ * @return maximum number of days
+ */
+ public Integer getMaxToProcess()
+ {
+ return _maxToProcess;
+ }
+
+ /**
+ * Gets whether the job should fail if data is missing within the desired date range.
+ *
+ * @return true if the job should fail on missing data
+ */
+ public boolean isFailOnMissing()
+ {
+ return _failOnMissing;
+ }
+
+ /**
+ * Sets whether the job should fail if data is missing within the desired date range.
+ *
+ * @param failOnMissing true if the job should fail on missing data
+ */
+ public void setFailOnMissing(boolean failOnMissing)
+ {
+ this._failOnMissing = failOnMissing;
+ }
+
+ /**
+ * Gets the desired input date range to process based on the configuration and available inputs.
+ *
+ * @return desired date range
+ */
+ public DateRange getDateRange()
+ {
+ return _range;
+ }
+
+ /**
+ * Gets the file system.
+ *
+ * @return file system
+ */
+ protected FileSystem getFileSystem()
+ {
+ return _fileSystem;
+ }
+
+ /**
+ * Gets the configuration properties.
+ *
+ * @return properties
+ */
+ protected Properties getProps()
+ {
+ return _props;
+ }
+
+ /**
+ * Gets a map from date to available input data.
+ *
+ * @return map from date to available input data
+ */
+ protected Map<Date,List<DatePath>> getAvailableInputsByDate()
+ {
+ return _availableInputsByDate;
+ }
+
+ /**
+ * Get a map from date to path for all paths matching yyyy/MM/dd under the given path.
+ *
+ * @param path path to search under
+ * @return map of date to path
+ * @throws IOException
+ */
+ protected SortedMap<Date,DatePath> getDailyData(Path path) throws IOException
+ {
+ SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
+ for (DatePath datePath : PathUtils.findNestedDatedPaths(getFileSystem(),path))
+ {
+ data.put(datePath.getDate(),datePath);
+ }
+ return data;
+ }
+
+ /**
+ * Get a map from date to path for all paths matching yyyyMMdd under the given path.
+ *
+ * @param path path to search under
+ * @return map of date to path
+ * @throws IOException
+ */
+ protected SortedMap<Date,DatePath> getDatedData(Path path) throws IOException
+ {
+ SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
+ for (DatePath datePath : PathUtils.findDatedPaths(getFileSystem(),path))
+ {
+ data.put(datePath.getDate(),datePath);
+ }
+ return data;
+ }
+
+ /**
+ * Determine what input data is available.
+ *
+ * @throws IOException
+ */
+ protected void loadInputData() throws IOException
+ {
+ _inputPathsByDate = new ArrayList<SortedMap<Date,DatePath>>();
+ for (Path inputPath : getInputPaths())
+ {
+ _log.info(String.format("Searching for available input data in " + inputPath));
+ _inputPathsByDate.add(getDailyData(inputPath));
+ }
+ }
+
+ /**
+ * Determines what input data is available.
+ */
+ protected void determineAvailableInputDates()
+ {
+ // first find the latest date available for all inputs
+ PriorityQueue<Date> dates = new PriorityQueue<Date>();
+ for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
+ {
+ for (Date date : pathMap.keySet())
+ {
+ dates.add(date);
+ }
+ }
+ if (dates.size() == 0)
+ {
+ throw new RuntimeException("No input data!");
+ }
+ List<Date> available = new ArrayList<Date>();
+ Date currentDate = dates.peek();
+ int found = 0;
+ int needed = getInputPaths().size();
+ while (currentDate != null)
+ {
+ Date date = dates.poll();
+
+ if (date != null && date.equals(currentDate))
+ {
+ found++;
+ }
+ else
+ {
+ if (found == needed)
+ {
+ available.add(currentDate);
+ }
+ else if (available.size() > 0)
+ {
+ _log.info("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
+ _log.info("Paths found for " + PathUtils.datedPathFormat.format(currentDate) + ":");
+ // collect what's available for this date
+ for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
+ {
+ DatePath path = pathMap.get(currentDate);
+ if (path != null)
+ {
+ _log.info("=> " + path);
+ }
+ }
+
+ if (_failOnMissing)
+ {
+ throw new RuntimeException("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
+ }
+ else
+ {
+ available.add(currentDate);
+ }
+ }
+
+ found = 0;
+ currentDate = date;
+
+ if (currentDate != null)
+ {
+ found++;
+ }
+ }
+
+ if (found > needed)
+ {
+ throw new RuntimeException("found more than needed");
+ }
+ }
+
+ _availableInputsByDate.clear();
+
+ for (Date date : available)
+ {
+ List<DatePath> paths = new ArrayList<DatePath>();
+ for (SortedMap<Date,DatePath> map : _inputPathsByDate)
+ {
+ DatePath path = map.get(date);
+ if (path != null)
+ {
+ paths.add(path);
+ }
+ }
+ _availableInputsByDate.put(date, paths);
+ }
+ }
+
+ /**
+ * Determine the date range for inputs to process based on the configuration and available inputs.
+ */
+ protected void determineDateRange()
+ {
+ _log.info("Determining range of input data to consume");
+ _range = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), getAvailableInputsByDate().keySet(), getDaysAgo(), getNumDays(), isFailOnMissing());
+ if (_range.getBeginDate() == null || _range.getEndDate() == null)
+ {
+ throw new RuntimeException("Expected start and end date");
+ }
+ }
+}
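
For orientation: a concrete planner drives the protected methods above in a fixed order. A minimal sketch of a subclass (illustrative only; the createPlan() name mirrors the planners later in this patch):

    package datafu.hourglass.jobs;

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.fs.FileSystem;

    public class SimplePlanner extends ExecutionPlanner
    {
      public SimplePlanner(FileSystem fs, Properties props)
      {
        super(fs, props);
      }

      public void createPlan() throws IOException
      {
        loadInputData();                // find dated paths under each input root
        determineAvailableInputDates(); // keep only dates present in every input
        determineDateRange();           // apply start/end/days-ago/num-days settings
        // getAvailableInputsByDate() and getDateRange() now describe the plan
      }
    }
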
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
new file mode 100644
index 0000000..7248969
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/FileCleaner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Used to remove files from the file system when they are no longer needed.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class FileCleaner
+{
+ private final Logger log = Logger.getLogger(FileCleaner.class);
+
+ private final Set<Path> garbage = new HashSet<Path>();
+ private final FileSystem fs;
+
+ public FileCleaner(FileSystem fs)
+ {
+ this.fs = fs;
+ }
+
+ /**
+ * Add a path to be removed later.
+ *
+ * @param path path to remove later
+ * @return added path
+ */
+ public Path add(Path path)
+ {
+ garbage.add(path);
+ return path;
+ }
+
+ /**
+ * Add a path to be removed later.
+ *
+ * @param path path to remove later
+ * @return added path
+ */
+ public String add(String path)
+ {
+ garbage.add(new Path(path));
+ return path;
+ }
+
+ /**
+ * Removes added paths from the file system.
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public void clean() throws IOException
+ {
+ List<Path> sorted = new ArrayList<Path>(garbage);
+ Collections.sort(sorted);
+ for (Path p : sorted)
+ {
+ if (fs.exists(p))
+ {
+ log.info(String.format("Removing %s",p));
+ fs.delete(p, true);
+ }
+ }
+ garbage.clear();
+ }
+}
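
A usage sketch for FileCleaner (the Configuration import and the paths are illustrative): register intermediate paths as they are created, then remove them all once the job succeeds.

    FileSystem fs = FileSystem.get(new Configuration());
    FileCleaner cleaner = new FileCleaner(fs);
    cleaner.add(new Path("/data/tmp/staged"));   // Path overload
    cleaner.add("/data/tmp/_partial");           // String overload
    // ... run the job ...
    cleaner.clean();  // deletes each registered path recursively, then clears the set
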
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
new file mode 100644
index 0000000..86880b9
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/IncrementalJob.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import datafu.hourglass.schemas.TaskSchemas;
+
+/**
+ * Base class for incremental jobs. Incremental jobs consume day-partitioned input data.
+ *
+ * <p>
+ * Implementations of this class must provide key, intermediate value, and output value schemas.
+ * The key and intermediate value schemas define the output for the mapper and combiner.
+ * The key and output value schemas define the output for the reducer.
+ * </p>
+ *
+ * <p>
+ * This class has the same configuration and methods as {@link TimeBasedJob}.
+ * In addition it also recognizes the following properties:
+ * </p>
+ *
+ * <ul>
+ * <li><em>max.iterations</em> - maximum number of iterations for the job</li>
+ * <li><em>max.days.to.process</em> - maximum number of days of input data to process in a single run</li>
+ * <li><em>fail.on.missing</em> - whether the job should fail if input data within the desired range is missing</li>
+ * </ul>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public abstract class IncrementalJob extends TimeBasedJob
+{
+ private Integer _maxToProcess;
+ private Integer _maxIterations;
+ private boolean _failOnMissing;
+ private TaskSchemas _schemas;
+
+ /**
+ * Initializes the job.
+ */
+ public IncrementalJob()
+ {
+ }
+
+ /**
+ * Initializes the job with a job name and properties.
+ *
+ * @param name job name
+ * @param props configuration properties
+ */
+ public IncrementalJob(String name, Properties props)
+ {
+ super(name,props);
+ }
+
+ public void setProperties(Properties props)
+ {
+ super.setProperties(props);
+
+ if (getProperties().get("max.iterations") != null)
+ {
+ setMaxIterations(Integer.parseInt((String)getProperties().get("max.iterations")));
+ }
+
+ if (getProperties().get("max.days.to.process") != null)
+ {
+ setMaxToProcess(Integer.parseInt((String)getProperties().get("max.days.to.process")));
+ }
+
+ if (getProperties().get("fail.on.missing") != null)
+ {
+ setFailOnMissing(Boolean.parseBoolean((String)getProperties().get("fail.on.missing")));
+ }
+ }
+
+ protected void initialize()
+ {
+ super.initialize();
+
+ if (getKeySchema() == null)
+ {
+ throw new RuntimeException("Key schema not specified");
+ }
+
+ if (getIntermediateValueSchema() == null)
+ {
+ throw new RuntimeException("Intermediate schema not specified");
+ }
+
+ if (getOutputValueSchema() == null)
+ {
+ throw new RuntimeException("Output schema not specified");
+ }
+
+ _schemas = new TaskSchemas.Builder()
+ .setKeySchema(getKeySchema())
+ .setIntermediateValueSchema(getIntermediateValueSchema())
+ .setOutputValueSchema(getOutputValueSchema())
+ .build();
+ }
+
+ /**
+ * Gets the Avro schema for the key.
+ * <p>
+ * This is also used as the key for the map output.
+ *
+ * @return key schema.
+ */
+ protected abstract Schema getKeySchema();
+
+ /**
+ * Gets the Avro schema for the intermediate value.
+ * <p>
+ * This is also used for the value for the map output.
+ *
+ * @return intermediate value schema
+ */
+ protected abstract Schema getIntermediateValueSchema();
+
+ /**
+ * Gets the Avro schema for the output data.
+ *
+ * @return output data schema
+ */
+ protected abstract Schema getOutputValueSchema();
+
+ /**
+ * Gets the schemas.
+ *
+ * @return schemas
+ */
+ protected TaskSchemas getSchemas()
+ {
+ return _schemas;
+ }
+
+ /**
+ * Gets the maximum number of days of input data to process in a single run.
+ *
+ * @return maximum number of days to process
+ */
+ public Integer getMaxToProcess()
+ {
+ return _maxToProcess;
+ }
+
+ /**
+ * Sets the maximum number of days of input data to process in a single run.
+ *
+ * @param maxToProcess maximum number of days to process
+ */
+ public void setMaxToProcess(Integer maxToProcess)
+ {
+ _maxToProcess = maxToProcess;
+ }
+
+ /**
+ * Gets the maximum number of iterations for the job. Multiple iterations will only occur
+ * when there is a maximum set for the number of days to process in a single run.
+ * An error should be thrown if this number will be exceeded.
+ *
+ * @return maximum number of iterations
+ */
+ public Integer getMaxIterations()
+ {
+ return _maxIterations;
+ }
+
+ /**
+ * Sets the maximum number of iterations for the job. Multiple iterations will only occur
+ * when there is a maximum set for the number of days to process in a single run.
+ * An error should be thrown if this number will be exceeded.
+ *
+ * @param maxIterations maximum number of iterations
+ */
+ public void setMaxIterations(Integer maxIterations)
+ {
+ _maxIterations = maxIterations;
+ }
+
+ /**
+ * Gets whether the job should fail if input data within the desired range is missing.
+ *
+ * @return true if the job should fail on missing data
+ */
+ public boolean isFailOnMissing()
+ {
+ return _failOnMissing;
+ }
+
+ /**
+ * Sets whether the job should fail if input data within the desired range is missing.
+ *
+ * @param failOnMissing true if the job should fail on missing data
+ */
+ public void setFailOnMissing(boolean failOnMissing)
+ {
+ _failOnMissing = failOnMissing;
+ }
+}
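
A sketch of setting the three properties recognized above (the values are illustrative, not defaults, and job stands for any concrete subclass):

    Properties props = new Properties();
    props.setProperty("max.iterations", "5");        // cap on iterations
    props.setProperty("max.days.to.process", "30");  // cap on days per run
    props.setProperty("fail.on.missing", "true");    // fail if a day is absent
    job.setProperties(props);
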
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
new file mode 100644
index 0000000..e2024fb
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.hourglass.jobs;
+
+/**
+ * Thrown when the amount of input data to consume exceeds the configured maximum.
+ */
+public class MaxInputDataExceededException extends Exception
+{
+ public MaxInputDataExceededException()
+ {
+ }
+
+ public MaxInputDataExceededException(String message)
+ {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
new file mode 100644
index 0000000..34368f7
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.SortedMap;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.avro.AvroDateRangeMetadata;
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Execution planner used by {@link AbstractPartitionCollapsingIncrementalJob} and its derived classes.
+ * This creates a plan to process partitioned input data and collapse the partitions into a single output.
+ *
+ * <p>
+ * To use this class, the input and output paths must be specified. In addition the desired input date
+ * range can be specified through several methods. Then {@link #createPlan()} can be called and the
+ * execution plan will be created. The inputs to process will be available from {@link #getInputsToProcess()},
+ * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
+ * will be available from {@link #getInputSchemas()}.
+ * </p>
+ *
+ * <p>
+ * Previous output may be reused by using {@link #setReusePreviousOutput(boolean)}. If previous output exists
+ * and it is to be reused then it will be available from {@link #getPreviousOutputToProcess()}. New input data
+ * to process that is after the previous output time range is available from {@link #getNewInputsToProcess()}.
+ * Old input data to process that is before the previous output time range and should be subtracted from the
+ * previous output is available from {@link #getOldInputsToProcess()}.
+ * </p>
+ *
+ * <p>
+ * Configuration properties are used to configure a {@link ReduceEstimator} instance. This is used to
+ * calculate how many reducers should be used.
+ * The number of reducers to use is based on the input data size and the
+ * <em>num.reducers.bytes.per.reducer</em> property. This setting can be controlled more granularly
+ * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
+ * Check {@link ReduceEstimator} for more details on how the properties are used.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
+{
+ private final Logger _log = Logger.getLogger(PartitionCollapsingExecutionPlanner.class);
+
+ private SortedMap<Date,DatePath> _outputPathsByDate;
+ private boolean _reusePreviousOutput;
+
+ // the chosen execution plan
+ private Plan _plan;
+
+ /**
+ * An execution plan. Encapsulates what inputs will be processed.
+ *
+ * @author mhayes
+ *
+ */
+ private class Plan
+ {
+ private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
+ private List<DatePath> _newInputsToProcess = new ArrayList<DatePath>();
+ private List<DatePath> _oldInputsToProcess = new ArrayList<DatePath>();
+ private Map<String,String> _latestInputByPath = new HashMap<String,String>();
+ private DatePath _previousOutputToProcess;
+ private List<Schema> _inputSchemas = new ArrayList<Schema>();
+ private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
+ private boolean _needAnotherPass;
+ private DateRange _currentDateRange;
+ private int _numReducers;
+ private Long _totalBytes;
+
+ public void finalizePlan() throws IOException
+ {
+ determineInputSchemas();
+ determineNumReducers();
+ determineTotalBytes();
+ }
+
+ /**
+ * Determines the number of bytes that will be consumed by this execution plan.
+ * This is used to compare alternative plans so the one with the least bytes
+ * consumed can be used.
+ *
+ * @throws IOException
+ */
+ private void determineTotalBytes() throws IOException
+ {
+ _totalBytes = 0L;
+ for (DatePath dp : _inputsToProcess)
+ {
+ _totalBytes += PathUtils.countBytes(getFileSystem(), dp.getPath());
+ }
+ if (_previousOutputToProcess != null)
+ {
+ _totalBytes += PathUtils.countBytes(getFileSystem(), _previousOutputToProcess.getPath());
+ }
+ _log.info("Total bytes consumed: " + _totalBytes);
+ }
+
+ /**
+ * Determines the input schemas. There may be multiple input schemas because multiple inputs are allowed.
+ * The latest available inputs are used to determine the schema, the assumption being that schemas are
+ * backwards-compatible.
+ *
+ * @throws IOException
+ */
+ private void determineInputSchemas() throws IOException
+ {
+ if (_latestInputByPath.size() > 0)
+ {
+ _log.info("Determining input schemas");
+ for (Entry<String,String> entry : _latestInputByPath.entrySet())
+ {
+ String root = entry.getKey();
+ String input = entry.getValue();
+ _log.info("Loading schema for " + input);
+ Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
+ _inputSchemas.add(schema);
+ _inputSchemasByPath.put(root, schema);
+ }
+ }
+ }
+
+ /**
+ * Determines the number of reducers to use based on the input data size and the previous output,
+ * if it exists and is being reused.
+ * The number of reducers to use is based on the input data size and the
+ * <em>num.reducers.bytes.per.reducer</em> property. This setting can be controlled more granularly
+ * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
+ * See {@link ReduceEstimator} for details on reducer estimation.
+ *
+ * @throws IOException
+ */
+ private void determineNumReducers() throws IOException
+ {
+ ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
+ for (DatePath input : _inputsToProcess)
+ {
+ estimator.addInputPath("input",input.getPath());
+ }
+ if (_previousOutputToProcess != null)
+ {
+ estimator.addInputPath("previous",_previousOutputToProcess.getPath());
+ }
+ _numReducers = estimator.getNumReducers();
+ }
+ }
+
+ /**
+ * Initializes the execution planner.
+ *
+ * @param fs file system
+ * @param props configuration properties
+ */
+ public PartitionCollapsingExecutionPlanner(FileSystem fs, Properties props)
+ {
+ super(fs, props);
+ }
+
+ /**
+ * Create the execution plan.
+ *
+ * @throws IOException
+ */
+ public void createPlan() throws IOException
+ {
+ if (_plan != null) throw new RuntimeException("Plan already exists");
+
+ _log.info("Creating execution plan");
+
+ loadInputData();
+ loadOutputData();
+ determineAvailableInputDates();
+ determineDateRange();
+
+ List<Plan> plans = new ArrayList<Plan>();
+ Plan plan;
+
+ if (_reusePreviousOutput)
+ {
+ _log.info("Output may be reused, will create alternative plan that does not reuse output");
+ plan = new Plan();
+ try
+ {
+ determineInputsToProcess(false,plan);
+ plan.finalizePlan();
+ plans.add(plan);
+ }
+ catch (MaxInputDataExceededException e)
+ {
+ _log.info(e.getMessage());
+ }
+ }
+
+ _log.info(String.format("Creating plan that %s previous output",(_reusePreviousOutput ? "reuses" : "does not reuse")));
+ plan = new Plan();
+ try
+ {
+ determineInputsToProcess(_reusePreviousOutput,plan);
+ }
+ catch (MaxInputDataExceededException e)
+ {
+ throw new RuntimeException(e);
+ }
+ plan.finalizePlan();
+ plans.add(plan);
+
+ if (plans.size() > 1)
+ {
+ _log.info(String.format("There are %d alternative execution plans:",plans.size()));
+
+ for (Plan option : plans)
+ {
+ _log.info(String.format("* Consume %d new inputs, %d old inputs, %s previous output (%d bytes)",
+ option._newInputsToProcess.size(),
+ option._oldInputsToProcess.size(),
+ option._previousOutputToProcess != null ? "reuse" : "no",
+ option._totalBytes));
+ }
+
+ // choose plan with least bytes consumed
+ Collections.sort(plans, new Comparator<Plan>() {
+ @Override
+ public int compare(Plan o1, Plan o2)
+ {
+ return o1._totalBytes.compareTo(o2._totalBytes);
+ }
+ });
+ _plan = plans.get(0);
+
+ _log.info(String.format("Choosing plan consuming %d bytes",_plan._totalBytes));
+ }
+ else
+ {
+ _plan = plans.get(0);
+ }
+ }
+
+ /**
+ * Gets whether previous output should be reused, if it exists.
+ *
+ * @return true if previous output should be reused
+ */
+ public boolean getReusePreviousOutput()
+ {
+ return _reusePreviousOutput;
+ }
+
+ /**
+ * Sets whether previous output should be reused, if it exists.
+ *
+ * @param reuse true if previous output should be reused
+ */
+ public void setReusePreviousOutput(boolean reuse)
+ {
+ _reusePreviousOutput = reuse;
+ }
+
+ /**
+ * Get the number of reducers to use based on the input and previous output data size.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return number of reducers to use
+ */
+ public int getNumReducers()
+ {
+ checkPlanExists();
+ return getPlan()._numReducers;
+ }
+
+ public DateRange getCurrentDateRange()
+ {
+ checkPlanExists();
+ return getPlan()._currentDateRange;
+ }
+
+ /**
+ * Gets the previous output to reuse, or null if no output is being reused.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return previous output to reuse, or null
+ */
+ public DatePath getPreviousOutputToProcess()
+ {
+ return getPlan()._previousOutputToProcess;
+ }
+
+ /**
+ * Gets all inputs that will be processed. This includes both old and new data.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return inputs to process
+ */
+ public List<DatePath> getInputsToProcess()
+ {
+ return getPlan()._inputsToProcess;
+ }
+
+ /**
+ * Gets only the new data that will be processed. New data is data that falls within the
+ * desired date range.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return new inputs to process
+ */
+ public List<DatePath> getNewInputsToProcess()
+ {
+ return getPlan()._newInputsToProcess;
+ }
+
+ /**
+ * Gets only the old data that will be processed. Old data is data that falls before the
+ * desired date range. It will be subtracted out from the previous output.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return old inputs to process
+ */
+ public List<DatePath> getOldInputsToProcess()
+ {
+ return getPlan()._oldInputsToProcess;
+ }
+
+ /**
+ * Gets whether another pass will be required. Because there may be a limit on the number of inputs processed
+ * in a single run, multiple runs may be required to process all data in the desired date range.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return true if another pass is required
+ */
+ public boolean getNeedsAnotherPass()
+ {
+ return getPlan()._needAnotherPass;
+ }
+
+ /**
+ * Gets the input schemas. Because multiple inputs are allowed, there may be multiple schemas.
+ * Must call {@link #createPlan()} first.
+ *
+ * <p>
+ * This does not include the output schema, even though previous output may be fed back as input.
+ * The reason is that the output schema is determined from the input schema.
+ * </p>
+ *
+ * @return input schemas
+ */
+ public List<Schema> getInputSchemas()
+ {
+ return getPlan()._inputSchemas;
+ }
+
+ /**
+ * Gets a map from input path to schema. Because multiple inputs are allowed, there may be multiple schemas.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return map from path to input schema
+ */
+ public Map<String,Schema> getInputSchemasByPath()
+ {
+ return getPlan()._inputSchemasByPath;
+ }
+
+ /**
+ * Determines what output data already exists. Previous output may be reused.
+ *
+ * @throws IOException
+ */
+ private void loadOutputData() throws IOException
+ {
+ if (getOutputPath() == null)
+ {
+ throw new RuntimeException("No output path specified");
+ }
+ _log.info(String.format("Searching for existing output data in " + getOutputPath()));
+ _outputPathsByDate = getDatedData(getOutputPath());
+ _log.info(String.format("Found %d output paths",_outputPathsByDate.size()));
+ }
+
+ /**
+ * Determines what input data to process.
+ *
+ * <p>
+ * The input data to consume is determined by the desired date range. If previous output is not reused, the input data to process
+ * coincides with the date range. If previous output may be reused and exists, the input data to process
+ * consists of new data and potentially old data. New input data falls after the previous output's date range,
+ * so it can be added to the previous output.
+ * Old input data falls before the previous output's date range, so it can be subtracted from the previous output.
+ * </p>
+ *
+ * <p>
+ * If there is a limit on how many days of input data can be processed then it may be the case that not all input data will be processed in
+ * a single run.
+ * </p>
+ *
+ * @throws IOException
+ * @throws MaxInputDataExceededException
+ */
+ private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
+ {
+ Calendar cal = Calendar.getInstance(PathUtils.timeZone);
+
+ DateRange outputDateRange = null;
+
+ if (reusePreviousOutput)
+ {
+ if (_outputPathsByDate.size() > 0)
+ {
+ DatePath latestPriorOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
+ _log.info("Have previous output, determining what previous incremental data to difference out");
+ outputDateRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(),latestPriorOutput.getPath());
+ _log.info(String.format("Previous output has date range %s to %s",
+ PathUtils.datedPathFormat.format(outputDateRange.getBeginDate()),
+ PathUtils.datedPathFormat.format(outputDateRange.getEndDate())));
+
+ for (Date currentDate=outputDateRange.getBeginDate();
+ currentDate.compareTo(getDateRange().getBeginDate()) < 0
+ && currentDate.compareTo(outputDateRange.getEndDate()) <= 0;)
+ {
+ if (!getAvailableInputsByDate().containsKey(currentDate))
+ {
+ throw new RuntimeException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(currentDate)));
+ }
+
+ List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
+
+ for (DatePath input : inputs)
+ {
+ _log.info(String.format("Old Input: %s",input.getPath()));
+ plan._inputsToProcess.add(input);
+ plan._oldInputsToProcess.add(input);
+
+ Path root = PathUtils.getNestedPathRoot(input.getPath());
+ plan._latestInputByPath.put(root.toString(), input.getPath().toString());
+ }
+
+ cal.setTime(currentDate);
+ cal.add(Calendar.DAY_OF_MONTH, 1);
+ currentDate = cal.getTime();
+ }
+
+ plan._previousOutputToProcess = latestPriorOutput;
+ _log.info("Previous Output: " + plan._previousOutputToProcess.getPath());
+ }
+ else
+ {
+ _log.info("No previous output to reuse");
+ }
+ }
+
+ // consume the incremental data and produce the final output
+
+ int newDataCount = 0;
+ Date startDate = getDateRange().getBeginDate();
+ Date endDate = startDate;
+ for (Date currentDate=startDate; currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
+ {
+ if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
+ {
+ if (!reusePreviousOutput)
+ {
+ throw new MaxInputDataExceededException(String.format("Amount of input data has exceeded max of %d however output is not being reused so cannot do in multiple passes", getMaxToProcess()));
+ }
+
+ // too much data to process in a single run, will require another pass
+ plan._needAnotherPass = true;
+ break;
+ }
+
+ if (outputDateRange == null || currentDate.compareTo(outputDateRange.getEndDate()) > 0)
+ {
+ if (!getAvailableInputsByDate().containsKey(currentDate))
+ {
+ if (isFailOnMissing())
+ {
+ throw new RuntimeException("missing " + PathUtils.datedPathFormat.format(currentDate));
+ }
+ else
+ {
+ _log.info("No input data found for " + PathUtils.datedPathFormat.format(currentDate));
+ }
+ }
+ else
+ {
+ List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
+
+ for (DatePath input : inputs)
+ {
+ _log.info(String.format("New Input: %s",input.getPath()));
+ plan._inputsToProcess.add(input);
+ plan._newInputsToProcess.add(input);
+
+ Path root = PathUtils.getNestedPathRoot(input.getPath());
+ plan._latestInputByPath.put(root.toString(), input.getPath().toString());
+ }
+
+ newDataCount++;
+ }
+ }
+
+ cal.setTime(currentDate);
+ endDate = cal.getTime();
+ cal.add(Calendar.DAY_OF_MONTH, 1);
+ currentDate = cal.getTime();
+ }
+
+ plan._currentDateRange = new DateRange(startDate,endDate);
+ }
+
+ /**
+ * Throws an exception if the plan hasn't been created.
+ */
+ private void checkPlanExists()
+ {
+ if (_plan == null) throw new RuntimeException("Must call createPlan first");
+ }
+
+ private Plan getPlan()
+ {
+ checkPlanExists();
+ return _plan;
+ }
+}
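
A sketch of the workflow from the class javadoc (the file system handle, paths, and bytes-per-reducer value are illustrative):

    Properties props = new Properties();
    props.setProperty("num.reducers.bytes.per.reducer", Long.toString(256L * 1024 * 1024));
    PartitionCollapsingExecutionPlanner planner =
        new PartitionCollapsingExecutionPlanner(fs, props);
    planner.setInputPaths(Collections.singletonList(new Path("/data/event")));
    planner.setOutputPath(new Path("/output/event-agg"));
    planner.setNumDays(30);                // desired input window
    planner.setReusePreviousOutput(true);  // allow incremental reuse
    planner.createPlan();                  // throws IOException
    List<DatePath> newInputs = planner.getNewInputsToProcess();
    List<DatePath> oldInputs = planner.getOldInputsToProcess();
    int reducers = planner.getNumReducers();
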
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
new file mode 100644
index 0000000..68e776a
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Mapper;
+import datafu.hourglass.model.Merger;
+
+/**
+ * A concrete version of {@link AbstractPartitionCollapsingIncrementalJob}.
+ *
+ * This provides an alternative to extending {@link AbstractPartitionCollapsingIncrementalJob}.
+ * Instead of extending this class and implementing the abstract methods, this concrete version
+ * can be used instead. Getters and setters have been provided for the abstract methods.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionCollapsingIncrementalJob extends AbstractPartitionCollapsingIncrementalJob
+{
+ private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+ private Accumulator<GenericRecord,GenericRecord> _combiner;
+ private Accumulator<GenericRecord,GenericRecord> _reducer;
+ private Schema _keySchema;
+ private Schema _intermediateValueSchema;
+ private Schema _outputValueSchema;
+ private Merger<GenericRecord> _merger;
+ private Merger<GenericRecord> _oldMerger;
+ private Setup _setup;
+
+ /**
+ * Initializes the job. The job name is derived from the name of a provided class.
+ *
+ * @param cls class to base job name on
+ * @throws IOException
+ */
+ public PartitionCollapsingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
+ {
+ setName(cls.getName());
+ }
+
+ @Override
+ public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+ {
+ return _mapper;
+ }
+
+ @Override
+ public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
+ {
+ return _combiner;
+ }
+
+ @Override
+ public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
+ {
+ return _reducer;
+ }
+
+ @Override
+ protected Schema getKeySchema()
+ {
+ return _keySchema;
+ }
+
+ @Override
+ protected Schema getIntermediateValueSchema()
+ {
+ return _intermediateValueSchema;
+ }
+
+ @Override
+ protected Schema getOutputValueSchema()
+ {
+ return _outputValueSchema;
+ }
+
+ @Override
+ public Merger<GenericRecord> getRecordMerger()
+ {
+ return _merger;
+ }
+
+ @Override
+ public Merger<GenericRecord> getOldRecordMerger()
+ {
+ return _oldMerger;
+ }
+
+ /**
+ * Set the mapper.
+ *
+ * @param mapper mapper to apply to input records
+ */
+ public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+ {
+ this._mapper = mapper;
+ }
+
+ /**
+ * Set the accumulator for the combiner
+ *
+ * @param combiner accumulator for the combiner
+ */
+ public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
+ {
+ this._combiner = combiner;
+ }
+
+ /**
+ * Set the accumulator for the reducer.
+ *
+ * @param reducer accumulator for the reducer
+ */
+ public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
+ {
+ this._reducer = reducer;
+ }
+
+ /**
+ * Sets the Avro schema for the key.
+ * <p>
+ * This is also used as the key for the map output.
+ *
+ * @param keySchema key schema
+ */
+ public void setKeySchema(Schema keySchema)
+ {
+ this._keySchema = keySchema;
+ }
+
+ /**
+ * Sets the Avro schema for the intermediate value.
+ * <p>
+ * This is also used for the value for the map output.
+ *
+ * @param intermediateValueSchema intermediate value schema
+ */
+ public void setIntermediateValueSchema(Schema intermediateValueSchema)
+ {
+ this._intermediateValueSchema = intermediateValueSchema;
+ }
+
+ /**
+ * Sets the Avro schema for the output data.
+ *
+ * @param outputValueSchema output value schema
+ */
+ public void setOutputValueSchema(Schema outputValueSchema)
+ {
+ this._outputValueSchema = outputValueSchema;
+ }
+
+ /**
+ * Sets the record merger that is capable of merging previous output with a new partial output.
+ * This is only needed when reusing previous output where the intermediate and output schemas are different.
+ * New partial output is produced by the reducer from new input that is after the previous output.
+ *
+ * @param merger merger for combining previous output with new partial output
+ */
+ public void setMerger(Merger<GenericRecord> merger)
+ {
+ this._merger = merger;
+ }
+
+ /**
+ * Sets the record merger that is capable of unmerging old partial output from the new output.
+ * This is only needed when reusing previous output for a fixed-length sliding window.
+ * The new output is the result of merging the previous output with the new partial output.
+ * The old partial output is produced by the reducer from old input data before the time range of
+ * the previous output.
+ *
+ * @param oldMerger merger
+ */
+ public void setOldMerger(Merger<GenericRecord> oldMerger)
+ {
+ this._oldMerger = oldMerger;
+ }
+
+ /**
+ * Set callback to provide custom configuration before job begins execution.
+ *
+ * @param setup object with callback method
+ */
+ public void setOnSetup(Setup setup)
+ {
+ _setup = setup;
+ }
+
+ @Override
+ public void config(Configuration conf)
+ {
+ super.config(conf);
+ if (_setup != null)
+ {
+ _setup.setup(conf);
+ }
+ }
+}
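
A configuration sketch, assuming the input/output path setters and run() provided by the abstract base classes (not shown in this part of the patch); the class, schemas, mapper, and accumulator are placeholders:

    PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(EventCount.class);
    job.setKeySchema(keySchema);
    job.setIntermediateValueSchema(countSchema);
    job.setOutputValueSchema(countSchema);
    job.setInputPaths(Arrays.asList(new Path("/data/event")));
    job.setOutputPath(new Path("/output/event-count"));
    job.setReusePreviousOutput(true);
    job.setMapper(mapper);                    // Mapper<GenericRecord,GenericRecord,GenericRecord>
    job.setCombinerAccumulator(accumulator);  // optional combiner
    job.setReducerAccumulator(accumulator);
    job.run();
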
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
new file mode 100644
index 0000000..6ac55a8
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeSet;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.DatePath;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Execution planner used by {@link AbstractPartitionPreservingIncrementalJob} and its derived classes.
+ * This creates a plan to process partitioned input data and produce partitioned output data.
+ *
+ * <p>
+ * To use this class, the input and output paths must be specified. In addition the desired input date
+ * range can be specified through several methods. Then {@link #createPlan()} can be called and the
+ * execution plan will be created. The inputs to process will be available from {@link #getInputsToProcess()},
+ * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
+ * will be available from {@link #getInputSchemas()}.
+ * </p>
+ *
+ * <p>
+ * Configuration properties are used to configure a {@link ReduceEstimator} instance. This is used to
+ * calculate how many reducers should be used.
+ * The number of reducers to use is based on the input data size and the
+ * <em>num.reducers.bytes.per.reducer</em> property.
+ * Check {@link ReduceEstimator} for more details on how the properties are used.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionPreservingExecutionPlanner extends ExecutionPlanner
+{
+ private final Logger _log = Logger.getLogger(PartitionPreservingExecutionPlanner.class);
+
+ private SortedMap<Date,DatePath> _outputPathsByDate;
+ private Map<String,String> _latestInputByPath = new HashMap<String,String>();
+ private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
+ private List<Schema> _inputSchemas = new ArrayList<Schema>();
+ private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
+ private boolean _needAnotherPass;
+ private int _numReducers;
+ private boolean _planExists;
+
+ /**
+ * Initializes the execution planner.
+ *
+ * @param fs file system
+ * @param props configuration properties
+ */
+ public PartitionPreservingExecutionPlanner(FileSystem fs, Properties props)
+ {
+ super(fs,props);
+ }
+
+ /**
+ * Create the execution plan.
+ *
+ * @throws IOException
+ */
+ public void createPlan() throws IOException
+ {
+ if (_planExists) throw new RuntimeException("Plan already exists");
+ _planExists = true;
+ loadInputData();
+ loadOutputData();
+ determineAvailableInputDates();
+ determineDateRange();
+ determineInputsToProcess();
+ determineInputSchemas();
+ determineNumReducers();
+ }
+
+ /**
+ * Get the number of reducers to use based on the input data size.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return number of reducers to use
+ */
+ public int getNumReducers()
+ {
+ checkPlanExists();
+ return _numReducers;
+ }
+
+ /**
+ * Gets the input schemas. Because multiple inputs are allowed, there may be multiple schemas.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return input schemas
+ */
+ public List<Schema> getInputSchemas()
+ {
+ checkPlanExists();
+ return _inputSchemas;
+ }
+
+ /**
+ * Gets a map from input path to schema. Because multiple inputs are allowed, there may be multiple schemas.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return map from path to input schema
+ */
+ public Map<String,Schema> getInputSchemasByPath()
+ {
+ checkPlanExists();
+ return _inputSchemasByPath;
+ }
+
+ /**
+ * Gets whether another pass will be required. Because there may be a limit on the number of inputs processed
+ * in a single run, multiple runs may be required to process all data in the desired date range.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return true if another pass is required
+ */
+ public boolean getNeedsAnotherPass()
+ {
+ checkPlanExists();
+ return _needAnotherPass;
+ }
+
+ /**
+ * Gets the inputs which are to be processed.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return inputs to process
+ */
+ public List<DatePath> getInputsToProcess()
+ {
+ checkPlanExists();
+ return _inputsToProcess;
+ }
+
+ /**
+ * Gets the input dates which are to be processed.
+ * Must call {@link #createPlan()} first.
+ *
+ * @return dates to process
+ */
+ public List<Date> getDatesToProcess()
+ {
+ checkPlanExists();
+ Set<Date> dates = new TreeSet<Date>();
+ for (DatePath dp : _inputsToProcess)
+ {
+ dates.add(dp.getDate());
+ }
+ return new ArrayList<Date>(dates);
+ }
+
+ /**
+ * Determines the number of reducers to use based on the input data size.
+ * The number of reducers to use is based on the input data size and the
+ * <em>num.reducers.bytes.per.reducer</em> property. See {@link ReduceEstimator}
+ * for details on reducer estimation.
+ *
+ * @throws IOException
+ */
+ private void determineNumReducers() throws IOException
+ {
+ ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
+ for (DatePath input : getInputsToProcess())
+ {
+ estimator.addInputPath("input",input.getPath());
+ }
+ _numReducers = estimator.getNumReducers();
+ }
+
+ /**
+ * Determines the input schemas. There may be multiple input schemas because multiple inputs are allowed.
+ * The latest available inputs are used to determine the schema, the assumption being that schemas are
+ * backwards-compatible.
+ *
+ * @throws IOException
+ */
+ private void determineInputSchemas() throws IOException
+ {
+ if (_latestInputByPath.size() > 0)
+ {
+ _log.info("Determining input schemas");
+ for (Entry<String,String> entry : _latestInputByPath.entrySet())
+ {
+ String root = entry.getKey();
+ String input = entry.getValue();
+ _log.info("Loading schema for " + input);
+ Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
+ _inputSchemas.add(schema);
+ _inputSchemasByPath.put(root, schema);
+ }
+ }
+ }
+
+ /**
+ * Determines which input data should be processed. This checks the availability of input data within
+ * the desired date range and also checks whether the output already exists. Only inputs with no
+ * corresponding output are processed.
+ */
+ private void determineInputsToProcess()
+ {
+ _log.info("Determining inputs to process");
+ _latestInputByPath.clear();
+ int newDataCount = 0;
+ Calendar cal = Calendar.getInstance(PathUtils.timeZone);
+ for (Date currentDate=getDateRange().getBeginDate(); currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
+ {
+ if (!_outputPathsByDate.containsKey(currentDate))
+ {
+ List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
+ if (inputs != null)
+ {
+ if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
+ {
+ // too much data to process in a single run, will require another pass
+ _needAnotherPass = true;
+ break;
+ }
+
+ for (DatePath input : inputs)
+ {
+ _log.info(String.format("Input: %s",input.getPath()));
+ _inputsToProcess.add(input);
+
+ Path root = PathUtils.getNestedPathRoot(input.getPath());
+ _latestInputByPath.put(root.toString(), input.getPath().toString());
+ }
+
+ newDataCount++;
+ }
+ else if (isFailOnMissing())
+ {
+ throw new RuntimeException("missing input data for " + currentDate);
+ }
+ }
+
+ cal.setTime(currentDate);
+ cal.add(Calendar.DAY_OF_MONTH, 1);
+ currentDate = cal.getTime();
+ }
+ }
+
+ /**
+ * Determines what output data already exists. Inputs will not be consumed if the output already exists.
+ *
+ * @throws IOException
+ */
+ private void loadOutputData() throws IOException
+ {
+ _log.info(String.format("Checking output data in " + getOutputPath()));
+ _outputPathsByDate = getDailyData(getOutputPath());
+ }
+
+ /**
+ * Throws an exception if the plan hasn't been created.
+ */
+ private void checkPlanExists()
+ {
+ if (!_planExists) throw new RuntimeException("Must call createPlan first");
+ }
+}
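
A sketch parallel to the collapsing planner above; because output here is partitioned by day, only dates with no existing output are planned (fs and props are illustrative):

    PartitionPreservingExecutionPlanner planner =
        new PartitionPreservingExecutionPlanner(fs, props);
    planner.setInputPaths(Collections.singletonList(new Path("/data/event")));
    planner.setOutputPath(new Path("/output/event-daily"));
    planner.setDaysAgo(1);  // end the window at yesterday
    planner.setNumDays(7);  // consume up to a week of input
    planner.createPlan();
    List<Date> dates = planner.getDatesToProcess();  // one output partition per date
    boolean more = planner.getNeedsAnotherPass();    // true if capped by max.days.to.process
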
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
new file mode 100644
index 0000000..99aba47
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import datafu.hourglass.model.Accumulator;
+import datafu.hourglass.model.Mapper;
+
+/**
+ * A concrete version of {@link AbstractPartitionPreservingIncrementalJob}.
+ *
+ * Rather than extending {@link AbstractPartitionPreservingIncrementalJob} and implementing
+ * its abstract methods, this concrete class can be used directly: setters are provided
+ * for each of the values the abstract methods return.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class PartitionPreservingIncrementalJob extends AbstractPartitionPreservingIncrementalJob
+{
+ private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
+ private Accumulator<GenericRecord,GenericRecord> _combiner;
+ private Accumulator<GenericRecord,GenericRecord> _reducer;
+ private Schema _keySchema;
+ private Schema _intermediateValueSchema;
+ private Schema _outputValueSchema;
+ private Setup _setup;
+
+ /**
+ * Initializes the job. The job name is derived from the name of a provided class.
+ *
+ * @param cls class to base job name on
+ * @throws IOException if the job cannot be initialized
+ */
+ public PartitionPreservingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
+ {
+ setName(cls.getName());
+ }
+
+ @Override
+ public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
+ {
+ return _mapper;
+ }
+
+ @Override
+ public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
+ {
+ return _combiner;
+ }
+
+ @Override
+ public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
+ {
+ return _reducer;
+ }
+
+ @Override
+ protected Schema getKeySchema()
+ {
+ return _keySchema;
+ }
+
+ @Override
+ protected Schema getIntermediateValueSchema()
+ {
+ return _intermediateValueSchema;
+ }
+
+ @Override
+ protected Schema getOutputValueSchema()
+ {
+ return _outputValueSchema;
+ }
+
+ /**
+ * Sets the mapper.
+ *
+ * @param mapper the mapper
+ */
+ public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
+ {
+ this._mapper = mapper;
+ }
+
+ /**
+ * Sets the accumulator for the combiner.
+ *
+ * @param combiner accumulator for the combiner
+ */
+ public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
+ {
+ this._combiner = combiner;
+ }
+
+ /**
+ * Sets the accumulator for the reducer.
+ *
+ * @param reducer accumulator for the reducer
+ */
+ public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
+ {
+ this._reducer = reducer;
+ }
+
+ /**
+ * Sets the Avro schema for the key.
+ * <p>
+ * This is also used as the key for the map output.
+ *
+ * @param keySchema key schema
+ */
+ public void setKeySchema(Schema keySchema)
+ {
+ this._keySchema = keySchema;
+ }
+
+ /**
+ * Sets the Avro schema for the intermediate value.
+ * <p>
+ * This is also used as the value for the map output.
+ *
+ * @param intermediateValueSchema intermediate value schema
+ */
+ public void setIntermediateValueSchema(Schema intermediateValueSchema)
+ {
+ this._intermediateValueSchema = intermediateValueSchema;
+ }
+
+ /**
+ * Sets the Avro schema for the output data.
+ *
+ * @param outputValueSchema output value schema
+ */
+ public void setOutputValueSchema(Schema outputValueSchema)
+ {
+ this._outputValueSchema = outputValueSchema;
+ }
+
+ /**
+ * Sets a callback that provides custom configuration before the job begins execution.
+ *
+ * @param setup object with callback method
+ */
+ public void setOnSetup(Setup setup)
+ {
+ _setup = setup;
+ }
+
+ @Override
+ public void config(Configuration conf)
+ {
+ super.config(conf);
+ if (_setup != null)
+ {
+ _setup.setup(conf);
+ }
+ }
+}
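
In practice the class above is driven entirely through its setters. A minimal configuration sketch, not taken from this patch: the Stats driver, the schema JSON, and the CountMapper/CountAccumulator classes are hypothetical placeholders, and the inherited setInputPaths, setOutputPath, and run calls are assumed to behave as in the other Hourglass incremental jobs.

  import java.util.Arrays;

  import org.apache.avro.Schema;
  import org.apache.hadoop.fs.Path;

  public class Stats // hypothetical driver class
  {
    public static void main(String[] args) throws Exception
    {
      PartitionPreservingIncrementalJob job =
          new PartitionPreservingIncrementalJob(Stats.class);

      // Placeholder schemas: a long member_id key and a long count value.
      Schema keySchema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Key\",\"fields\":"
          + "[{\"name\":\"member_id\",\"type\":\"long\"}]}");
      Schema valueSchema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Value\",\"fields\":"
          + "[{\"name\":\"count\",\"type\":\"long\"}]}");

      job.setKeySchema(keySchema);
      job.setIntermediateValueSchema(valueSchema);
      job.setOutputValueSchema(valueSchema);

      job.setMapper(new CountMapper());                   // hypothetical Mapper
      job.setCombinerAccumulator(new CountAccumulator()); // optional combiner
      job.setReducerAccumulator(new CountAccumulator());  // hypothetical Accumulator

      job.setInputPaths(Arrays.asList(new Path("/data/event"))); // assumed inherited setter
      job.setOutputPath(new Path("/output/event_count"));        // assumed inherited setter

      job.run(); // assumed inherited from the base job
    }
  }
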
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
new file mode 100644
index 0000000..6931f7b
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/ReduceEstimator.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Estimates the number of reducers needed based on input size.
+ *
+ * <p>
+ * This sums the size of the inputs and uses bytes-per-reducer
+ * settings to compute the number of reducers. By default,
+ * the bytes-per-reducer is 256 MB. This means that if the
+ * total input size is 1 GB, the total number of reducers
+ * computed will be 4.
+ * </p>
+ *
+ * <p>
+ * The bytes-per-reducer can be configured through properties
+ * provided in the constructor. The default bytes-per-reducer
+ * can be overridden by setting <em>num.reducers.bytes.per.reducer</em>.
+ * For example, if 536870912 (512 MB) is used for this setting,
+ * then 2 reducers would be used for 1 GB.
+ * </p>
+ *
+ * <p>
+ * The bytes-per-reducer can also be configured separately for
+ * different types of inputs. Inputs can be identified by a tag.
+ * For example, if an input is tagged with <em>mydata</em>, then
+ * the reducers for this input data can be configured with
+ * <em>num.reducers.mydata.bytes.per.reducer</em>.
+ * </p>
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class ReduceEstimator
+{
+ private final Logger _log = Logger.getLogger(ReduceEstimator.class);
+
+ private final Set<Path> inputPaths = new HashSet<Path>();
+ private final Map<Path,String> pathToTag = new HashMap<Path,String>();
+ private final Map<String, Long> tagToBytesPerReducer = new HashMap<String,Long>();
+ private final FileSystem fs;
+
+ private final static String DEFAULT = "default";
+ private final static Long DEFAULT_BYTES_PER_REDUCER = 256L*1024L*1024L; // 256 MB
+
+ public ReduceEstimator(FileSystem fs, Properties props)
+ {
+ this.fs = fs;
+
+ if (props != null)
+ {
+ for (Object o : props.keySet())
+ {
+ String key = (String)o;
+ if (key.startsWith("num.reducers."))
+ {
+ if (key.equals("num.reducers.bytes.per.reducer"))
+ {
+ tagToBytesPerReducer.put(DEFAULT, Long.parseLong(props.getProperty(key)));
+ }
+ else
+ {
+ Pattern p = Pattern.compile("num\\.reducers\\.([a-z]+)\\.bytes\\.per\\.reducer");
+ Matcher m = p.matcher(key);
+ if (m.matches())
+ {
+ String tag = m.group(1);
+ tagToBytesPerReducer.put(tag, Long.parseLong(props.getProperty(key)));
+ }
+ else
+ {
+ throw new RuntimeException("Property not recognized: " + key);
+ }
+ }
+ }
+ }
+ }
+
+ if (!tagToBytesPerReducer.containsKey(DEFAULT))
+ {
+ long defaultValue = DEFAULT_BYTES_PER_REDUCER;
+ _log.info(String.format("No default bytes per reducer set, using %.2f MB",toMB(defaultValue)));
+ tagToBytesPerReducer.put(DEFAULT, defaultValue);
+ }
+ }
+
+ public void addInputPath(Path input)
+ {
+ addInputPath(DEFAULT,input);
+ }
+
+ public void addInputPath(String tag, Path input)
+ {
+ if (!inputPaths.contains(input))
+ {
+ inputPaths.add(input);
+ pathToTag.put(input, tag);
+ }
+ else
+ {
+ throw new RuntimeException("Already added input: " + input);
+ }
+ }
+
+ public int getNumReducers() throws IOException
+ {
+ Map<String,Long> bytesPerTag = getTagToInputBytes();
+
+ double numReducers = 0.0;
+ for (String tag : bytesPerTag.keySet())
+ {
+ long bytes = bytesPerTag.get(tag);
+ _log.info(String.format("Found %d bytes (%.2f GB) for inputs tagged with '%s'",bytes,toGB(bytes),tag));
+ Long bytesPerReducer = tagToBytesPerReducer.get(tag);
+ if (bytesPerReducer == null)
+ {
+ bytesPerReducer = tagToBytesPerReducer.get(DEFAULT);
+
+ if (bytesPerReducer == null)
+ {
+ throw new RuntimeException("Could not determine bytes per reducer");
+ }
+
+ _log.info(String.format("No configured bytes per reducer for '%s', using default value of %.2f MB",tag,toMB(bytesPerReducer)));
+ }
+ else
+ {
+ _log.info(String.format("Using configured bytes per reducer for '%s' of %.2f MB",tag,toMB(bytesPerReducer)));
+ }
+
+ double partialNumReducers = bytes/(double)bytesPerReducer;
+
+ _log.info(String.format("Reducers computed for '%s' is %.2f",tag,partialNumReducers));
+
+ numReducers += partialNumReducers;
+ }
+
+ int finalNumReducers = Math.max(1, (int)Math.ceil(numReducers));
+
+ _log.info(String.format("Final computed reducers is: %d",finalNumReducers));
+
+ return finalNumReducers;
+ }
+
+ private static double toGB(long bytes)
+ {
+ return bytes/(1024.0*1024.0*1024.0);
+ }
+
+ private static double toMB(long bytes)
+ {
+ return bytes/(1024.0*1024.0);
+ }
+
+ /**
+ * Gets the total number of bytes per tag.
+ *
+ * @return Map from tag to total bytes
+ * @throws IOException if the input sizes cannot be determined
+ */
+ private Map<String,Long> getTagToInputBytes() throws IOException
+ {
+ Map<String,Long> result = new HashMap<String,Long>();
+ for (Path input : inputPaths)
+ {
+ long bytes = PathUtils.countBytes(fs, input);
+ String tag = pathToTag.get(input);
+ if (tag == null) throw new RuntimeException("Could not find tag for input: " + input);
+ Long current = result.get(tag);
+ if (current == null) current = 0L;
+ current += bytes;
+ result.put(tag, current);
+ }
+ return result;
+ }
+}
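
To make the arithmetic concrete: with the 256 MB default, 1 GB of input yields ceil(1024/256) = 4 reducers, and a per-tag override affects only the inputs carrying that tag. A usage sketch under assumed paths (the files must exist for PathUtils.countBytes to sum their sizes):

  import java.util.Properties;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class EstimatorSketch
  {
    public static void main(String[] args) throws Exception
    {
      Properties props = new Properties();
      // Inputs tagged "mydata" get 512 MB per reducer; untagged
      // inputs fall back to the built-in 256 MB default.
      props.setProperty("num.reducers.mydata.bytes.per.reducer",
          String.valueOf(512L * 1024L * 1024L));

      FileSystem fs = FileSystem.get(new Configuration());
      ReduceEstimator estimator = new ReduceEstimator(fs, props);
      estimator.addInputPath("mydata", new Path("/data/event")); // hypothetical path
      estimator.addInputPath(new Path("/data/other"));           // default tag

      // E.g. 1 GB of "mydata" contributes 2 reducers (1 GB / 512 MB)
      // and 1 GB of untagged input contributes 4 (1 GB / 256 MB),
      // for a final ceil(2.0 + 4.0) = 6.
      int numReducers = estimator.getNumReducers();
      System.out.println("Estimated reducers: " + numReducers);
    }
  }
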
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
new file mode 100644
index 0000000..2e26900
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/jobs/Setup.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.jobs;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used as a callback by {@link PartitionCollapsingIncrementalJob} and {@link PartitionPreservingIncrementalJob}
+ * to provide configuration settings for the Hadoop job.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public interface Setup
+{
+ /**
+ * Set custom configuration.
+ *
+ * @param conf configuration
+ */
+ void setup(Configuration conf);
+}
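
A Setup implementation is handed to the job before it runs. A small sketch, where "job" stands in for an already-configured PartitionPreservingIncrementalJob and mapred.output.compress is a standard pre-YARN Hadoop compression switch:

  import org.apache.hadoop.conf.Configuration;

  // "job" is an already-configured PartitionPreservingIncrementalJob.
  job.setOnSetup(new Setup()
  {
    @Override
    public void setup(Configuration conf)
    {
      // Tune the Hadoop job before it launches.
      conf.setBoolean("mapred.output.compress", true);
    }
  });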