You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:50 UTC
[12/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
deleted file mode 100644
index 74a8bc2..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsUtil;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.mapreduce.DelegatingCombiner;
-import datafu.hourglass.mapreduce.DelegatingMapper;
-import datafu.hourglass.mapreduce.DelegatingReducer;
-import datafu.hourglass.mapreduce.DistributedCacheHelper;
-import datafu.hourglass.mapreduce.ObjectMapper;
-import datafu.hourglass.mapreduce.ObjectReducer;
-import datafu.hourglass.mapreduce.Parameters;
-import datafu.hourglass.mapreduce.PartitioningCombiner;
-import datafu.hourglass.mapreduce.PartitioningMapper;
-import datafu.hourglass.mapreduce.PartitioningReducer;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * An {@link IncrementalJob} that consumes partitioned input data and produces
- * output data having the same partitions.
- * Typically this is used in conjunction with {@link AbstractPartitionCollapsingIncrementalJob}
- * when computing aggregates over sliding windows. A partition-preserving job can perform
- * initial aggregation per-day, which can then be consumed by a partition-collapsing job to
- * produce the final aggregates over the time window.
- * Only Avro is supported for the input, intermediate, and output data.
- *
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * These are defined by overriding {@link #getKeySchema()}, {@link #getIntermediateValueSchema()},
- * and {@link #getOutputValueSchema()}.
- * </p>
- *
- * <p>
- * Implementations must also provide a mapper by overriding {@link #getMapper()} and an accumulator
- * for the reducer by overriding {@link #getReducerAccumulator()}. An optional combiner may be
- * provided by overriding {@link #getCombinerAccumulator()}. For the combiner to be used
- * the property <em>use.combiner</em> must also be set to true.
- * </p>
- *
- * <p>
- * The distinguishing feature of this type of job is that the input partitioning is preserved in the output.
- * The data from each partition is processed independently of other partitions and then output separately.
- * For example, input that is partitioned by day can be aggregated by day and then output by day.
- * This is achieved by attaching a long value to each key, which represents the partition, so that the reducer
- * receives data grouped by the key and partition together. Multiple outputs are then used so that the output
- * will have the same partitions as the input.
- * </p>
- *
- * <p>
- * The input path can be provided either through the property <em>input.path</em>
- * or by calling {@link #setInputPaths(List)}. If multiple input paths are provided then
- * this implicitly means a join is to be performed. Multiple input paths can be provided via
- * properties by prefixing each with <em>input.path.</em>, such as <em>input.path.first</em>
- * and <em>input.path.second</em>.
- * Input data must be partitioned by day according to the naming convention yyyy/MM/dd.
- * The output path can be provided either through the property <em>output.path</em>
- * or by calling {@link #setOutputPath(Path)}.
- * Output data will be written using the same naming convention as the input, namely yyyy/MM/dd, where the date used
- * to format the output path is the same as the date of the input it was derived from.
- * For example, if the desired time range to process is 2013/01/01 through 2013/01/14,
- * then the output will be named 2013/01/01 through 2013/01/14.
- * By default the job will fail if any input data in the desired time window is missing. This can be overridden by setting
- * <em>fail.on.missing</em> to false.
- * </p>
- *
- * <p>
- * The job will not process input for which a corresponding output already exists. For example, if the desired date
- * range is 2013/01/01 through 2013/01/14 and the outputs 2013/01/01 through 2013/01/12 exist, then only
- * 2013/01/13 and 2013/01/14 will be processed and only 2013/01/13 and 2013/01/14 will be produced.
- * </p>
- *
- * <p>
- * The number of paths in the output to retain can be configured through the property <em>retention.count</em>,
- * or by calling {@link #setRetentionCount(Integer)}. When this property is set only the latest paths in the output
- * will be kept; the remainder will be removed. By default there is no retention count set so all output paths are kept.
- * </p>
- *
- * <p>
- * The inputs to process can be controlled by defining a desired date range. By default the job will process all input
- * data available. To limit the number of days of input to process one can set the property <em>num.days</em>
- * or call {@link #setNumDays(Integer)}. This would define a processing window with the same number of days,
- * where the end date of the window is the latest available input and the start date is <em>num.days</em> ago.
- * Only inputs within this window would be processed.
- * Because the end date is the same as the latest available input, as new input data becomes available the end of the
- * window will advance forward to include it. The end date can be adjusted backwards relative to the latest input
- * through the property <em>days.ago</em>, or by calling {@link #setDaysAgo(Integer)}. This subtracts as many days
- * from the latest available input date to determine the end date. The start date or end date can also be fixed
- * by setting the properties <em>start.date</em> or <em>end.date</em>, or by calling {@link #setStartDate(Date)}
- * or {@link #setEndDate(Date)}.
- * </p>
- *
- * <p>
- * The number of reducers to use is automatically determined based on the size of the data to process.
- * The total size is computed and then divided by the value of the property <em>num.reducers.bytes.per.reducer</em>, which
- * defaults to 256 MB. This is the number of reducers that will be used.
- * The number of reducers can also be set to a fixed value through the property <em>num.reducers</em>.
- * </p>
- *
- * <p>
- * This type of job is capable of performing its work over multiple iterations.
- * The number of days to process at a time can be limited by setting the property <em>max.days.to.process</em>,
- * or by calling {@link #setMaxToProcess(Integer)}. The default is 90 days.
- * This can be useful when there are restrictions on how many tasks
- * can be used by a single MapReduce job in the cluster. When this property is set, the job will process no more than
- * this many days at a time, and it will perform one or more iterations if necessary to complete the work.
- * The number of iterations can be limited by setting the property <em>max.iterations</em>, or by calling {@link #setMaxIterations(Integer)}.
- * If the number of iterations is exceeded the job will fail. By default the maximum number of iterations is 20.
- * </p>
- *
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractPartitionPreservingIncrementalJob extends IncrementalJob
-{
-  private final Logger _log = Logger.getLogger(AbstractPartitionPreservingIncrementalJob.class);
-
-  private final List<Report> _reports = new ArrayList<Report>();
-  private PartitioningMapper _mapper;
-  private PartitioningCombiner _combiner;
-  private PartitioningReducer _reducer;
-  private FileCleaner _garbage;
-
-  /**
-   * Initializes the job.
-   *
-   * @throws IOException
-   */
-  public AbstractPartitionPreservingIncrementalJob() throws IOException
-  {
-  }
-
-  /**
-   * Initializes the job with a job name and properties.
-   *
-   * @param name job name
-   * @param props configuration properties
-   * @throws IOException
-   */
-  public AbstractPartitionPreservingIncrementalJob(String name, Properties props) throws IOException
-  {
-    super(name,props);
-  }
-
-  /**
-   * Gets the mapper.
-   *
-   * @return mapper
-   */
-  public abstract Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper();
-
-  /**
-   * Gets the accumulator used for the combiner.
-   * Returns null by default; override to supply one when <em>use.combiner</em> is enabled.
-   *
-   * @return combiner accumulator
-   */
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return null;
-  }
-
-  /**
-   * Gets the accumulator used for the reducer.
-   *
-   * @return reducer accumulator
-   */
-  public abstract Accumulator<GenericRecord,GenericRecord> getReducerAccumulator();
-
-  /**
-   * Runs the job: initializes, validates and executes it, always removing
-   * temporary paths afterwards, even on failure.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  @Override
-  public void run() throws IOException, InterruptedException, ClassNotFoundException
-  {
-    try
-    {
-      initialize();
-      validate();
-      execute();
-    }
-    finally
-    {
-      cleanup();
-    }
-  }
-
-  /**
-   * Get reports that summarize each of the job iterations.
-   *
-   * @return read-only view of the reports
-   */
-  public List<Report> getReports()
-  {
-    return Collections.unmodifiableList(_reports);
-  }
-
-  /**
-   * Creates the temporary-file cleaner and applies defaults before delegating to the
-   * superclass: at most 20 iterations, and, when no max-to-process value is set, the
-   * number of days (if set) or otherwise 90.
-   */
-  @Override
-  protected void initialize()
-  {
-    _garbage = new FileCleaner(getFileSystem());
-
-    if (getMaxIterations() == null)
-    {
-      setMaxIterations(20);
-    }
-
-    if (getMaxToProcess() == null)
-    {
-      if (getNumDays() != null)
-      {
-        setMaxToProcess(getNumDays());
-      }
-      else
-      {
-        setMaxToProcess(90);
-      }
-    }
-
-    super.initialize();
-  }
-
-  /**
-   * Get the name for the reduce output schema.
-   * By default this is the name of the class with "Output" appended.
-   *
-   * @return output schema name
-   */
-  protected String getOutputSchemaName()
-  {
-    return this.getClass().getSimpleName() + "Output";
-  }
-
-  /**
-   * Get the namespace for the reduce output schema.
-   * By default this is the package of the class.
-   *
-   * @return output schema namespace
-   */
-  protected String getOutputSchemaNamespace()
-  {
-    return this.getClass().getPackage().getName();
-  }
-
-  /**
-   * Gets the map-side processor configured for the current iteration.
-   *
-   * @return partitioning mapper, or null before the first iteration has been configured
-   */
-  protected ObjectMapper getMapProcessor()
-  {
-    return _mapper;
-  }
-
-  /**
-   * Gets the combine-side processor configured for the current iteration.
-   *
-   * @return partitioning combiner, or null if the combiner is not used
-   */
-  protected ObjectReducer getCombineProcessor()
-  {
-    return _combiner;
-  }
-
-  /**
-   * Gets the reduce-side processor configured for the current iteration.
-   *
-   * @return partitioning reducer, or null before the first iteration has been configured
-   */
-  protected ObjectReducer getReduceProcessor()
-  {
-    return _reducer;
-  }
-
-  /**
-   * Execute the job, running one MapReduce job per iteration until the planner reports
-   * that no inputs remain to be processed.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  private void execute() throws IOException, InterruptedException, ClassNotFoundException
-  {
-    int iterations = 0;
-
-    while(true)
-    {
-      // re-plan every iteration: outputs written by the previous iteration reduce the remaining work
-      PartitionPreservingExecutionPlanner planner = new PartitionPreservingExecutionPlanner(getFileSystem(),getProperties());
-      planner.setInputPaths(getInputPaths());
-      planner.setOutputPath(getOutputPath());
-      planner.setStartDate(getStartDate());
-      planner.setEndDate(getEndDate());
-      planner.setDaysAgo(getDaysAgo());
-      planner.setNumDays(getNumDays());
-      planner.setMaxToProcess(getMaxToProcess());
-      planner.setFailOnMissing(isFailOnMissing());
-      planner.createPlan();
-
-      if (planner.getInputsToProcess().size() == 0)
-      {
-        _log.info("Found all necessary incremental data");
-        break;
-      }
-
-      if (iterations >= getMaxIterations())
-      {
-        throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
-                                                 iterations,
-                                                 getMaxIterations(),
-                                                 planner.getInputsToProcess().size()));
-      }
-
-      Path jobTempPath = createRandomTempPath();
-      _garbage.add(jobTempPath);
-      ensurePath(getOutputPath());
-
-      Path incrementalStagingPath = ensurePath(new Path(jobTempPath,".incremental-staging"));
-      Path incrementalStagingTmpPath = ensurePath(new Path(jobTempPath,".incremental-staging-tmp"));
-
-      Report report = new Report();
-
-      // create input paths for job
-      List<String> inputPaths = new ArrayList<String>();
-      for (DatePath input : planner.getInputsToProcess())
-      {
-        inputPaths.add(input.getPath().toString());
-        report.inputFiles.add(input);
-      }
-
-      _log.info("Staging path: " + incrementalStagingPath);
-      final StagedOutputJob job = StagedOutputJob.createStagedJob(
-        getConf(),
-        getName() + "-" + "incremental",
-        inputPaths,
-        incrementalStagingTmpPath.toString(),
-        incrementalStagingPath.toString(),
-        _log);
-
-      job.setCountersParentPath(getCountersParentPath());
-
-      final Configuration conf = job.getConfiguration();
-
-      config(conf);
-
-      PartitionPreservingSchemas fpSchemas = new PartitionPreservingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace() );
-
-      job.setInputFormatClass(AvroMultipleInputsKeyInputFormat.class);
-
-      job.setOutputFormatClass(AvroKeyOutputFormat.class);
-
-      _log.info("Setting input path to schema mappings");
-      for (String path : fpSchemas.getMapInputSchemas().keySet())
-      {
-        Schema schema = fpSchemas.getMapInputSchemas().get(path);
-        _log.info("*** " + path);
-        _log.info("*** => " + schema.toString());
-        AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
-      }
-
-      AvroJob.setMapOutputKeySchema(job, fpSchemas.getMapOutputKeySchema());
-      AvroJob.setMapOutputValueSchema(job, fpSchemas.getMapOutputValueSchema());
-      AvroJob.setOutputKeySchema(job, fpSchemas.getReduceOutputSchema());
-
-      // one named output per date, so each partition is written to its own set of files
-      StringBuilder inputTimesJoined = new StringBuilder();
-      for (Date input : planner.getDatesToProcess())
-      {
-        String namedOutput = PathUtils.datedPathFormat.format(input);
-        _log.info(String.format("Adding named output %s",namedOutput));
-        AvroMultipleOutputs.addNamedOutput(job,
-                                           namedOutput,
-                                           AvroKeyOutputFormat.class,
-                                           fpSchemas.getReduceOutputSchema());
-
-        inputTimesJoined.append(Long.toString(input.getTime()));
-        inputTimesJoined.append(",");
-      }
-
-      int numReducers;
-
-      if (getNumReducers() != null)
-      {
-        numReducers = getNumReducers();
-        _log.info(String.format("Using %d reducers (fixed)",numReducers));
-      }
-      else
-      {
-        numReducers = planner.getNumReducers();
-        _log.info(String.format("Using %d reducers (computed)",numReducers));
-      }
-
-      int avgReducersPerInput = (int)Math.ceil(numReducers/(double)planner.getDatesToProcess().size());
-
-      _log.info(String.format("Reducers per input path: %d", avgReducersPerInput));
-
-      // the TimePartitioner uses these to route each date's keys to its own share of reducers
-      conf.set(TimePartitioner.REDUCERS_PER_INPUT, Integer.toString(avgReducersPerInput));
-      conf.set(TimePartitioner.INPUT_TIMES, inputTimesJoined.substring(0,inputTimesJoined.length()-1));
-
-      job.setNumReduceTasks(numReducers);
-
-      Path mapperPath = new Path(incrementalStagingPath,".mapper_impl");
-      Path reducerPath = new Path(incrementalStagingPath,".reducer_impl");
-      Path combinerPath = new Path(incrementalStagingPath,".combiner_impl");
-
-      conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
-      conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
-
-      _mapper = new PartitioningMapper();
-      _mapper.setSchemas(fpSchemas);
-      _mapper.setMapper(getMapper());
-
-      _reducer = new PartitioningReducer();
-      _reducer.setSchemas(fpSchemas);
-      _reducer.setAccumulator(getReducerAccumulator());
-
-      // the processors are written to the paths configured above so the delegating tasks can load them
-      DistributedCacheHelper.writeObject(conf, getMapProcessor(), mapperPath);
-      DistributedCacheHelper.writeObject(conf, getReduceProcessor(), reducerPath);
-
-      job.setMapperClass(DelegatingMapper.class);
-      job.setReducerClass(DelegatingReducer.class);
-
-      if (isUseCombiner())
-      {
-        _combiner = new PartitioningCombiner();
-        _combiner.setAccumulator(getCombinerAccumulator());
-        conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
-        job.setCombinerClass(DelegatingCombiner.class);
-        DistributedCacheHelper.writeObject(conf, getCombineProcessor(), combinerPath);
-      }
-
-      job.setPartitionerClass(TimePartitioner.class);
-
-      if (!job.waitForCompletion(true))
-      {
-        _log.error("Job failed! Quitting...");
-        throw new RuntimeException("Job failed");
-      }
-
-      report.jobName = job.getJobName();
-      report.jobId = job.getJobID().toString();
-
-      moveStagedFiles(report,incrementalStagingPath);
-
-      if (getCountersParentPath() == null && job.getCountersPath() != null)
-      {
-        // save the counters in the target path, for lack of a better place to put it
-        Path counters = job.getCountersPath();
-        if (getFileSystem().exists(counters))
-        {
-          Path target = new Path(getOutputPath(),counters.getName());
-          if (getFileSystem().exists(target))
-          {
-            _log.info(String.format("Removing old counters at %s",target));
-            getFileSystem().delete(target, true);
-          }
-          _log.info(String.format("Moving %s to %s",counters.getName(),getOutputPath()));
-          if (getFileSystem().rename(counters, target))
-          {
-            report.countersPath = target;
-          }
-          else
-          {
-            // counters are informational, so a failed move is logged rather than failing the job;
-            // the report only references the counters if they actually reached the target
-            _log.error(String.format("Failed to move counters from %s to %s",counters,target));
-          }
-        }
-        else
-        {
-          _log.error("Could not find counters at " + counters);
-        }
-      }
-
-      applyRetention();
-
-      _reports.add(report);
-
-      if (!planner.getNeedsAnotherPass())
-      {
-        break;
-      }
-
-      // remove this iteration's temporary paths before starting the next one
-      cleanup();
-
-      iterations++;
-    }
-  }
-
-  /**
-   * Remove all temporary paths.
-   *
-   * @throws IOException
-   */
-  private void cleanup() throws IOException
-  {
-    if (_garbage != null)
-    {
-      _garbage.clean();
-    }
-  }
-
-  /**
-   * Removes all but the most recent dated paths from the output, if a retention count is specified.
-   *
-   * @throws IOException
-   */
-  private void applyRetention() throws IOException
-  {
-    if (getRetentionCount() != null)
-    {
-      PathUtils.keepLatestNestedDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
-    }
-  }
-
-  /**
-   * Moves files from the staging path to the final output path.
-   * Part files are first grouped into one staging sub-directory per date, and each of those
-   * directories is then renamed to its nested dated location under the output path.
-   *
-   * @param report report to update with output paths
-   * @param sourcePath source of data to move
-   * @throws IOException
-   * @throws RuntimeException if a date directory already exists in the staging path or any move fails
-   */
-  private void moveStagedFiles(Report report, Path sourcePath) throws IOException
-  {
-    _log.info("Following files produced in staging path:");
-    for (FileStatus stat : getFileSystem().globStatus(new Path(sourcePath,"*.avro")))
-    {
-      _log.info(String.format("* %s (%d bytes)",stat.getPath(),stat.getLen()));
-    }
-
-    // only accept files whose name begins with a numeric date prefix followed by "-"
-    FileStatus[] incrementalParts = getFileSystem().globStatus(new Path(sourcePath,"*"), new PathFilter() {
-      @Override
-      public boolean accept(Path path)
-      {
-        String[] pathParts = path.getName().split("-");
-        try
-        {
-          Long.parseLong(pathParts[0]);
-          return true;
-        }
-        catch (NumberFormatException e)
-        {
-          return false;
-        }
-      }
-    });
-
-    // collect the new incremental data from the temp folder and move to subfolders
-    Map<String,Path> incrementalTargetPaths = new HashMap<String,Path>();
-    for (FileStatus stat : incrementalParts)
-    {
-      // the filter above guarantees the prefix before the first "-" is the date
-      String timestamp = stat.getPath().getName().split("-")[0];
-
-      Path parent = incrementalTargetPaths.get(timestamp);
-      if (parent == null)
-      {
-        parent = new Path(sourcePath,timestamp);
-
-        if (getFileSystem().exists(parent))
-        {
-          throw new RuntimeException("already exists: " + parent.toString());
-        }
-        getFileSystem().mkdirs(parent);
-
-        incrementalTargetPaths.put(timestamp,parent);
-      }
-
-      _log.info(String.format("Moving %s to %s",stat.getPath().getName(),parent.toString()));
-      Path partTarget = new Path(parent,stat.getPath().getName());
-      // a silently failed move would publish an incomplete partition below, so fail loudly instead
-      if (!getFileSystem().rename(stat.getPath(), partTarget))
-      {
-        throw new RuntimeException("Failed to rename " + stat.getPath() + " to " + partTarget);
-      }
-    }
-
-    for (Path src : incrementalTargetPaths.values())
-    {
-      Date srcDate;
-      try
-      {
-        srcDate = PathUtils.datedPathFormat.parse(src.getName());
-      }
-      catch (ParseException e)
-      {
-        throw new RuntimeException(e);
-      }
-      Path target = new Path(getOutputPath(),PathUtils.nestedDatedPathFormat.format(srcDate));
-      _log.info(String.format("Moving %s to %s",src.getName(),target));
-
-      getFileSystem().mkdirs(target.getParent());
-
-      if (!getFileSystem().rename(src, target))
-      {
-        throw new RuntimeException("Failed to rename " + src + " to " + target);
-      }
-
-      report.outputFiles.add(new DatePath(srcDate,target));
-    }
-  }
-
-  /**
-   * Reports files created and processed for an iteration of the job.
-   *
-   * @author "Matthew Hayes"
-   *
-   */
-  public static class Report
-  {
-    private String jobName;
-    private String jobId;
-    private Path countersPath;
-    private final List<DatePath> inputFiles = new ArrayList<DatePath>();
-    private final List<DatePath> outputFiles = new ArrayList<DatePath>();
-
-    /**
-     * Gets the job name.
-     *
-     * @return job name
-     */
-    public String getJobName()
-    {
-      return jobName;
-    }
-
-    /**
-     * Gets the job ID.
-     *
-     * @return job ID
-     */
-    public String getJobId()
-    {
-      return jobId;
-    }
-
-    /**
-     * Gets the path to the counters file, if one was written.
-     *
-     * @return counters path, or null if none was written
-     */
-    public Path getCountersPath()
-    {
-      return countersPath;
-    }
-
-    /**
-     * Gets input files that were processed. These are files that are within
-     * the desired date range.
-     *
-     * @return new input files
-     */
-    public List<DatePath> getInputFiles()
-    {
-      return Collections.unmodifiableList(inputFiles);
-    }
-
-    /**
-     * Gets the output files that were produced by the job.
-     *
-     * @return output files
-     */
-    public List<DatePath> getOutputFiles()
-    {
-      return Collections.unmodifiableList(outputFiles);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
deleted file mode 100644
index 5a9211b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangeConfigurable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import datafu.hourglass.fs.DateRange;
-
-/**
- * Implemented by objects whose output date range can be assigned from outside.
- *
- * @author "Matthew Hayes"
- *
- */
-public interface DateRangeConfigurable
-{
-  /**
-   * Assigns the date range that applies to this object's output.
-   *
-   * @param dateRange the output date range to use
-   */
-  void setOutputDateRange(DateRange dateRange);
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
deleted file mode 100644
index 651a27f..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/DateRangePlanner.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Determines the date range of inputs which should be processed.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DateRangePlanner
-{
-  private final static Logger _log = Logger.getLogger(DateRangePlanner.class);
-
-  /**
-   * Determines the date range of inputs which should be processed.
-   *
-   * <p>
-   * If both begin and end dates are given they are used as-is, and neither <em>daysAgo</em>
-   * nor <em>numDays</em> may be set. If only one of them is given, <em>numDays</em> (when set)
-   * derives the other. A missing end date defaults to the latest available input date, moved
-   * back by <em>daysAgo</em> days when set. A missing begin date defaults to the earliest
-   * available input date, or, when <em>numDays</em> is set, to <em>numDays</em> days ending
-   * at the end date.
-   * </p>
-   *
-   * @param beginDateOverride Begin date, or null
-   * @param endDateOverride End date, or null
-   * @param available The input dates which are available
-   * @param daysAgo Number of days to subtract from the end date, or null; it only applies when
-   *        the end date is taken from the latest available input, otherwise it is ignored with a warning
-   * @param numDays Number of days to process, or null; must be at least 1 when set
-   * @param failOnMissing true to fail if days are missing in desired date range
-   * @return desired date range for inputs to be processed
-   * @throws IllegalArgumentException if no data is available, numDays is less than 1, conflicting
-   *         options are set, or failOnMissing is true and the range extends beyond the available data
-   */
-  public static DateRange getDateRange(Date beginDateOverride,
-                                       Date endDateOverride,
-                                       Collection<Date> available,
-                                       Integer daysAgo,
-                                       Integer numDays,
-                                       boolean failOnMissing)
-  {
-    Date beginDate = null;
-    Date endDate = null;
-
-    Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-
-    // determine the range of available input data
-    Date beginAvailable = null;
-    Date endAvailable = null;
-    if (available != null && available.size() > 0)
-    {
-      ArrayList<Date> sortedAvailable = new ArrayList<Date>(available);
-      Collections.sort(sortedAvailable);
-      beginAvailable = sortedAvailable.get(0);
-      endAvailable = sortedAvailable.get(sortedAvailable.size()-1);
-    }
-    else
-    {
-      throw new IllegalArgumentException("No data available");
-    }
-
-    // a non-positive day count would produce a range whose begin date is after its end date
-    if (numDays != null && numDays < 1)
-    {
-      throw new IllegalArgumentException(String.format("Num days must be at least 1 but was %d", numDays));
-    }
-
-    if (endDateOverride != null && beginDateOverride != null)
-    {
-      beginDate = beginDateOverride;
-      endDate = endDateOverride;
-
-      _log.info(String.format("Specified begin date is %s",
-                              PathUtils.datedPathFormat.format(beginDate)));
-      _log.info(String.format("Specified end date is %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-
-      if (daysAgo != null)
-      {
-        throw new IllegalArgumentException("Cannot specify days ago when begin and end date set");
-      }
-
-      if (numDays != null)
-      {
-        throw new IllegalArgumentException("Cannot specify num days when begin and end date set");
-      }
-    }
-    else if (beginDateOverride != null)
-    {
-      beginDate = beginDateOverride;
-
-      _log.info(String.format("Specified begin date is %s",
-                              PathUtils.datedPathFormat.format(beginDate)));
-
-      if (numDays != null)
-      {
-        cal.setTime(beginDate);
-        cal.add(Calendar.DAY_OF_MONTH, numDays-1);
-        endDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving end date of %s",
-                                numDays, PathUtils.datedPathFormat.format(endDate)));
-      }
-    }
-    else if (endDateOverride != null)
-    {
-      endDate = endDateOverride;
-
-      _log.info(String.format("Specified end date is %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-
-      if (numDays != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -(numDays-1));
-        beginDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving begin date of %s",
-                                numDays, PathUtils.datedPathFormat.format(beginDate)));
-      }
-    }
-
-    if (endDate != null && daysAgo != null)
-    {
-      // days ago only adjusts an end date derived from the latest available input; it used to be
-      // dropped silently here, so make that visible without breaking existing callers
-      _log.warn(String.format("Ignoring days ago of %d because the end date was already determined as %s",
-                              daysAgo, PathUtils.datedPathFormat.format(endDate)));
-    }
-
-    if (endDate == null)
-    {
-      endDate = endAvailable;
-
-      _log.info(String.format("No end date specified, using date for latest available input: %s",
-                              PathUtils.datedPathFormat.format(endDate)));
-
-      if (daysAgo != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -daysAgo);
-        endDate = cal.getTime();
-        _log.info(String.format("However days ago is %d, giving end date of %s",
-                                daysAgo, PathUtils.datedPathFormat.format(endDate)));
-      }
-    }
-
-    if (endAvailable.compareTo(endDate) < 0 && failOnMissing)
-    {
-      throw new IllegalArgumentException(String.format("Latest available date %s is less than desired end date %s",
-                                                       PathUtils.datedPathFormat.format(endAvailable),
-                                                       PathUtils.datedPathFormat.format(endDate)));
-    }
-
-    if (beginDate == null)
-    {
-      beginDate = beginAvailable;
-
-      if (numDays != null)
-      {
-        cal.setTime(endDate);
-        cal.add(Calendar.DAY_OF_MONTH, -(numDays-1));
-        beginDate = cal.getTime();
-        _log.info(String.format("Num days is %d, giving begin date of %s",
-                                numDays, PathUtils.datedPathFormat.format(beginDate)));
-      }
-    }
-
-    if (beginAvailable.compareTo(beginDate) > 0 && failOnMissing)
-    {
-      throw new IllegalArgumentException(String.format("Desired begin date is %s but the next available date is %s",
-                                                       PathUtils.datedPathFormat.format(beginDate),
-                                                       PathUtils.datedPathFormat.format(beginAvailable)));
-    }
-
-    _log.info(String.format("Determined date range of inputs to consume is [%s,%s]",
-                            PathUtils.datedPathFormat.format(beginDate),
-                            PathUtils.datedPathFormat.format(endDate)));
-
-    return new DateRange(beginDate, endDate);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
deleted file mode 100644
index e075770..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/ExecutionPlanner.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Properties;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for execution planners. An execution planner determines which files should be processed
- * for a particular run.
- *
- * <p>
- * The planner is configured through its setters (input paths, output path, start/end date,
- * days ago, number of days, maximum days to process, fail-on-missing). Subclasses then call
- * {@link #loadInputData()}, {@link #determineAvailableInputDates()} and {@link #determineDateRange()}
- * in that order: the first discovers the dated inputs under each input path, the second computes
- * which dates have data, and the third picks the date range to consume.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class ExecutionPlanner
-{
-  private static final Logger _log = Logger.getLogger(ExecutionPlanner.class);
-
-  private final FileSystem _fileSystem;
-  private final Properties _props;
-  private Date _startDate;
-  private Date _endDate;
-  private Integer _daysAgo;
-  private Integer _numDays;
-  private Integer _maxToProcess;
-  private DateRange _range;
-  private Path _outputPath;
-  private boolean _failOnMissing;
-  private List<Path> _inputPaths = new ArrayList<Path>();
-  // one map per input path (same order as _inputPaths); populated by loadInputData()
-  private List<SortedMap<Date,DatePath>> _inputPathsByDate;
-  private Map<Date,List<DatePath>> _availableInputsByDate = new HashMap<Date,List<DatePath>>();
-
-  /**
-   * Initializes the execution planner.
-   *
-   * @param fs file system to use
-   * @param props configuration properties
-   */
-  public ExecutionPlanner(FileSystem fs, Properties props)
-  {
-    _props = props;
-    _fileSystem = fs;
-  }
-
-  /**
-   * Gets the output path.
-   *
-   * @return output path
-   */
-  public Path getOutputPath()
-  {
-    return _outputPath;
-  }
-
-  /**
-   * Gets the input paths.
-   *
-   * @return input paths
-   */
-  public List<Path> getInputPaths()
-  {
-    return _inputPaths;
-  }
-
-  /**
-   * Sets the output path.
-   *
-   * @param outputPath output path
-   */
-  public void setOutputPath(Path outputPath)
-  {
-    this._outputPath = outputPath;
-  }
-
-  /**
-   * Sets the input paths.
-   *
-   * @param inputPaths input paths
-   */
-  public void setInputPaths(List<Path> inputPaths)
-  {
-    this._inputPaths = inputPaths;
-  }
-
-  /**
-   * Sets the start date.
-   *
-   * @param startDate start date
-   */
-  public void setStartDate(Date startDate)
-  {
-    this._startDate = startDate;
-  }
-
-  /**
-   * Gets the start date.
-   *
-   * @return start date
-   */
-  public Date getStartDate()
-  {
-    return _startDate;
-  }
-
-  /**
-   * Sets the end date.
-   *
-   * @param endDate end date
-   */
-  public void setEndDate(Date endDate)
-  {
-    this._endDate = endDate;
-  }
-
-  /**
-   * Gets the end date.
-   *
-   * @return end date
-   */
-  public Date getEndDate()
-  {
-    return _endDate;
-  }
-
-  /**
-   * Sets the number of days to subtract off the end date.
-   *
-   * @param daysAgo days ago
-   */
-  public void setDaysAgo(Integer daysAgo)
-  {
-    this._daysAgo = daysAgo;
-  }
-
-  /**
-   * Gets the number of days to subtract off the end date.
-   *
-   * @return days ago
-   */
-  public Integer getDaysAgo()
-  {
-    return _daysAgo;
-  }
-
-  /**
-   * Sets the number of days to process.
-   *
-   * @param numDays number of days to process
-   */
-  public void setNumDays(Integer numDays)
-  {
-    this._numDays = numDays;
-  }
-
-  /**
-   * Gets the number of days to process.
-   *
-   * @return number of days to process
-   */
-  public Integer getNumDays()
-  {
-    return _numDays;
-  }
-
-  /**
-   * Sets the maximum number of days to process at a time.
-   *
-   * @param maxToProcess maximum number of days
-   */
-  public void setMaxToProcess(Integer maxToProcess)
-  {
-    this._maxToProcess = maxToProcess;
-  }
-
-  /**
-   * Gets the maximum number of days to process at a time.
-   *
-   * @return maximum number of days
-   */
-  public Integer getMaxToProcess()
-  {
-    return _maxToProcess;
-  }
-
-  /**
-   * Gets whether the job should fail if data is missing within the desired date range.
-   *
-   * @return true if the job should fail on missing data
-   */
-  public boolean isFailOnMissing()
-  {
-    return _failOnMissing;
-  }
-
-  /**
-   * Sets whether the job should fail if data is missing within the desired date range.
-   *
-   * @param failOnMissing true if the job should fail on missing data
-   */
-  public void setFailOnMissing(boolean failOnMissing)
-  {
-    this._failOnMissing = failOnMissing;
-  }
-
-  /**
-   * Gets the desired input date range to process based on the configuration and available inputs.
-   * This is null until {@link #determineDateRange()} has run.
-   *
-   * @return desired date range
-   */
-  public DateRange getDateRange()
-  {
-    return _range;
-  }
-
-  /**
-   * Gets the file system.
-   *
-   * @return file system
-   */
-  protected FileSystem getFileSystem()
-  {
-    return _fileSystem;
-  }
-
-  /**
-   * Gets the configuration properties.
-   *
-   * @return properties
-   */
-  protected Properties getProps()
-  {
-    return _props;
-  }
-
-  /**
-   * Gets a map from date to available input data. Populated by
-   * {@link #determineAvailableInputDates()}.
-   *
-   * @return map from date to available input data
-   */
-  protected Map<Date,List<DatePath>> getAvailableInputsByDate()
-  {
-    return _availableInputsByDate;
-  }
-
-  /**
-   * Get a map from date to path for all paths matching yyyy/MM/dd under the given path.
-   *
-   * @param path path to search under
-   * @return map of date to path, sorted by date
-   * @throws IOException if the file system cannot be listed
-   */
-  protected SortedMap<Date,DatePath> getDailyData(Path path) throws IOException
-  {
-    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
-    for (DatePath datePath : PathUtils.findNestedDatedPaths(getFileSystem(),path))
-    {
-      data.put(datePath.getDate(),datePath);
-    }
-    return data;
-  }
-
-  /**
-   * Get a map from date to path for all paths matching yyyyMMdd under the given path.
-   *
-   * @param path path to search under
-   * @return map of date to path, sorted by date
-   * @throws IOException if the file system cannot be listed
-   */
-  protected SortedMap<Date,DatePath> getDatedData(Path path) throws IOException
-  {
-    SortedMap<Date,DatePath> data = new TreeMap<Date,DatePath>();
-    for (DatePath datePath : PathUtils.findDatedPaths(getFileSystem(),path))
-    {
-      data.put(datePath.getDate(),datePath);
-    }
-    return data;
-  }
-
-  /**
-   * Determine what input data is available by scanning each input path for daily (yyyy/MM/dd) data.
-   *
-   * @throws IOException if the file system cannot be listed
-   */
-  protected void loadInputData() throws IOException
-  {
-    _inputPathsByDate = new ArrayList<SortedMap<Date,DatePath>>();
-    for (Path inputPath : getInputPaths())
-    {
-      // Plain concatenation: passing the path as a format string would make any '%'
-      // in the path be parsed as a format specifier and throw.
-      _log.info("Searching for available input data in " + inputPath);
-      _inputPathsByDate.add(getDailyData(inputPath));
-    }
-  }
-
-  /**
-   * Determines which dates have input data available and stores the inputs for each such date
-   * (see {@link #getAvailableInputsByDate()}).
-   *
-   * <p>
-   * A date counts as available when every input path has data for it. Dates that are missing data
-   * for some input are skipped while no complete date has been seen yet. After that, they either
-   * cause a failure (when fail-on-missing is set) or are still included with whatever inputs exist.
-   * </p>
-   *
-   * @throws IllegalStateException if {@link #loadInputData()} has not been called
-   * @throws RuntimeException if there is no input data at all, or if data is missing and
-   *         fail-on-missing is set
-   */
-  protected void determineAvailableInputDates()
-  {
-    if (_inputPathsByDate == null)
-    {
-      throw new IllegalStateException("Input data has not been loaded; call loadInputData() first");
-    }
-
-    // Merge the dates of all inputs into one min-heap so equal dates come out adjacent.
-    PriorityQueue<Date> dates = new PriorityQueue<Date>();
-    for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
-    {
-      dates.addAll(pathMap.keySet());
-    }
-    if (dates.isEmpty())
-    {
-      throw new RuntimeException("No input data!");
-    }
-    List<Date> available = new ArrayList<Date>();
-    Date currentDate = dates.peek();
-    int found = 0;
-    // one map was loaded per input path, so each complete date appears exactly this many times
-    int needed = _inputPathsByDate.size();
-    while (currentDate != null)
-    {
-      Date date = dates.poll();
-
-      if (date != null && date.equals(currentDate))
-      {
-        found++;
-      }
-      else
-      {
-        // finished counting currentDate; decide whether it is available
-        if (found == needed)
-        {
-          available.add(currentDate);
-        }
-        else if (available.size() > 0)
-        {
-          _log.info("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
-          _log.info("Paths found for " + PathUtils.datedPathFormat.format(currentDate) + ":");
-          // collect what's available for this date
-          for (SortedMap<Date,DatePath> pathMap : _inputPathsByDate)
-          {
-            DatePath path = pathMap.get(currentDate);
-            if (path != null)
-            {
-              _log.info("=> " + path);
-            }
-          }
-
-          if (_failOnMissing)
-          {
-            throw new RuntimeException("Did not find all input data for date " + PathUtils.datedPathFormat.format(currentDate));
-          }
-          else
-          {
-            available.add(currentDate);
-          }
-        }
-
-        // start counting the next date (null once the queue is exhausted, ending the loop)
-        found = 0;
-        currentDate = date;
-
-        if (currentDate != null)
-        {
-          found++;
-        }
-      }
-
-      if (found > needed)
-      {
-        throw new RuntimeException("found more than needed");
-      }
-    }
-
-    _availableInputsByDate.clear();
-
-    for (Date date : available)
-    {
-      List<DatePath> paths = new ArrayList<DatePath>();
-      for (SortedMap<Date,DatePath> map : _inputPathsByDate)
-      {
-        DatePath path = map.get(date);
-        if (path != null)
-        {
-          paths.add(path);
-        }
-      }
-      _availableInputsByDate.put(date, paths);
-    }
-  }
-
-  /**
-   * Determine the date range for inputs to process based on the configuration and available inputs.
-   *
-   * @throws RuntimeException if the resulting range lacks a begin or end date
-   */
-  protected void determineDateRange()
-  {
-    _log.info("Determining range of input data to consume");
-    _range = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), getAvailableInputsByDate().keySet(), getDaysAgo(), getNumDays(), isFailOnMissing());
-    if (_range.getBeginDate() == null || _range.getEndDate() == null)
-    {
-      throw new RuntimeException("Expected start and end date");
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
deleted file mode 100644
index 1dfc24c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/FileCleaner.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Used to remove files from the file system when they are no longer needed.
- *
- * <p>
- * Paths are collected with {@link #add(Path)} / {@link #add(String)} and removed together
- * by {@link #clean()}.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class FileCleaner
-{
-  private final Logger log = Logger.getLogger(FileCleaner.class);
-
-  private final Set<Path> garbage = new HashSet<Path>();
-  private final FileSystem fs;
-
-  /**
-   * Creates a cleaner that removes paths from the given file system.
-   *
-   * @param fs file system the paths belong to
-   */
-  public FileCleaner(FileSystem fs)
-  {
-    this.fs = fs;
-  }
-
-  /**
-   * Add a path to be removed later.
-   *
-   * @param path path to remove on the next {@link #clean()}
-   * @return added path
-   */
-  public Path add(Path path)
-  {
-    garbage.add(path);
-    return path;
-  }
-
-  /**
-   * Add a path to be removed later.
-   *
-   * @param path path to remove on the next {@link #clean()}
-   * @return added path, unchanged
-   */
-  public String add(String path)
-  {
-    garbage.add(new Path(path));
-    return path;
-  }
-
-  /**
-   * Recursively removes all added paths that still exist, then forgets them.
-   * Paths are processed in sorted order. A deletion that reports failure is logged as a warning.
-   *
-   * @throws IOException if checking for or deleting a path fails
-   */
-  @SuppressWarnings("unchecked")
-  public void clean() throws IOException
-  {
-    List<Path> sorted = new ArrayList<Path>(garbage);
-    Collections.sort(sorted);
-    for (Path p : sorted)
-    {
-      if (fs.exists(p))
-      {
-        log.info(String.format("Removing %s",p));
-        // FileSystem.delete reports failure via its return value rather than an exception
-        if (!fs.delete(p, true))
-        {
-          log.warn(String.format("Failed to remove %s",p));
-        }
-      }
-    }
-    garbage.clear();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
deleted file mode 100644
index 0a25cba..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/IncrementalJob.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-
-import datafu.hourglass.schemas.TaskSchemas;
-
-/**
- * Base class for incremental jobs. Incremental jobs consume day-partitioned input data.
- *
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * </p>
- *
- * <p>
- * This class has the same configuration and methods as {@link TimeBasedJob}.
- * In addition it also recognizes the following properties:
- * </p>
- *
- * <ul>
- *   <li><em>max.iterations</em> - maximum number of iterations for the job</li>
- *   <li><em>max.days.to.process</em> - maximum number of days of input data to process in a single run</li>
- *   <li><em>fail.on.missing</em> - whether the job should fail if input data within the desired range is missing</li>
- * </ul>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class IncrementalJob extends TimeBasedJob
-{
-  private Integer _maxToProcess;
-  private Integer _maxIterations;
-  private boolean _failOnMissing;
-  private TaskSchemas _schemas;
-
-  /**
-   * Initializes the job.
-   */
-  public IncrementalJob()
-  {
-  }
-
-  /**
-   * Initializes the job with a job name and properties.
-   *
-   * @param name job name
-   * @param props configuration properties
-   */
-  public IncrementalJob(String name, Properties props)
-  {
-    super(name,props);
-  }
-
-  /**
-   * Sets the configuration properties. After delegating to the superclass, this reads
-   * <em>max.iterations</em>, <em>max.days.to.process</em> and <em>fail.on.missing</em>
-   * when they are present. Their values are expected to be strings.
-   *
-   * @param props configuration properties
-   */
-  @Override
-  public void setProperties(Properties props)
-  {
-    super.setProperties(props);
-
-    if (getProperties().get("max.iterations") != null)
-    {
-      setMaxIterations(Integer.parseInt((String)getProperties().get("max.iterations")));
-    }
-
-    if (getProperties().get("max.days.to.process") != null)
-    {
-      setMaxToProcess(Integer.parseInt((String)getProperties().get("max.days.to.process")));
-    }
-
-    if (getProperties().get("fail.on.missing") != null)
-    {
-      // parse the fail.on.missing value itself (this previously read max.days.to.process by mistake)
-      setFailOnMissing(Boolean.parseBoolean((String)getProperties().get("fail.on.missing")));
-    }
-  }
-
-  /**
-   * Initializes the job. Validates that the key, intermediate value and output value schemas
-   * are all provided and builds the {@link TaskSchemas} returned by {@link #getSchemas()}.
-   *
-   * @throws RuntimeException if any of the three schemas is null
-   */
-  @Override
-  protected void initialize()
-  {
-    super.initialize();
-
-    if (getKeySchema() == null)
-    {
-      throw new RuntimeException("Key schema not specified");
-    }
-
-    if (getIntermediateValueSchema() == null)
-    {
-      throw new RuntimeException("Intermediate schema not specified");
-    }
-
-    if (getOutputValueSchema() == null)
-    {
-      throw new RuntimeException("Output schema not specified");
-    }
-
-    _schemas = new TaskSchemas.Builder()
-      .setKeySchema(getKeySchema())
-      .setIntermediateValueSchema(getIntermediateValueSchema())
-      .setOutputValueSchema(getOutputValueSchema())
-      .build();
-  }
-
-  /**
-   * Gets the Avro schema for the key.
-   * <p>
-   * This is also used as the key for the map output.
-   *
-   * @return key schema.
-   */
-  protected abstract Schema getKeySchema();
-
-  /**
-   * Gets the Avro schema for the intermediate value.
-   * <p>
-   * This is also used for the value for the map output.
-   *
-   * @return intermediate value schema
-   */
-  protected abstract Schema getIntermediateValueSchema();
-
-  /**
-   * Gets the Avro schema for the output data.
-   *
-   * @return output data schema
-   */
-  protected abstract Schema getOutputValueSchema();
-
-  /**
-   * Gets the schemas. Null until {@link #initialize()} has run.
-   *
-   * @return schemas
-   */
-  protected TaskSchemas getSchemas()
-  {
-    return _schemas;
-  }
-
-  /**
-   * Gets the maximum number of days of input data to process in a single run.
-   *
-   * @return maximum number of days to process
-   */
-  public Integer getMaxToProcess()
-  {
-    return _maxToProcess;
-  }
-
-  /**
-   * Sets the maximum number of days of input data to process in a single run.
-   *
-   * @param maxToProcess maximum number of days to process
-   */
-  public void setMaxToProcess(Integer maxToProcess)
-  {
-    _maxToProcess = maxToProcess;
-  }
-
-  /**
-   * Gets the maximum number of iterations for the job. Multiple iterations will only occur
-   * when there is a maximum set for the number of days to process in a single run.
-   * An error should be thrown if this number will be exceeded.
-   *
-   * @return maximum number of iterations
-   */
-  public Integer getMaxIterations()
-  {
-    return _maxIterations;
-  }
-
-  /**
-   * Sets the maximum number of iterations for the job. Multiple iterations will only occur
-   * when there is a maximum set for the number of days to process in a single run.
-   * An error should be thrown if this number will be exceeded.
-   *
-   * @param maxIterations maximum number of iterations
-   */
-  public void setMaxIterations(Integer maxIterations)
-  {
-    _maxIterations = maxIterations;
-  }
-
-  /**
-   * Gets whether the job should fail if input data within the desired range is missing.
-   *
-   * @return true if the job should fail on missing data
-   */
-  public boolean isFailOnMissing()
-  {
-    return _failOnMissing;
-  }
-
-  /**
-   * Sets whether the job should fail if input data within the desired range is missing.
-   *
-   * @param failOnMissing true if the job should fail on missing data
-   */
-  public void setFailOnMissing(boolean failOnMissing)
-  {
-    _failOnMissing = failOnMissing;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
deleted file mode 100644
index f991752..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/MaxInputDataExceededException.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package datafu.hourglass.jobs;
-
-/**
- * Checked exception signalling that the input data selected for processing would exceed
- * a configured limit (presumably the maximum days to process / iterations; confirm at throw sites).
- *
- * <p>
- * Extends {@link Exception} rather than {@link Throwable} directly. It is still a checked
- * exception, so existing {@code throws} and {@code catch} clauses are unaffected.
- * </p>
- */
-public class MaxInputDataExceededException extends Exception
-{
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Creates the exception without a detail message.
-   */
-  public MaxInputDataExceededException()
-  {
-  }
-
-  /**
-   * Creates the exception with a detail message.
-   *
-   * @param message detail message
-   */
-  public MaxInputDataExceededException(String message)
-  {
-    super(message);
-  }
-
-  /**
-   * Creates the exception with a detail message and an underlying cause.
-   *
-   * @param message detail message
-   * @param cause underlying cause
-   */
-  public MaxInputDataExceededException(String message, Throwable cause)
-  {
-    super(message, cause);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
deleted file mode 100644
index 35e9294..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingExecutionPlanner.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.SortedMap;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.avro.AvroDateRangeMetadata;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Execution planner used by {@link AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- * This creates a plan to process partitioned input data and collapse the partitions into a single output.
- *
- * <p>
- * To use this class, the input and output paths must be specified. In addition the desired input date
- * range can be specified through several methods. Then {@link #createPlan()} can be called and the
- * execution plan will be created. The inputs to process will be available from {@link #getInputsToProcess()},
- * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
- * will be available from {@link #getInputSchemas()}.
- * </p>
- *
- * <p>
- * Previous output may be reused by using {@link #setReusePreviousOutput(boolean)}. If previous output exists
- * and it is to be reused then it will be available from {@link #getPreviousOutputToProcess()}. New input data
- * to process that is after the previous output time range is available from {@link #getNewInputsToProcess()}.
- * Old input data to process that is before the previous output time range and should be subtracted from the
- * previous output is available from {@link #getOldInputsToProcess()}.
- * </p>
- *
- * <p>
- * Configuration properties are used to configure a {@link ReduceEstimator} instance. This is used to
- * calculate how many reducers should be used.
- * The number of reducers to use is based on the input data size and the
- * <em>num.reducers.bytes.per.reducer</em> property. This setting can be controlled more granularly
- * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
- * Check {@link ReduceEstimator} for more details on how the properties are used.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
-{
- private final Logger _log = Logger.getLogger(PartitionCollapsingExecutionPlanner.class);
-
- private SortedMap<Date,DatePath> _outputPathsByDate;
- private boolean _reusePreviousOutput;
-
- // the chosen execution plan
- private Plan _plan;
-
- /**
- * An execution plan. Encapsulates what inputs will be processed.
- *
- * @author mhayes
- *
- */
- private class Plan
- {
- private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
- private List<DatePath> _newInputsToProcess = new ArrayList<DatePath>();
- private List<DatePath> _oldInputsToProcess = new ArrayList<DatePath>();
- private Map<String,String> _latestInputByPath = new HashMap<String,String>();
- private DatePath _previousOutputToProcess;
- private List<Schema> _inputSchemas = new ArrayList<Schema>();
- private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
- private boolean _needAnotherPass;
- private DateRange _currentDateRange;
- private int _numReducers;
- private Long _totalBytes;
-
- public void finalizePlan() throws IOException
- {
- determineInputSchemas();
- determineNumReducers();
- determineTotalBytes();
- }
-
- /**
- * Determines the number of bytes that will be consumed by this execution plan.
- * This is used to compare alternative plans so the one with the least bytes
- * consumed can be used.
- *
- * @throws IOException
- */
- private void determineTotalBytes() throws IOException
- {
- _totalBytes = 0L;
- for (DatePath dp : _inputsToProcess)
- {
- _totalBytes += PathUtils.countBytes(getFileSystem(), dp.getPath());
- }
- if (_previousOutputToProcess != null)
- {
- _totalBytes += PathUtils.countBytes(getFileSystem(), _previousOutputToProcess.getPath());
- }
- _log.info("Total bytes consumed: " + _totalBytes);
- }
-
- /**
- * Determines the input schemas. There may be multiple input schemas because multiple inputs are allowed.
- * The latest available inputs are used to determine the schema, the assumption being that schemas are
- * backwards-compatible.
- *
- * @throws IOException
- */
- private void determineInputSchemas() throws IOException
- {
- if (_latestInputByPath.size() > 0)
- {
- _log.info("Determining input schemas");
- for (Entry<String,String> entry : _latestInputByPath.entrySet())
- {
- String root = entry.getKey();
- String input = entry.getValue();
- _log.info("Loading schema for " + input);
- Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
- _inputSchemas.add(schema);
- _inputSchemasByPath.put(root, schema);
- }
- }
- }
-
- /**
- * Determines the number of reducers to use based on the input data size and the previous output,
- * if it exists and is being reused.
- * The number of reducers to use is based on the input data size and the
- * <em>num.reducers.bytes.per.reducer</em> property. This setting can be controlled more granularly
- * through <em>num.reducers.input.bytes.per.reducer</em> and <em>num.reducers.previous.bytes.per.reducer</em>.
- * See {@link ReduceEstimator} for details on reducer estimation.
- *
- * @throws IOException
- */
- private void determineNumReducers() throws IOException
- {
- ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
- List<String> inputPaths = new ArrayList<String>();
- for (DatePath input : _inputsToProcess)
- {
- inputPaths.add(input.getPath().toString());
- estimator.addInputPath("input",input.getPath());
- }
- if (_previousOutputToProcess != null)
- {
- estimator.addInputPath("previous",_previousOutputToProcess.getPath());
- }
- _numReducers = estimator.getNumReducers();
- }
- }
-
- /**
- * Initializes the execution planner.
- *
- * @param fs file system
- * @param props configuration properties
- */
- public PartitionCollapsingExecutionPlanner(FileSystem fs, Properties props)
- {
- super(fs, props);
- }
-
- /**
- * Create the execution plan.
- *
- * @throws IOException
- */
- public void createPlan() throws IOException
- {
- if (_plan != null) throw new RuntimeException("Plan already exists");
-
- _log.info("Creating execution plan");
-
- loadInputData();
- loadOutputData();
- determineAvailableInputDates();
- determineDateRange();
-
- List<Plan> plans = new ArrayList<Plan>();
- Plan plan;
-
- if (_reusePreviousOutput)
- {
- _log.info("Output may be reused, will create alternative plan that does not reuse output");
- plan = new Plan();
- try
- {
- determineInputsToProcess(false,plan);
- plan.finalizePlan();
- plans.add(plan);
- }
- catch (MaxInputDataExceededException e)
- {
- _log.info(e.getMessage());
- }
- }
-
- _log.info(String.format("Creating plan that %s previous output",(_reusePreviousOutput ? "reuses" : "does not reuse")));
- plan = new Plan();
- try
- {
- determineInputsToProcess(_reusePreviousOutput,plan);
- }
- catch (MaxInputDataExceededException e)
- {
- throw new RuntimeException(e);
- }
- plan.finalizePlan();
- plans.add(plan);
-
- if (plans.size() > 1)
- {
- _log.info(String.format("There are %d alternative execution plans:",plans.size()));
-
- for (Plan option : plans)
- {
- _log.info(String.format("* Consume %d new inputs, %d old inputs, %s previous output (%d bytes)",
- option._newInputsToProcess.size(),
- option._oldInputsToProcess.size(),
- option._previousOutputToProcess != null ? "reuse" : "no",
- option._totalBytes));
- }
-
- // choose plan with least bytes consumed
- Collections.sort(plans, new Comparator<Plan>() {
- @Override
- public int compare(Plan o1, Plan o2)
- {
- return o1._totalBytes.compareTo(o2._totalBytes);
- }
- });
- _plan = plans.get(0);
-
- _log.info(String.format("Choosing plan consuming %d bytes",_plan._totalBytes));
- }
- else
- {
- _plan = plans.get(0);
- }
- }
-
- /**
- * Gets whether previous output should be reused, if it exists.
- *
- * @return true if previous output should be reused
- */
- public boolean getReusePreviousOutput()
- {
- return _reusePreviousOutput;
- }
-
- /**
- * Sets whether previous output should be reused, if it exists.
- *
- * @param reuse true if previous output should be reused
- */
- public void setReusePreviousOutput(boolean reuse)
- {
- _reusePreviousOutput = reuse;
- }
-
- /**
- * Get the number of reducers to use based on the input and previous output data size.
- * Must call {@link #createPlan()} first.
- *
- * @return number of reducers to use
- */
- public int getNumReducers()
- {
- checkPlanExists();
- return getPlan()._numReducers;
- }
-
- public DateRange getCurrentDateRange()
- {
- checkPlanExists();
- return getPlan()._currentDateRange;
- }
-
- /**
- * Gets the previous output to reuse, or null if no output is being reused.
- * Must call {@link #createPlan()} first.
- *
- * @return previous output to reuse, or null
- */
- public DatePath getPreviousOutputToProcess()
- {
- return getPlan()._previousOutputToProcess;
- }
-
- /**
- * Gets all inputs that will be processed. This includes both old and new data.
- * Must call {@link #createPlan()} first.
- *
- * @return inputs to process
- */
- public List<DatePath> getInputsToProcess()
- {
- return getPlan()._inputsToProcess;
- }
-
- /**
- * Gets only the new data that will be processed. New data is data that falls within the
- * desired date range.
- * Must call {@link #createPlan()} first.
- *
- * @return new inputs to process
- */
- public List<DatePath> getNewInputsToProcess()
- {
- return getPlan()._newInputsToProcess;
- }
-
- /**
- * Gets only the old data that will be processed. Old data is data that falls before the
- * desired date range. It will be subtracted out from the previous output.
- * Must call {@link #createPlan()} first.
- *
- * @return old inputs to process
- */
- public List<DatePath> getOldInputsToProcess()
- {
- return getPlan()._oldInputsToProcess;
- }
-
- /**
- * Gets whether another pass will be required. Because there may be a limit on the number of inputs processed
- * in a single run, multiple runs may be required to process all data in the desired date range.
- * Must call {@link #createPlan()} first.
- *
- * @return true if another pass is required
- */
- public boolean getNeedsAnotherPass()
- {
- return getPlan()._needAnotherPass;
- }
-
- /**
- * Gets the input schemas. Because multiple inputs are allowed, there may be multiple schemas.
- * Must call {@link #createPlan()} first.
- *
- * <p>
- * This does not include the output schema, even though previous output may be fed back as input.
- * The reason is that the ouput schema it determined based on the input schema.
- * </p>
- *
- * @return input schemas
- */
- public List<Schema> getInputSchemas()
- {
- return getPlan()._inputSchemas;
- }
-
- /**
- * Gets a map from input path to schema. Because multiple inputs are allowed, there may be multiple schemas.
- * Must call {@link #createPlan()} first.
- *
- * @return map from path to input schema
- */
- public Map<String,Schema> getInputSchemasByPath()
- {
- return getPlan()._inputSchemasByPath;
- }
-
- /**
-  * Determines what output data already exists, so that previous output may be reused.
-  * Stores the dated paths found under the output path in {@code _outputPathsByDate}.
-  *
-  * @throws IOException if searching the output path fails (propagated from {@code getDatedData})
-  * @throws IllegalStateException if no output path has been specified
-  */
- private void loadOutputData() throws IOException
- {
-   if (getOutputPath() == null)
-   {
-     throw new IllegalStateException("No output path specified");
-   }
-   // Pass the path as a format argument instead of concatenating it into the format string:
-   // a '%' in the path would otherwise be parsed as a format specifier and throw at runtime.
-   _log.info(String.format("Searching for existing output data in %s", getOutputPath()));
-   _outputPathsByDate = getDatedData(getOutputPath());
-   _log.info(String.format("Found %d output paths", _outputPathsByDate.size()));
- }
-
- /**
- * Determines what input data to process and records the result in the given plan.
- *
- * <p>
- * The input data to consume is determined by the desired date range. If previous output is not reused then the input data to process
- * will coincide with the date range. If previous output may be reused and previous output exists, then the input data to process
- * will consist of new data and potentially old data. The new input data to process is data that has time after the previous output date range,
- * so that it may be added to the previous output.
- * The old data to process is data that falls within the previous output date range but before the desired date range,
- * so that it may be subtracted from the previous output.
- * </p>
- *
- * <p>
- * If there is a limit on how many days of input data can be processed then it may be the case that not all input data will be processed in
- * a single run. The limit ({@code getMaxToProcess()}) counts only days for which new input was found; when it is reached,
- * {@code plan._needAnotherPass} is set, which is only permitted when previous output is being reused.
- * </p>
- *
- * @param reusePreviousOutput whether the latest previous output (if any) may be reused, which allows old data to be
- *        differenced out and the work to be split across multiple passes
- * @param plan plan to populate: all inputs to process, old and new inputs separately, the latest input per nested path root,
- *        the previous output to reuse, whether another pass is needed, and the date range covered by this pass
- * @throws IOException if reading the previous output's date range fails (presumably raised by
- *         {@code AvroDateRangeMetadata.getOutputFileDateRange} or {@code getFileSystem()}; not verified here)
- * @throws MaxInputDataExceededException if the limit on days to process is reached while previous output is not being reused
- * @throws RuntimeException if old data needed to difference out of the previous output is missing, or if new input
- *         for a day is missing and {@code isFailOnMissing()} is true
- */
- private void determineInputsToProcess(boolean reusePreviousOutput, Plan plan) throws IOException, MaxInputDataExceededException
- {
- // used to step through dates one calendar day at a time
- // NOTE(review): presumably PathUtils.timeZone is the zone that dated paths are interpreted in; confirm
- Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-
- // date range of the latest previous output; stays null when output is not reused or none exists
- DateRange outputDateRange = null;
-
- if (reusePreviousOutput)
- {
- if (_outputPathsByDate.size() > 0)
- {
- // the previous output with the greatest date key is the one that gets reused
- DatePath latestPriorOutput = _outputPathsByDate.get(Collections.max(_outputPathsByDate.keySet()));
- _log.info("Have previous output, determining what previous incremental data to difference out");
- outputDateRange = AvroDateRangeMetadata.getOutputFileDateRange(getFileSystem(),latestPriorOutput.getPath());
- _log.info(String.format("Previous output has date range %s to %s",
- PathUtils.datedPathFormat.format(outputDateRange.getBeginDate()),
- PathUtils.datedPathFormat.format(outputDateRange.getEndDate())));
-
- // Old data: days covered by the previous output that precede the desired range. Walk from the start of the
- // previous output up to (but excluding) the desired begin date, never past the end of the previous output.
- // The loop has no update clause; the date is advanced at the bottom of the body.
- for (Date currentDate=outputDateRange.getBeginDate();
- currentDate.compareTo(getDateRange().getBeginDate()) < 0
- && currentDate.compareTo(outputDateRange.getEndDate()) <= 0;)
- {
- // old data must be available, otherwise it cannot be subtracted out of the previous output
- if (!getAvailableInputsByDate().containsKey(currentDate))
- {
- throw new RuntimeException(String.format("Missing incremental data for %s, so can't remove it from previous output",PathUtils.datedPathFormat.format(currentDate)));
- }
-
- List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
-
- for (DatePath input : inputs)
- {
- _log.info(String.format("Old Input: %s",input.getPath()));
- plan._inputsToProcess.add(input);
- plan._oldInputsToProcess.add(input);
-
- // dates are visited in ascending order, so each root ends up mapped to the latest input seen for it
- Path root = PathUtils.getNestedPathRoot(input.getPath());
- plan._latestInputByPath.put(root.toString(), input.getPath().toString());
- }
-
- // advance one calendar day
- cal.setTime(currentDate);
- cal.add(Calendar.DAY_OF_MONTH, 1);
- currentDate = cal.getTime();
- }
-
- plan._previousOutputToProcess = latestPriorOutput;
- _log.info("Previous Output: " + plan._previousOutputToProcess.getPath());
- }
- else
- {
- _log.info("No previous output to reuse");
- }
- }
-
- // consume the incremental data and produce the final output
- // New data: walk the desired date range in ascending order. Days already covered by the previous output are
- // skipped; days after its end (or every day, when there is no previous output) contribute new input.
-
- // number of days for which new input was found; this is what getMaxToProcess() limits
- int newDataCount = 0;
- Date startDate = getDateRange().getBeginDate();
- // last date fully visited before the loop ends or stops; becomes the end of the range covered by this pass
- Date endDate = startDate;
- for (Date currentDate=startDate; currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
- {
- if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
- {
- // splitting the work across passes relies on reusing the output of the previous pass
- if (!reusePreviousOutput)
- {
- throw new MaxInputDataExceededException(String.format("Amount of input data has exceeded max of %d however output is not being reused so cannot do in multiple passes", getMaxToProcess()));
- }
-
- // too much data to process in a single run, will require another pass
- plan._needAnotherPass = true;
- break;
- }
-
- if (outputDateRange == null || currentDate.compareTo(outputDateRange.getEndDate()) > 0)
- {
- if (!getAvailableInputsByDate().containsKey(currentDate))
- {
- if (isFailOnMissing())
- {
- throw new RuntimeException("missing " + PathUtils.datedPathFormat.format(currentDate));
- }
- else
- {
- _log.info("No input data found for " + PathUtils.datedPathFormat.format(currentDate));
- }
- }
- else
- {
- List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
-
- for (DatePath input : inputs)
- {
- _log.info(String.format("New Input: %s",input.getPath()));
- plan._inputsToProcess.add(input);
- plan._newInputsToProcess.add(input);
-
- // later dates overwrite earlier ones, so each root maps to the latest input seen for it
- Path root = PathUtils.getNestedPathRoot(input.getPath());
- plan._latestInputByPath.put(root.toString(), input.getPath().toString());
- }
-
- // counted once per day with new input, regardless of how many input paths that day has
- newDataCount++;
- }
- }
-
- // record this date as visited, then advance one calendar day
- cal.setTime(currentDate);
- endDate = cal.getTime();
- cal.add(Calendar.DAY_OF_MONTH, 1);
- currentDate = cal.getTime();
- }
-
- // NOTE(review): if the limit stops the loop on the very first day, endDate still equals startDate even though
- // that day was not processed; confirm callers tolerate this
- plan._currentDateRange = new DateRange(startDate,endDate);
- }
-
- /**
-  * Guards accessors that depend on the plan by failing when no plan is present,
-  * i.e. when {@link #createPlan()} has not been called.
-  *
-  * @throws RuntimeException if no plan has been created
-  */
- private void checkPlanExists()
- {
-   boolean planMissing = (_plan == null);
-   if (planMissing)
-   {
-     throw new RuntimeException("Must call createPlan first");
-   }
- }
-
- /**
-  * Returns the current plan, failing fast if it has not been created.
-  *
-  * @return the plan created by {@link #createPlan()}
-  * @throws RuntimeException if no plan has been created
-  */
- private Plan getPlan()
- {
-   checkPlanExists();
-   final Plan current = _plan;
-   return current;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
deleted file mode 100644
index f92b76b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionCollapsingIncrementalJob.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.model.Merger;
-
-/**
- * A concrete version of {@link AbstractPartitionCollapsingIncrementalJob}.
- *
- * This provides an alternative to extending {@link AbstractPartitionCollapsingIncrementalJob}.
- * Instead of extending this class and implementing the abstract methods, this concrete version
- * can be used instead. Getters and setters have been provided for the abstract methods.
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingIncrementalJob extends AbstractPartitionCollapsingIncrementalJob
-{
- private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
- private Accumulator<GenericRecord,GenericRecord> _combiner;
- private Accumulator<GenericRecord,GenericRecord> _reducer;
- private Schema _keySchema;
- private Schema _intermediateValueSchema;
- private Schema _outputValueSchema;
- private Merger<GenericRecord> _merger;
- private Merger<GenericRecord> _oldMerger;
- private Setup _setup;
-
- /**
- * Initializes the job. The job name is derived from the name of a provided class.
- *
- * @param cls class to base job name on
- * @throws IOException
- */
- public PartitionCollapsingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
- {
- setName(cls.getName());
- }
-
- @Override
- public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
- {
- return _mapper;
- }
-
- @Override
- public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
- {
- return _combiner;
- }
-
- @Override
- public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
- {
- return _reducer;
- }
-
- @Override
- protected Schema getKeySchema()
- {
- return _keySchema;
- }
-
- @Override
- protected Schema getIntermediateValueSchema()
- {
- return _intermediateValueSchema;
- }
-
- @Override
- protected Schema getOutputValueSchema()
- {
- return _outputValueSchema;
- }
-
- @Override
- public Merger<GenericRecord> getRecordMerger()
- {
- return _merger;
- }
-
- @Override
- public Merger<GenericRecord> getOldRecordMerger()
- {
- return _oldMerger;
- }
-
- /**
- * Set the mapper.
- *
- * @param mapper
- */
- public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
- {
- this._mapper = mapper;
- }
-
- /**
- * Set the accumulator for the combiner
- *
- * @param combiner accumulator for the combiner
- */
- public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
- {
- this._combiner = combiner;
- }
-
- /**
- * Set the accumulator for the reducer.
- *
- * @param reducer accumulator for the reducer
- */
- public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
- {
- this._reducer = reducer;
- }
-
- /**
- * Sets the Avro schema for the key.
- * <p>
- * This is also used as the key for the map output.
- *
- * @param keySchema key schema
- */
- public void setKeySchema(Schema keySchema)
- {
- this._keySchema = keySchema;
- }
-
- /**
- * Sets the Avro schema for the intermediate value.
- * <p>
- * This is also used for the value for the map output.
- *
- * @param intermediateValueSchema intermediate value schema
- */
- public void setIntermediateValueSchema(Schema intermediateValueSchema)
- {
- this._intermediateValueSchema = intermediateValueSchema;
- }
-
- /**
- * Sets the Avro schema for the output data.
- *
- * @param outputValueSchema output value schema
- */
- public void setOutputValueSchema(Schema outputValueSchema)
- {
- this._outputValueSchema = outputValueSchema;
- }
-
- /**
- * Sets the record merger that is capable of merging previous output with a new partial output.
- * This is only needed when reusing previous output where the intermediate and output schemas are different.
- * New partial output is produced by the reducer from new input that is after the previous output.
- *
- * @param merger
- */
- public void setMerger(Merger<GenericRecord> merger)
- {
- this._merger = merger;
- }
-
- /**
- * Sets the record merger that is capable of unmerging old partial output from the new output.
- * This is only needed when reusing previous output for a fixed-length sliding window.
- * The new output is the result of merging the previous output with the new partial output.
- * The old partial output is produced by the reducer from old input data before the time range of
- * the previous output.
- *
- * @param oldMerger merger
- */
- public void setOldMerger(Merger<GenericRecord> oldMerger)
- {
- this._oldMerger = oldMerger;
- }
-
- /**
- * Set callback to provide custom configuration before job begins execution.
- *
- * @param setup object with callback method
- */
- public void setOnSetup(Setup setup)
- {
- _setup = setup;
- }
-
- @Override
- public void config(Configuration conf)
- {
- super.config(conf);
- if (_setup != null)
- {
- _setup.setup(conf);
- }
- }
-}