You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:51 UTC
[13/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java b/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
deleted file mode 100644
index 2686c32..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyRecordReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A MapReduce InputFormat that can handle Avro container files and multiple inputs.
- * The input schema is determine based on the split. The mapping from input path
- * to schema is stored in the job configuration.
- *
- * <p>Keys are AvroKey wrapper objects that contain the Avro data. Since Avro
- * container files store only records (not key/value pairs), the value from
- * this InputFormat is a NullWritable.</p>
- */
-public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable>
-{
- /** {@inheritDoc} */
- @Override
- public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
- InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
- {
- Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
- if (readerSchema == null)
- {
- throw new RuntimeException("Could not determine input schema");
- }
- return new AvroKeyRecordReader<T>(readerSchema);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java b/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
deleted file mode 100644
index b15041a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Helper methods for dealing with multiple Avro input schemas. A mapping is stored in the configuration
- * that maps each input path to its corresponding schema. Methods in this class help with loading and
- * storing these schema mappings.
- *
- * @author "Matthew Hayes"
- *
- */
-public class AvroMultipleInputsUtil
-{
- private static final Logger _log = Logger.getLogger(AvroMultipleInputsUtil.class);
-
- private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.multiple.inputs.keys";
-
- /**
- * Gets the schema for a particular input split.
- *
- * @param conf configuration to get schema from
- * @param split input split to get schema for
- * @return schema
- */
- public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit split)
- {
- String path = ((FileSplit)split).getPath().toString();
- _log.info("Determining schema for input path " + path);
- JSONObject schemas;
- try
- {
- schemas = getInputSchemas(conf);
- }
- catch (JSONException e1)
- {
- throw new RuntimeException(e1);
- }
- Schema schema = null;
- if (schemas != null)
- {
- for (String key : JSONObject.getNames(schemas))
- {
- _log.info("Checking against " + key);
- if (path.startsWith(key))
- {
- try
- {
- schema = new Schema.Parser().parse(schemas.getString(key));
- _log.info("Input schema found: " + schema.toString(true));
- break;
- }
- catch (JSONException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- }
- if (schema == null)
- {
- _log.info("Could not determine input schema");
- }
- return schema;
- }
-
- /**
- * Sets the job input key schema for a path.
- *
- * @param job The job to configure.
- * @param schema The input key schema.
- * @param path the path to set the schema for
- */
- public static void setInputKeySchemaForPath(Job job, Schema schema, String path)
- {
- JSONObject schemas;
- try
- {
- schemas = getInputSchemas(job.getConfiguration());
- schemas.put(path, schema.toString());
- }
- catch (JSONException e)
- {
- throw new RuntimeException(e);
- }
- job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schemas.toString());
- }
-
- /**
- * Get a mapping from path to input schema.
- *
- * @param conf
- * @return mapping from path to input schem
- * @throws JSONException
- */
- private static JSONObject getInputSchemas(Configuration conf) throws JSONException
- {
- JSONObject schemas;
-
- String schemasJson = conf.get(CONF_INPUT_KEY_SCHEMA);
-
- if (schemasJson == null)
- {
- schemas = new JSONObject();
- }
- else
- {
- schemas = new JSONObject(schemasJson);
- }
-
- return schemas;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java b/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
deleted file mode 100644
index d2b171d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyRecordReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
-
-/**
- * A combined input format for reading Avro data.
- *
- * @author "Matthew Hayes"
- *
- * @param <T> Type of data to be read
- */
-public class CombinedAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable>
-{
- private final Logger LOG = Logger.getLogger(AbstractPartitionPreservingIncrementalJob.class);
-
- public static class CombinedAvroKeyRecordReader<T> extends AvroKeyRecordReader<T>
- {
- private CombineFileSplit inputSplit;
- private Integer idx;
-
- public CombinedAvroKeyRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer idx)
- {
- super(AvroJob.getInputKeySchema(context.getConfiguration()));
- this.inputSplit = inputSplit;
- this.idx = idx;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
- InterruptedException
- {
- this.inputSplit = (CombineFileSplit)inputSplit;
-
- FileSplit fileSplit = new FileSplit(this.inputSplit.getPath(idx),
- this.inputSplit.getOffset(idx),
- this.inputSplit.getLength(idx),
- this.inputSplit.getLocations());
-
- super.initialize(fileSplit, context);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext context) throws IOException
- {
- Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
- if (null == readerSchema) {
- LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
- LOG.info("Using a reader schema equal to the writer schema.");
- }
-
- Object c = CombinedAvroKeyRecordReader.class;
- return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context, (Class<? extends RecordReader<AvroKey<T>, NullWritable>>)c);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
deleted file mode 100644
index 54a428f..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/avro/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Input and output formats for using Avro in incremental Hadoop jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.avro;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java b/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
deleted file mode 100644
index 0ffba6e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/DatePath.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * Represents a path and the corresponding date that is associated with it.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DatePath implements Comparable<DatePath>
-{
- private static final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
- private final Date date;
- private final Path path;
-
- static
- {
- timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
-
- public DatePath(Date date, Path path)
- {
- this.date = date;
- this.path = path;
- }
-
- public Date getDate() { return this.date; }
- public Path getPath() { return this.path; }
-
- public static DatePath createDatedPath(Path parent, Date date)
- {
- return new DatePath(date,new Path(parent,PathUtils.datedPathFormat.format(date)));
- }
-
- public static DatePath createNestedDatedPath(Path parent, Date date)
- {
- return new DatePath(date,new Path(parent,PathUtils.nestedDatedPathFormat.format(date)));
- }
-
- @Override
- public String toString()
- {
- return String.format("[date=%s, path=%s]",timestampFormat.format(this.date), this.path.toString());
- }
-
- @Override
- public int compareTo(DatePath o)
- {
- return this.date.compareTo(o.date);
- }
-
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((date == null) ? 0 : date.hashCode());
- result = prime * result + ((path == null) ? 0 : path.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- DatePath other = (DatePath) obj;
- if (date == null)
- {
- if (other.date != null)
- return false;
- }
- else if (!date.equals(other.date))
- return false;
- if (path == null)
- {
- if (other.path != null)
- return false;
- }
- else if (!path.equals(other.path))
- return false;
- return true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java b/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
deleted file mode 100644
index 485dd82..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/DateRange.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.util.Date;
-
-/**
- * A date range, consisting of a start and end date.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DateRange
-{
- private final Date _beginDate;
- private final Date _endDate;
-
- public DateRange(Date beginDate, Date endDate)
- {
- _beginDate = beginDate;
- _endDate = endDate;
- }
-
- public Date getBeginDate()
- {
- return _beginDate;
- }
-
- public Date getEndDate()
- {
- return _endDate;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java b/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
deleted file mode 100644
index 4dfd5d1..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/PathUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.fs;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.Logger;
-
-
-/**
- * A collection of utility methods for dealing with files and paths.
- *
- * @author "Matthew Hayes"
- *
- */
-public class PathUtils
-{
- private static Logger _log = Logger.getLogger(PathUtils.class);
-
- public final static TimeZone timeZone = TimeZone.getTimeZone("UTC");
- public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat("yyyyMMdd");
- public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat("yyyy/MM/dd");
- private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");
- private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");
-
- /**
- * Filters out paths starting with "." and "_".
- */
- public static final PathFilter nonHiddenPathFilter = new PathFilter() {
- @Override
- public boolean accept(Path path)
- {
- String s = path.getName().toString();
- return !s.startsWith(".") && !s.startsWith("_");
- }
- };
-
- static
- {
- datedPathFormat.setTimeZone(timeZone);
- nestedDatedPathFormat.setTimeZone(timeZone);
- }
-
- /**
- * Delete all but the last N days of paths matching the "yyyyMMdd" format.
- *
- * @param fs
- * @param path
- * @param retentionCount
- * @throws IOException
- */
- public static void keepLatestDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
- {
- LinkedList<DatePath> outputs = new LinkedList<DatePath>(PathUtils.findDatedPaths(fs, path));
-
- while (outputs.size() > retentionCount)
- {
- DatePath toDelete = outputs.removeFirst();
- _log.info(String.format("Removing %s",toDelete.getPath()));
- fs.delete(toDelete.getPath(),true);
- }
- }
-
- /**
- * Delete all but the last N days of paths matching the "yyyy/MM/dd" format.
- *
- * @param fs
- * @param path
- * @param retentionCount
- * @throws IOException
- */
- public static void keepLatestNestedDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
- {
- List<DatePath> outputPaths = PathUtils.findNestedDatedPaths(fs, path);
-
- if (outputPaths.size() > retentionCount)
- {
- Collections.sort(outputPaths);
- _log.info(String.format("Applying retention range policy"));
- for (DatePath output : outputPaths.subList(0, outputPaths.size() - retentionCount))
- {
- _log.info(String.format("Removing %s because it is outside retention range",output.getPath()));
- fs.delete(output.getPath(),true);
- }
- }
- }
-
- /**
- * List all paths matching the "yyyy/MM/dd" format under a given path.
- *
- * @param fs file system
- * @param input path to search under
- * @return paths
- * @throws IOException
- */
- public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) throws IOException
- {
- List<DatePath> inputDates = new ArrayList<DatePath>();
-
- FileStatus[] pathsStatus = fs.globStatus(new Path(input,"*/*/*"), nonHiddenPathFilter);
-
- if (pathsStatus == null)
- {
- return inputDates;
- }
-
- for (FileStatus pathStatus : pathsStatus)
- {
- Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
- if (matcher.matches())
- {
- String datePath = matcher.group(2);
- Date date;
- try
- {
- date = nestedDatedPathFormat.parse(datePath);
- }
- catch (ParseException e)
- {
- continue;
- }
-
- Calendar cal = Calendar.getInstance(timeZone);
-
- cal.setTimeInMillis(date.getTime());
-
- inputDates.add(new DatePath(cal.getTime(), pathStatus.getPath()));
- }
- }
-
- return inputDates;
- }
-
- /**
- * List all paths matching the "yyyyMMdd" format under a given path.
- *
- * @param fs file system
- * @param path path to search under
- * @return paths
- * @throws IOException
- */
- public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
- {
- FileStatus[] outputPaths = fs.listStatus(path, nonHiddenPathFilter);
-
- List<DatePath> outputs = new ArrayList<DatePath>();
-
- if (outputPaths != null)
- {
- for (FileStatus outputPath : outputPaths)
- {
- Date date;
- try
- {
- date = datedPathFormat.parse(outputPath.getPath().getName());
- }
- catch (ParseException e)
- {
- continue;
- }
-
- outputs.add(new DatePath(date,outputPath.getPath()));
- }
- }
-
- Collections.sort(outputs);
-
- return outputs;
- }
-
- /**
- * Gets the schema from a given Avro data file.
- *
- * @param fs
- * @param path
- * @return The schema read from the data file's metadata.
- * @throws IOException
- */
- public static Schema getSchemaFromFile(FileSystem fs, Path path) throws IOException
- {
- FSDataInputStream dataInputStream = fs.open(path);
- DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
- DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
- try
- {
- return dataFileStream.getSchema();
- }
- finally
- {
- dataFileStream.close();
- }
- }
-
- /**
- * Gets the schema for the first Avro file under the given path.
- *
- * @param path path to fetch schema for
- * @return Avro schema
- * @throws IOException
- */
- public static Schema getSchemaFromPath(FileSystem fs, Path path) throws IOException
- {
- return getSchemaFromFile(fs,fs.listStatus(path, nonHiddenPathFilter)[0].getPath());
- }
-
- /**
- * Sums the size of all files listed under a given path.
- *
- * @param fs file system
- * @param path path to count bytes for
- * @return total bytes under path
- * @throws IOException
- */
- public static long countBytes(FileSystem fs, Path path) throws IOException
- {
- FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
- long totalForPath = 0L;
- for (FileStatus file : files)
- {
- totalForPath += file.getLen();
- }
- return totalForPath;
- }
-
- /**
- * Gets the date for a path in the "yyyyMMdd" format.
- *
- * @param path path to check
- * @return date for path
- */
- public static Date getDateForDatedPath(Path path)
- {
- Matcher matcher = timestampPathPattern.matcher(path.toString());
-
- if (!matcher.matches())
- {
- throw new RuntimeException("Unexpected input filename: " + path);
- }
-
- try
- {
- return PathUtils.datedPathFormat.parse(matcher.group(1));
- }
- catch (ParseException e)
- {
- throw new RuntimeException("Unexpected input filename: " + path);
- }
- }
-
- /**
- * Gets the date for a path in the "yyyy/MM/dd" format.
- *
- * @param path path to check
- * @return date
- */
- public static Date getDateForNestedDatedPath(Path path)
- {
- Matcher matcher = dailyPathPattern.matcher(path.toString());
-
- if (!matcher.matches())
- {
- throw new RuntimeException("Unexpected input filename: " + path);
- }
-
- try
- {
- return PathUtils.nestedDatedPathFormat.parse(matcher.group(2));
- }
- catch (ParseException e)
- {
- throw new RuntimeException("Unexpected input filename: " + path);
- }
- }
-
- /**
- * Gets the root path for a path in the "yyyy/MM/dd" format. This is part of the path preceding the
- * "yyyy/MM/dd" portion.
- *
- * @param path
- * @return
- */
- public static Path getNestedPathRoot(Path path)
- {
- Matcher matcher = dailyPathPattern.matcher(path.toString());
-
- if (!matcher.matches())
- {
- throw new RuntimeException("Unexpected input filename: " + path);
- }
-
- return new Path(matcher.group(1));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
deleted file mode 100644
index 9d81006..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/fs/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Classes for working with the file system.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.fs;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
deleted file mode 100644
index 3b998a9..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractJob.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Base class for Hadoop jobs.
- *
- * <p>
- * This class defines a set of common methods and configuration shared by Hadoop jobs.
- * Jobs can be configured either by providing properties or by calling setters.
- * Each property has a corresponding setter.
- * </p>
- *
- * This class recognizes the following properties:
- *
- * <ul>
- * <li><em>input.path</em> - Input path job will read from</li>
- * <li><em>output.path</em> - Output path job will write to</li>
- * <li><em>temp.path</em> - Temporary path under which intermediate files are stored</li>
- * <li><em>retention.count</em> - Number of days to retain in output directory</li>
- * <li><em>num.reducers</em> - Number of reducers to use</li>
- * <li><em>use.combiner</em> - Whether to use a combiner or not</li>
- * <li><em>counters.path</em> - Path to store job counters in</li>
- * </ul>
- *
- * <p>
- * The <em>input.path</em> property may be a comma-separated list of paths. When there is more
- * than one it implies a join is to be performed. Alternatively the paths may be listed separately.
- * For example, <em>input.path.first</em> and <em>input.path.second</em> define two separate input
- * paths.
- * </p>
- *
- * <p>
- * The <em>num.reducers</em> property fixes the number of reducers. When not set, the number of reducers
- * is computed based on the input size.
- * </p>
- *
- * <p>
- * The <em>temp.path</em> property defines the parent directory for temporary paths, not the
- * temporary path itself. Temporary paths are created under this directory with an <em>hourglass-</em>
- * prefix followed by a GUID.
- * </p>
- *
- * <p>
- * The input and output paths are the only required parameters. The rest are optional.
- * </p>
- *
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractJob extends Configured
-{
-  /**
-   * Properties whose (lower-cased) name starts with this prefix are copied into the
-   * Hadoop configuration with the prefix removed.
-   */
-  private static final String HADOOP_PREFIX = "hadoop-conf.";
-
-  private Properties _props;
-  private String _name;
-  private boolean _useCombiner;
-  private Path _countersParentPath;
-  private Integer _numReducers;
-  private Integer _retentionCount;
-  private List<Path> _inputPaths;
-  private Path _outputPath;
-  // Parent directory under which randomTempPath() generates temporary paths.
-  private Path _tempPath = new Path("/tmp");
-  // Created lazily by getFileSystem().
-  private FileSystem _fs;
-
-  /**
-   * Initializes the job with a fresh Hadoop configuration.
-   */
-  public AbstractJob()
-  {
-    setConf(new Configuration());
-  }
-
-  /**
-   * Initializes the job with a job name and properties.
-   *
-   * @param name Job name
-   * @param props Configuration properties
-   */
-  public AbstractJob(String name, Properties props)
-  {
-    this();
-    setName(name);
-    setProperties(props);
-  }
-
-  /**
-   * Gets the job name
-   *
-   * @return Job name
-   */
-  public String getName()
-  {
-    return _name;
-  }
-
-  /**
-   * Sets the job name
-   *
-   * @param name Job name
-   */
-  public void setName(String name)
-  {
-    _name = name;
-  }
-
-  /**
-   * Gets the configuration properties.
-   *
-   * @return Configuration properties
-   */
-  public Properties getProperties()
-  {
-    return _props;
-  }
-
-  /**
-   * Sets the configuration properties and applies every recognized property through the
-   * corresponding setter. The Hadoop configuration is updated first, from either the
-   * <em>test.conf</em> property or the properties carrying the <em>hadoop-conf.</em> prefix.
-   *
-   * <p>
-   * All values are read with {@link Properties#getProperty(String)}, so values supplied through
-   * a defaults table are honoured. Numeric and boolean values are trimmed before parsing because
-   * property files keep trailing whitespace.
-   * </p>
-   *
-   * @param props Properties
-   * @throws RuntimeException if <em>input.path</em> is present but contains no usable path
-   * @throws NumberFormatException if <em>retention.count</em> or <em>num.reducers</em> is not an integer
-   */
-  public void setProperties(Properties props)
-  {
-    _props = props;
-    updateConfigurationFromProps(_props);
-
-    String inputPathList = _props.getProperty("input.path");
-    if (inputPathList != null)
-    {
-      List<Path> paths = new ArrayList<Path>();
-      for (String path : inputPathList.split(","))
-      {
-        // Tolerate whitespace around the commas and skip empty entries such as "a,,b".
-        path = path.trim();
-        if (path.length() > 0)
-        {
-          paths.add(new Path(path));
-        }
-      }
-      if (paths.isEmpty())
-      {
-        throw new RuntimeException("Could not extract input paths from: " + inputPathList);
-      }
-      setInputPaths(paths);
-    }
-    else
-    {
-      // Fall back to individually named inputs: input.path.first, input.path.second, ...
-      // NOTE: property names are not ordered, so the order of the resulting paths is unspecified.
-      List<Path> inputPaths = new ArrayList<Path>();
-      for (String prop : _props.stringPropertyNames())
-      {
-        if (prop.startsWith("input.path."))
-        {
-          inputPaths.add(new Path(_props.getProperty(prop)));
-        }
-      }
-      if (!inputPaths.isEmpty())
-      {
-        setInputPaths(inputPaths);
-      }
-    }
-
-    String outputPath = _props.getProperty("output.path");
-    if (outputPath != null)
-    {
-      setOutputPath(new Path(outputPath));
-    }
-
-    String tempPath = _props.getProperty("temp.path");
-    if (tempPath != null)
-    {
-      setTempPath(new Path(tempPath));
-    }
-
-    String retentionCount = _props.getProperty("retention.count");
-    if (retentionCount != null)
-    {
-      setRetentionCount(Integer.parseInt(retentionCount.trim()));
-    }
-
-    String numReducers = _props.getProperty("num.reducers");
-    if (numReducers != null)
-    {
-      setNumReducers(Integer.parseInt(numReducers.trim()));
-    }
-
-    String useCombiner = _props.getProperty("use.combiner");
-    if (useCombiner != null)
-    {
-      setUseCombiner(Boolean.parseBoolean(useCombiner.trim()));
-    }
-
-    String countersPath = _props.getProperty("counters.path");
-    if (countersPath != null)
-    {
-      setCountersParentPath(new Path(countersPath));
-    }
-  }
-
-  /**
-   * Overridden to provide custom configuration before the job starts.
-   * The default implementation does nothing.
-   *
-   * @param conf Hadoop configuration to customize
-   */
-  public void config(Configuration conf)
-  {
-  }
-
-  /**
-   * Gets the number of reducers to use.
-   *
-   * @return Number of reducers, or null if it has not been set
-   */
-  public Integer getNumReducers()
-  {
-    return _numReducers;
-  }
-
-  /**
-   * Sets the number of reducers to use. Can also be set with <em>num.reducers</em> property.
-   *
-   * @param numReducers Number of reducers to use
-   */
-  public void setNumReducers(Integer numReducers)
-  {
-    this._numReducers = numReducers;
-  }
-
-  /**
-   * Gets whether the combiner should be used.
-   *
-   * @return True if combiner should be used, otherwise false.
-   */
-  public boolean isUseCombiner()
-  {
-    return _useCombiner;
-  }
-
-  /**
-   * Sets whether the combiner should be used. Can also be set with <em>use.combiner</em>.
-   *
-   * @param useCombiner True if a combiner should be used, otherwise false.
-   */
-  public void setUseCombiner(boolean useCombiner)
-  {
-    this._useCombiner = useCombiner;
-  }
-
-  /**
-   * Gets the path where counters will be stored.
-   *
-   * @return Counters path, or null if it has not been set
-   */
-  public Path getCountersParentPath()
-  {
-    return _countersParentPath;
-  }
-
-  /**
-   * Sets the path where counters will be stored. Can also be set with <em>counters.path</em>.
-   *
-   * @param countersParentPath Counters path
-   */
-  public void setCountersParentPath(Path countersParentPath)
-  {
-    this._countersParentPath = countersParentPath;
-  }
-
-  /**
-   * Gets the number of days of data which will be retained in the output path.
-   * Only the latest will be kept. Older paths will be removed.
-   *
-   * @return retention count, or null if it has not been set
-   */
-  public Integer getRetentionCount()
-  {
-    return _retentionCount;
-  }
-
-  /**
-   * Sets the number of days of data which will be retained in the output path.
-   * Only the latest will be kept. Older paths will be removed.
-   * Can also be set with <em>retention.count</em>.
-   *
-   * @param retentionCount number of days to retain
-   */
-  public void setRetentionCount(Integer retentionCount)
-  {
-    this._retentionCount = retentionCount;
-  }
-
-  /**
-   * Gets the input paths. Multiple input paths imply a join is to be performed.
-   *
-   * @return input paths, or null if none have been set
-   */
-  public List<Path> getInputPaths()
-  {
-    return _inputPaths;
-  }
-
-  /**
-   * Sets the input paths. Multiple input paths imply a join is to be performed.
-   * Can also be set with <em>input.path</em> or several properties starting with
-   * <em>input.path.</em>.
-   *
-   * @param inputPaths input paths
-   */
-  public void setInputPaths(List<Path> inputPaths)
-  {
-    this._inputPaths = inputPaths;
-  }
-
-  /**
-   * Gets the output path.
-   *
-   * @return output path, or null if it has not been set
-   */
-  public Path getOutputPath()
-  {
-    return _outputPath;
-  }
-
-  /**
-   * Sets the output path. Can also be set with <em>output.path</em>.
-   *
-   * @param outputPath output path
-   */
-  public void setOutputPath(Path outputPath)
-  {
-    this._outputPath = outputPath;
-  }
-
-  /**
-   * Gets the temporary path under which intermediate files will be stored. Defaults to /tmp.
-   *
-   * @return Temporary path
-   */
-  public Path getTempPath()
-  {
-    return _tempPath;
-  }
-
-  /**
-   * Sets the temporary path where intermediate files will be stored. Defaults to /tmp.
-   * Can also be set with <em>temp.path</em>.
-   *
-   * @param tempPath Temporary path
-   */
-  public void setTempPath(Path tempPath)
-  {
-    this._tempPath = tempPath;
-  }
-
-  /**
-   * Gets the file system for the current configuration. The instance is obtained on the
-   * first call and reused by later calls.
-   *
-   * @return File system
-   * @throws RuntimeException wrapping the IOException raised when the file system cannot be obtained
-   */
-  protected FileSystem getFileSystem()
-  {
-    if (_fs == null)
-    {
-      try
-      {
-        _fs = FileSystem.get(getConf());
-      }
-      catch (IOException e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
-    return _fs;
-  }
-
-  /**
-   * Generates a random temporary path within the file system. This does not create the path.
-   * The path is a child of the temporary path named <em>hourglass-</em> followed by a random UUID.
-   *
-   * @return Random temporary path
-   */
-  protected Path randomTempPath()
-  {
-    return new Path(_tempPath,String.format("hourglass-%s",UUID.randomUUID()));
-  }
-
-  /**
-   * Creates a random temporary path within the file system.
-   *
-   * @return Random temporary path
-   * @throws IOException if the file system operation fails
-   */
-  protected Path createRandomTempPath() throws IOException
-  {
-    return ensurePath(randomTempPath());
-  }
-
-  /**
-   * Creates a path, if it does not already exist.
-   *
-   * @param path Path to create
-   * @return The same path that was provided
-   * @throws IOException if the file system operation fails
-   */
-  protected Path ensurePath(Path path) throws IOException
-  {
-    FileSystem fs = getFileSystem();
-    if (!fs.exists(path))
-    {
-      fs.mkdirs(path);
-    }
-    return path;
-  }
-
-  /**
-   * Validation required before running job.
-   *
-   * @throws IllegalArgumentException if no input path or no output path has been specified
-   */
-  protected void validate()
-  {
-    if (_inputPaths == null || _inputPaths.isEmpty())
-    {
-      throw new IllegalArgumentException("Input path is not specified.");
-    }
-
-    if (_outputPath == null)
-    {
-      throw new IllegalArgumentException("Output path is not specified.");
-    }
-  }
-
-  /**
-   * Initialization required before running job.
-   * The default implementation does nothing.
-   */
-  protected void initialize()
-  {
-  }
-
-  /**
-   * Run the job.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  public abstract void run() throws IOException, InterruptedException, ClassNotFoundException;
-
-  /**
-   * Updates the Hadoop configuration of this job using the provided properties.
-   *
-   * <p>
-   * When the <em>test.conf</em> property is present its Base64-decoded value is read into the
-   * configuration and no other property is applied. Otherwise each property whose lower-cased
-   * name starts with <em>hadoop-conf.</em> is set on the configuration with the prefix removed.
-   * </p>
-   *
-   * @param props properties to read the configuration from
-   * @throws RuntimeException wrapping the IOException raised when <em>test.conf</em> cannot be read
-   */
-  private void updateConfigurationFromProps(Properties props)
-  {
-    Configuration config = getConf();
-
-    if (config == null)
-    {
-      // Install the new configuration. Without this, everything applied below would be
-      // written to an object that is discarded when the method returns.
-      config = new Configuration();
-      setConf(config);
-    }
-
-    // to enable unit tests to inject configuration
-    if (props.containsKey("test.conf"))
-    {
-      try
-      {
-        byte[] decoded = Base64.decodeBase64(props.getProperty("test.conf"));
-        ByteArrayInputStream byteInput = new ByteArrayInputStream(decoded);
-        DataInputStream inputStream = new DataInputStream(byteInput);
-        config.readFields(inputStream);
-      }
-      catch (IOException e)
-      {
-        throw new RuntimeException(e);
-      }
-    }
-    else
-    {
-      for (String key : props.stringPropertyNames())
-      {
-        // The prefix is matched case-insensitively; the remainder of the key keeps its case.
-        if (key.toLowerCase().startsWith(HADOOP_PREFIX))
-        {
-          String newKey = key.substring(HADOOP_PREFIX.length());
-          config.set(newKey, props.getProperty(key));
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
deleted file mode 100644
index 1c09cb4..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractNonIncrementalJob.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.CombinedAvroKeyInputFormat;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for Hadoop jobs that consume time-partitioned data
- * in a non-incremental way. Typically this is only used for comparing incremental
- * jobs against a non-incremental baseline.
- * It is essentially the same as {@link AbstractPartitionCollapsingIncrementalJob}
- * without all the incremental features.
- *
- * <p>
- * Jobs extending this class consume input data partitioned according to yyyy/MM/dd.
- * Only a single input path is supported. The output will be written to a directory
- * in the output path with name format yyyyMMdd derived from the end of the time
- * window that is consumed.
- * </p>
- *
- * <p>
- * This class has the same configuration and methods as {@link TimeBasedJob}.
- * In addition it also recognizes the following properties:
- * </p>
- *
- * <ul>
- * <li><em>combine.inputs</em> - True if inputs should be combined (defaults to false)</li>
- * <li><em>num.reducers.bytes.per.reducer</em> - Number of input bytes per reducer</li>
- * </ul>
- *
- * <p>
- * When <em>combine.inputs</em> is true, then CombinedAvroKeyInputFormat is used
- * instead of AvroKeyInputFormat. This enables a single map task to consume more than
- * one file.
- * </p>
- *
- * <p>
- * The <em>num.reducers.bytes.per.reducer</em> property controls the number of reducers to
- * use based on the input size. The total size of the input files is divided by this number
- * and then rounded up.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractNonIncrementalJob extends TimeBasedJob
-{
- private final Logger _log = Logger.getLogger(AbstractNonIncrementalJob.class);
-
- private boolean _combineInputs;
- private Report _report;
-
- /**
- * Initializes the job.
- *
- * @param name job name
- * @param props configuration properties
- * @throws IOException
- */
- public AbstractNonIncrementalJob(String name, Properties props) throws IOException
- {
- super(name,props);
-
- if (props.containsKey("combine.inputs"))
- {
- setCombineInputs(Boolean.parseBoolean(props.getProperty("combine.inputs")));
- }
- }
-
- /**
- * Gets whether inputs should be combined.
- *
- * @return true if inputs are to be combined
- */
- public boolean getCombineInputs()
- {
- return _combineInputs;
- }
-
- /**
- * Sets whether inputs should be combined.
- *
- * @param combineInputs true to combine inputs
- */
- public void setCombineInputs(boolean combineInputs)
- {
- _combineInputs = combineInputs;
- }
-
- /**
- * Gets a report summarizing the run.
- *
- * @return report
- */
- public Report getReport()
- {
- return _report;
- }
-
- /**
- * Runs the job.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- @Override
- public void run() throws IOException, InterruptedException, ClassNotFoundException
- {
- _report = new Report();
-
- Calendar cal = Calendar.getInstance(PathUtils.timeZone);
-
- if (!getFileSystem().exists(getOutputPath()))
- {
- getFileSystem().mkdirs(getOutputPath());
- }
-
- if (getInputPaths().size() > 1)
- {
- throw new RuntimeException("Only a single input is supported");
- }
-
- List<DatePath> inputs = PathUtils.findNestedDatedPaths(getFileSystem(), getInputPaths().get(0));
-
- DatePath latestInput = (inputs.size() > 0) ? inputs.get(inputs.size() - 1) : null;
-
- if (inputs.size() == 0)
- {
- throw new RuntimeException("no input data available");
- }
-
- List<Date> dates = new ArrayList<Date>();
- for (DatePath dp : inputs)
- {
- dates.add(dp.getDate());
- }
-
- DateRange dateRange = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), dates, getDaysAgo(), getNumDays(), true);
-
- Map<Date,DatePath> existingInputs = new HashMap<Date,DatePath>();
- for (DatePath input : inputs)
- {
- existingInputs.put(input.getDate(), input);
- }
-
- _log.info("Getting schema for input " + latestInput.getPath());
- Schema inputSchema = PathUtils.getSchemaFromPath(getFileSystem(),latestInput.getPath());
-
- ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProperties());
-
- List<String> inputPaths = new ArrayList<String>();
- for (Date currentDate=dateRange.getBeginDate(); currentDate.compareTo(dateRange.getEndDate()) <= 0; )
- {
- DatePath input = existingInputs.get(currentDate);
- if (input != null)
- {
- _log.info(String.format("Processing %s",input.getPath()));
- inputPaths.add(input.getPath().toString());
- estimator.addInputPath(input.getPath());
- _report.inputFiles.add(input);
- latestInput = input;
- }
- else
- {
- throw new RuntimeException(String.format("Missing input for %s",PathUtils.datedPathFormat.format(currentDate)));
- }
-
- cal.setTime(currentDate);
- cal.add(Calendar.DAY_OF_MONTH, 1);
- currentDate = cal.getTime();
- }
-
- Path timestampOutputPath = new Path(getOutputPath(),PathUtils.datedPathFormat.format(latestInput.getDate()));
-
- final StagedOutputJob job = StagedOutputJob.createStagedJob(
- getConf(),
- getName() + "-" + PathUtils.datedPathFormat.format(latestInput.getDate()),
- inputPaths,
- "/tmp" + timestampOutputPath.toString(),
- timestampOutputPath.toString(),
- _log);
-
-
- job.setCountersParentPath(getCountersParentPath());
-
- if (_combineInputs)
- {
- job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
- }
- else
- {
- job.setInputFormatClass(AvroKeyInputFormat.class);
- }
-
- job.setOutputFormatClass(AvroKeyOutputFormat.class);
-
- AvroJob.setInputKeySchema(job, inputSchema);
- AvroJob.setMapOutputKeySchema(job, getMapOutputKeySchema());
- AvroJob.setMapOutputValueSchema(job, getMapOutputValueSchema());
- AvroJob.setOutputKeySchema(job, getReduceOutputSchema());
-
- int numReducers;
- if (getNumReducers() != null)
- {
- numReducers = getNumReducers();
- _log.info(String.format("Using %d reducers (fixed)",numReducers));
- }
- else
- {
- numReducers = estimator.getNumReducers();
- _log.info(String.format("Using %d reducers (computed)",numReducers));
- }
-
- job.setNumReduceTasks(numReducers);
-
- job.setMapperClass(getMapperClass());
- job.setReducerClass(getReducerClass());
-
- if (isUseCombiner() && getCombinerClass() != null)
- {
- job.setCombinerClass(getCombinerClass());
- }
-
- config(job.getConfiguration());
-
- if (!job.waitForCompletion(true))
- {
- _log.error("Job failed! Quitting...");
- throw new RuntimeException("Job failed");
- }
-
- _report.jobId = job.getJobID().toString();
- _report.jobName = job.getJobName();
- _report.countersPath = job.getCountersPath();
- _report.outputFile = new DatePath(latestInput.getDate(),timestampOutputPath);
-
- if (getRetentionCount() != null)
- {
- PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount());
- }
- }
-
- /**
- * Gets the key schema for the map output.
- *
- * @return map output key schema
- */
- protected abstract Schema getMapOutputKeySchema();
-
- /**
- * Gets the value schema for the map output.
- *
- * @return map output value schema
- */
- protected abstract Schema getMapOutputValueSchema();
-
- /**
- * Gets the reduce output schema.
- *
- * @return reduce output schema
- */
- protected abstract Schema getReduceOutputSchema();
-
- /**
- * Gets the mapper class.
- *
- * @return the mapper
- */
- public abstract Class<? extends BaseMapper> getMapperClass();
-
- /**
- * Gets the reducer class.
- *
- * @return the reducer
- */
- public abstract Class<? extends BaseReducer> getReducerClass();
-
- /**
- * Gets the combiner class.
- *
- * @return the combiner
- */
- public Class<? extends BaseCombiner> getCombinerClass()
- {
- return null;
- }
-
- /**
- * Mapper base class for {@link AbstractNonIncrementalJob}.
- *
- * @author "Matthew Hayes"
- *
- */
- public static abstract class BaseMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>>
- {
- }
-
- /**
- * Combiner base class for {@link AbstractNonIncrementalJob}.
- *
- * @author "Matthew Hayes"
- *
- */
- public static abstract class BaseCombiner extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, AvroValue<GenericRecord>>
- {
- }
-
- /**
- * Reducer base class for {@link AbstractNonIncrementalJob}.
- *
- * @author "Matthew Hayes"
- *
- */
- public static abstract class BaseReducer extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable>
- {
- }
-
- /**
- * Reports files created and processed for an iteration of the job.
- *
- * @author "Matthew Hayes"
- *
- */
- public static class Report
- {
- private String jobName;
- private String jobId;
- private Path countersPath;
- private List<DatePath> inputFiles = new ArrayList<DatePath>();
- private DatePath outputFile;
-
- /**
- * Gets the job name.
- *
- * @return job name
- */
- public String getJobName()
- {
- return jobName;
- }
-
- /**
- * Gets the job ID.
- *
- * @return job ID
- */
- public String getJobId()
- {
- return jobId;
- }
-
- /**
- * Gets the path to the counters file, if one was written.
- *
- * @return counters path
- */
- public Path getCountersPath()
- {
- return countersPath;
- }
-
- /**
- * Gets input files that were processed. These are files that are within
- * the desired date range.
- *
- * @return input files
- */
- public List<DatePath> getInputFiles()
- {
- return Collections.unmodifiableList(inputFiles);
- }
-
- /**
- * Gets the output file that was produced by the job.
- *
- * @return output file
- */
- public DatePath getOutputFile()
- {
- return outputFile;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
deleted file mode 100644
index b01eaad..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/AbstractPartitionCollapsingIncrementalJob.java
+++ /dev/null
@@ -1,709 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.avro.AvroDateRangeMetadata;
-import datafu.hourglass.avro.AvroKeyWithMetadataOutputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
-import datafu.hourglass.avro.AvroMultipleInputsUtil;
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.mapreduce.AvroKeyValueIdentityMapper;
-import datafu.hourglass.mapreduce.CollapsingCombiner;
-import datafu.hourglass.mapreduce.CollapsingMapper;
-import datafu.hourglass.mapreduce.CollapsingReducer;
-import datafu.hourglass.mapreduce.DelegatingCombiner;
-import datafu.hourglass.mapreduce.DelegatingMapper;
-import datafu.hourglass.mapreduce.DelegatingReducer;
-import datafu.hourglass.mapreduce.DistributedCacheHelper;
-import datafu.hourglass.mapreduce.Parameters;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.model.Merger;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * An {@link IncrementalJob} that consumes partitioned input data and collapses the
- * partitions to produce a single output. This job can be used to process data
- * using a sliding window. It is capable of reusing the previous output, which
- * means that it can process data more efficiently.
- * Only Avro is supported for the input, intermediate, and output data.
- *
- * <p>
- * Implementations of this class must provide key, intermediate value, and output value schemas.
- * The key and intermediate value schemas define the output for the mapper and combiner.
- * The key and output value schemas define the output for the reducer.
- * These are defined by overriding {@link #getKeySchema()}, {@link #getIntermediateValueSchema()},
- * and {@link #getOutputValueSchema()}.
- * </p>
- *
- * <p>
- * Implementations must also provide a mapper by overriding {@link #getMapper()} and an accumulator
- * for the reducer by overriding {@link #getReducerAccumulator()}. An optional combiner may be
- * provided by overriding {@link #getCombinerAccumulator()}. For the combiner to be used
- * the property <em>use.combiner</em> must also be set to true.
- * </p>
- *
- * <p>
- * The input path can be provided either through the property <em>input.path</em>
- * or by calling {@link #setInputPaths(List)}. If multiple input paths are provided then
- * this implicitly means a join is to be performed. Multiple input paths can be provided via
- * properties by prefixing each with <em>input.path.</em>, such as <em>input.path.first</em>
- * and <em>input.path.second</em>.
- * Input data must be partitioned by day according to the naming convention yyyy/MM/dd.
- * The output path can be provided either through the property <em>output.path</em>
- * or by calling {@link #setOutputPath(Path)}.
- * Output data will be written using the naming convention yyyyMMdd, where the date used
- * to format the output path is the same as the end of the desired time range to process.
- * For example, if the desired time range to process is 2013/01/01 through 2013/01/14,
- * then the output will be named 20130114.
- * By default the job will fail if any input data in the desired time window is missing. This can be overridden by setting
- * <em>fail.on.missing</em> to false.
- * </p>
- *
- * <p>
- * The job will not process input if the corresponding output has already been produced. For example, if the desired date
- * range is 2013/01/01 through 2013/01/14 and the output 20130114 already exists, then it assumes the work has already
- * been completed.
- * </p>
- *
- * <p>
- * By default only the latest output will be kept. All other outputs will be removed. This can be controlled
- * by setting the property <em>retention.count</em>, or by calling {@link #setRetentionCount(Integer)}.
- * </p>
- *
- * <p>
- * Two types of sliding windows may be used: <em>fixed-length</em> and <em>fixed-start</em>. For a fixed-length
- * sliding window, the size of the window is fixed; the start and end move according to the
- * availability of input data. For a fixed-start window, the size of the window is flexible;
- * the start is fixed and the end moves according to the availability of input data.
- * </p>
- *
- * <p>
- * A fixed-length sliding window can be defined either by setting the property <em>num.days</em>
- * or by calling {@link #setNumDays(Integer)}. This sets how many days of input data will be
- * consumed. By default the end of the window will be the same as the date of the latest available
- * input data. The start is then determined by the number of days to consume. The end date can
- * be moved back relative to the latest input data by setting the <em>days.ago</em> property or
- * by calling {@link #setDaysAgo(Integer)}. Since the end date is determined by the availability
- * of input data, as new data arrives the window will advance forward.
- * </p>
- *
- * <p>
- * A fixed-start sliding window can be defined by setting the property <em>start.date</em> or
- * by calling {@link #setStartDate(java.util.Date)}. The end date will be the same as the date of
- * the latest available input data. The end date can
- * be moved back relative to the latest input data by setting the <em>days.ago</em> property or
- * by calling {@link #setDaysAgo(Integer)}.
- * Because the end date is determined by the availability of input data, as new data arrives the window
- * will grow to include it.
- * </p>
- *
- * <p>
- * Previous output can be reused by setting the <em>reuse.previous.output</em> property to true, or
- * by calling {@link #setReusePreviousOutput(boolean)}. Reusing the previous output is often more efficient
- * because only input data outside of the time window covered by the previous output needs to be consumed.
- * For example, given a fixed-start sliding window job, if one new day of input data is available since the
- * last time the job ran, then the job can reuse the previous output and only read the newest day of data, rather
- * than reading all the input data again. Given a fixed-length sliding window in the same scenario, the new output
- * can be produced by adding the newest input to the previous output and subtracting the oldest input from the old
- * window.
- * </p>
- *
- * <p>
- * For a fixed-start sliding window, if the schema for the intermediate and output values are the same then no additional
- * changes are necessary, as the reducer's accumulator should be capable of adding the new input to the previous output.
- * However if they are different then a record merger must be defined by overriding {@link #getRecordMerger()} so that the previous
- * output can be merged with the partial output produced by reducing the new input data.
- * For the fixed-length sliding window one must override {@link #getOldRecordMerger()} to reuse the previous output.
- * This method essentially unmerges old, partial output data from the current output. For this case as well if the intermediate
- * and output schemas are the same the {@link #getRecordMerger()} method does not need to be overridden.
- * </p>
- *
- * <p>
- * The number of reducers to use is automatically determined based on the size of the data to process.
- * The total size is computed and then divided by the value of the property <em>num.reducers.bytes.per.reducer</em>, which
- * defaults to 256 MB. This is the number of reducers that will be used. This calculation includes
- * the input data as well as previous output that will be reused. It is also possible to calculate the number of reducers
- * separately for the input and previous output through the properties <em>num.reducers.input.bytes.per.reducer</em>
- * and <em>num.reducers.previous.bytes.per.reducer</em>. The reducers will be computed separately for the two sets of data
- * and then added together. The number of reducers can also be set to a fixed value through the property <em>num.reducers</em>.
- * </p>
- *
- * <p>
- * This type of job is capable of performing its work over multiple iterations if previous output can be reused.
- * The number of days to process at a time can be limited by setting the property <em>max.days.to.process</em>,
- * or by calling {@link #setMaxToProcess(Integer)}. The default is 90 days.
- * This can be useful when there are restrictions on how many tasks
- * can be used by a single MapReduce job in the cluster. When this property is set, the job will process no more than
- * this many days at a time, and it will perform one or more iterations if necessary to complete the work.
- * The number of iterations can be limited by setting the property <em>max.iterations</em>, or by calling {@link #setMaxIterations(Integer)}.
- * If the number of iterations is exceeded the job will fail. By default the maximum number of iterations is 20.
- * </p>
- *
- * <p>
- * Hadoop configuration may be provided by setting a property with the prefix <em>hadoop-conf.</em>.
- * For example, <em>mapred.min.split.size</em> can be configured by setting property
- * <em>hadoop-conf.mapred.min.split.size</em> to the desired value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class AbstractPartitionCollapsingIncrementalJob extends IncrementalJob
-{
- private final Logger _log = Logger.getLogger(AbstractPartitionCollapsingIncrementalJob.class);
-
- private List<Report> _reports = new ArrayList<Report>();
- protected boolean _reusePreviousOutput;
- private FileCleaner _garbage;
-
- /**
- * Initializes the job without a name or properties.
- *
- * @throws IOException declared by this constructor; presumably raised by the
- *         implicitly invoked superclass constructor (TODO confirm)
- */
- public AbstractPartitionCollapsingIncrementalJob() throws IOException
- {
- }
-
- /**
- * Initializes the job with a job name and properties.
- * Both arguments are passed unchanged to the superclass constructor.
- *
- * @param name job name
- * @param props configuration properties
- * @throws IOException declared by this constructor; presumably raised by the
- *         superclass constructor (TODO confirm)
- */
- public AbstractPartitionCollapsingIncrementalJob(String name, Properties props) throws IOException
- {
- super(name,props);
- }
-
- /**
- * Gets the mapper.
- * Implementations must supply it; when the job executes, the returned mapper is
- * installed on the {@link CollapsingMapper} that is written out for the map tasks.
- *
- * @return mapper
- */
- public abstract Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper();
-
- /**
- * Gets the accumulator used for the combiner.
- * The default implementation returns null. The accumulator is only requested
- * when the job runs with the combiner enabled, so subclasses that enable the
- * combiner should override this method.
- *
- * @return combiner accumulator, or null if none is provided
- */
- public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
- {
- return null;
- }
-
- /**
- * Gets the accumulator used for the reducer.
- * Implementations must supply it; when the job executes, the returned accumulator
- * is installed on the {@link CollapsingReducer} that is written out for the reduce tasks.
- *
- * @return reducer accumulator
- */
- public abstract Accumulator<GenericRecord,GenericRecord> getReducerAccumulator();
-
- /**
- * Gets the record merger that is capable of merging previous output with a new partial output.
- * This is only needed when reusing previous output where the intermediate and output schemas are different.
- * New partial output is produced by the reducer from new input that is after the previous output.
- * The default implementation returns null, meaning no merger is provided.
- *
- * @return merger, or null if none is provided
- */
- public Merger<GenericRecord> getRecordMerger()
- {
- return null;
- }
-
- /**
- * Gets the record merger that is capable of unmerging old partial output from the new output.
- * This is only needed when reusing previous output for a fixed-length sliding window.
- * The new output is the result of merging the previous output with the new partial output.
- * The old partial output is produced by the reducer from old input data before the time range of
- * the previous output.
- * The default implementation returns null, meaning no merger is provided.
- *
- * @return merger, or null if none is provided
- */
- public Merger<GenericRecord> getOldRecordMerger()
- {
- return null;
- }
-
- /**
- * Returns the name to use for the reduce output schema.
- * Unless overridden, this is the simple name of the concrete job class
- * followed by "Output".
- *
- * @return output schema name
- */
- protected String getOutputSchemaName()
- {
- String simpleClassName = getClass().getSimpleName();
- return simpleClassName + "Output";
- }
-
- /**
- * Returns the namespace to use for the reduce output schema.
- * Unless overridden, this is the name of the package that contains the
- * concrete job class.
- *
- * @return output schema namespace
- */
- protected String getOutputSchemaNamespace()
- {
- Package jobPackage = getClass().getPackage();
- return jobPackage.getName();
- }
-
- /**
- * Applies the given properties and, when <em>reuse.previous.output</em> is present,
- * uses its value to set whether previous output should be reused.
- *
- * @param props configuration properties
- */
- @Override
- public void setProperties(Properties props)
- {
- super.setProperties(props);
-
- Object reuseSetting = getProperties().get("reuse.previous.output");
- if (reuseSetting == null)
- {
- // Property not provided; keep the current setting.
- return;
- }
-
- setReusePreviousOutput(Boolean.parseBoolean((String)reuseSetting));
- }
-
- /**
- * Reports whether the job is configured to reuse previous output.
- *
- * @return true if previous output should be reused
- */
- public boolean getReusePreviousOutput()
- {
- return this._reusePreviousOutput;
- }
-
- /**
- * Configures whether the job should reuse previous output.
- *
- * @param reuse true if previous output should be reused
- */
- public void setReusePreviousOutput(boolean reuse)
- {
- this._reusePreviousOutput = reuse;
- }
-
- /**
- * Prepares the job for execution: creates the cleaner for temporary files and
- * fills in defaults for settings that were not provided (20 iterations at most;
- * the number of days as the per-iteration limit when it is set, otherwise 90;
- * a retention count of 1), then delegates to the superclass.
- */
- @Override
- protected void initialize()
- {
- _garbage = new FileCleaner(getFileSystem());
-
- if (getMaxIterations() == null)
- {
- setMaxIterations(20);
- }
-
- if (getMaxToProcess() == null)
- {
- // Prefer the window size when one is configured; otherwise fall back to 90.
- Integer numDays = getNumDays();
- setMaxToProcess(numDays != null ? numDays : Integer.valueOf(90));
- }
-
- if (getRetentionCount() == null)
- {
- setRetentionCount(1);
- }
-
- super.initialize();
- }
-
- /**
- * Runs the job: initializes it, validates it, and executes it, in that order.
- * Temporary paths are cleaned up afterwards whether or not those steps succeed.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- @Override
- public void run() throws IOException, InterruptedException, ClassNotFoundException
- {
- try
- {
- initialize();
- validate();
- execute();
- }
- finally
- {
- // Always remove temporary paths, even when one of the steps above throws.
- cleanup();
- }
- }
-
- /**
- * Returns the reports collected so far, one per completed job iteration.
- * The returned list is a read-only view.
- *
- * @return reports
- */
- public List<Report> getReports()
- {
- List<Report> readOnlyReports = Collections.unmodifiableList(_reports);
- return readOnlyReports;
- }
-
- /**
- * Execute the job.
- * Each pass of the loop builds a new execution plan, configures and runs one staged
- * MapReduce job for that plan, records a {@link Report}, and applies retention to the
- * output. The loop ends when the plan has no inputs to process or the planner reports
- * that no further pass is needed.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- * @throws RuntimeException if the maximum number of iterations has been reached while
- *         inputs remain, or if the MapReduce job does not complete successfully
- */
- private void execute() throws IOException, InterruptedException, ClassNotFoundException
- {
- int iterations = 0;
-
- while (true)
- {
- // Build a fresh plan on every pass from the job's current settings.
- PartitionCollapsingExecutionPlanner planner = new PartitionCollapsingExecutionPlanner(getFileSystem(),getProperties());
- planner.setInputPaths(getInputPaths());
- planner.setOutputPath(getOutputPath());
- planner.setStartDate(getStartDate());
- planner.setEndDate(getEndDate());
- planner.setDaysAgo(getDaysAgo());
- planner.setNumDays(getNumDays());
- planner.setMaxToProcess(getMaxToProcess());
- planner.setReusePreviousOutput(getReusePreviousOutput());
- planner.setFailOnMissing(isFailOnMissing());
- planner.createPlan();
-
- // Nothing left to consume: the job is done.
- if (planner.getInputsToProcess().size() == 0)
- {
- _log.info("Nothing to do");
- break;
- }
-
- // There is still work, but the iteration budget is used up.
- if (iterations >= getMaxIterations())
- {
- throw new RuntimeException(String.format("Already completed %d iterations but the max is %d and there are still %d inputs to process",
- iterations,
- getMaxIterations(),
- planner.getInputsToProcess().size()));
- }
-
- Report report = new Report();
-
- // NOTE(review): these getters are null-checked further below before being iterated,
- // but are passed to addAll here without a check - confirm they can never return null.
- report.inputFiles.addAll(planner.getNewInputsToProcess());
- report.oldInputFiles.addAll(planner.getOldInputsToProcess());
- if (planner.getPreviousOutputToProcess() != null)
- {
- report.reusedOutput = planner.getPreviousOutputToProcess();
- }
-
- // The output is dated with the end date of the range covered by this pass.
- DatePath outputPath = DatePath.createDatedPath(getOutputPath(), planner.getCurrentDateRange().getEndDate());
-
- _log.info("Output path: " + outputPath);
-
- Path tempOutputPath = createRandomTempPath();
-
- // Register the temporary path so that cleanup() removes it.
- _garbage.add(tempOutputPath);
-
- final StagedOutputJob job = StagedOutputJob.createStagedJob(
- getConf(),
- getName() + "-" + PathUtils.datedPathFormat.format(planner.getCurrentDateRange().getEndDate()),
- null, // no input paths specified here, will add multiple inputs down below
- tempOutputPath.toString(),
- outputPath.getPath().toString(),
- _log);
-
- job.setCountersParentPath(getCountersParentPath());
-
- // New and old inputs are both read with the multiple-inputs Avro format and the delegating mapper.
- if (planner.getNewInputsToProcess() != null && planner.getNewInputsToProcess().size() > 0)
- {
- _log.info("*** New Input data:");
- for (DatePath inputPath : planner.getNewInputsToProcess())
- {
- _log.info(inputPath.getPath());
- MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
- }
- }
-
- if (planner.getOldInputsToProcess() != null && planner.getOldInputsToProcess().size() > 0)
- {
- _log.info("*** Old Input data:");
- for (DatePath inputPath : planner.getOldInputsToProcess())
- {
- _log.info(inputPath.getPath());
- MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
- }
- }
-
- // Previous output, when reused, is read with the plain Avro key format and an identity mapper.
- if (planner.getPreviousOutputToProcess() != null)
- {
- _log.info("*** Previous output data:");
- _log.info(planner.getPreviousOutputToProcess().getPath());
- MultipleInputs.addInputPath(job, planner.getPreviousOutputToProcess().getPath(), AvroKeyInputFormat.class, AvroKeyValueIdentityMapper.class);
- }
-
- final Configuration conf = job.getConfiguration();
-
- config(conf);
-
- AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());
-
- PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());
-
- job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);
-
- // Register the input schema to use for each input path.
- _log.info("Setting input path to schema mappings");
- for (String path : spSchemas.getMapInputSchemas().keySet())
- {
- Schema schema = spSchemas.getMapInputSchemas().get(path);
- _log.info("*** " + path);
- _log.info("*** => " + schema.toString());
- AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
- }
-
- AvroJob.setMapOutputKeySchema(job, spSchemas.getMapOutputKeySchema());
- AvroJob.setMapOutputValueSchema(job, spSchemas.getMapOutputValueSchema());
- AvroJob.setOutputKeySchema(job, spSchemas.getReduceOutputSchema());
-
- int numReducers;
-
- // An explicitly configured reducer count wins over the planner's computed value.
- if (getNumReducers() != null)
- {
- numReducers = getNumReducers();
- _log.info(String.format("Using %d reducers (fixed)",numReducers));
- }
- else
- {
- numReducers = planner.getNumReducers();
- _log.info(String.format("Using %d reducers (computed)",numReducers));
- }
-
- job.setNumReduceTasks(numReducers);
-
- job.setReducerClass(DelegatingReducer.class);
-
- // The mapper, reducer and combiner implementations are written under the temporary output path.
- Path mapperPath = new Path(tempOutputPath,".mapper_impl");
- Path reducerPath = new Path(tempOutputPath,".reducer_impl");
- Path combinerPath = new Path(tempOutputPath,".combiner_impl");
-
- CollapsingMapper mapper = new CollapsingMapper();
- CollapsingReducer reducer = new CollapsingReducer();
-
- mapper.setSchemas(spSchemas);
- reducer.setSchemas(spSchemas);
-
- mapper.setMapper(getMapper());
- reducer.setAccumulator(getReducerAccumulator());
- reducer.setRecordMerger(getRecordMerger());
- reducer.setOldRecordMerger(getOldRecordMerger());
-
- mapper.setReuseOutput(_reusePreviousOutput);
- reducer.setReuseOutput(_reusePreviousOutput);
-
- configureOutputDateRange(job.getConfiguration(),planner.getCurrentDateRange(), reducer);
-
- DistributedCacheHelper.writeObject(conf, mapper, mapperPath);
- DistributedCacheHelper.writeObject(conf, reducer, reducerPath);
-
- // Record where the implementations were written so the delegating classes can locate them
- // (presumably loaded by DelegatingMapper/DelegatingReducer at task time - confirm).
- conf.set(Parameters.REDUCER_IMPL_PATH, reducerPath.toString());
- conf.set(Parameters.MAPPER_IMPL_PATH, mapperPath.toString());
-
- // The combiner is optional and only wired in when enabled.
- if (isUseCombiner())
- {
- CollapsingCombiner combiner = new CollapsingCombiner();
- configureOutputDateRange(job.getConfiguration(),planner.getCurrentDateRange(), combiner);
- combiner.setReuseOutput(_reusePreviousOutput);
- combiner.setSchemas(spSchemas);
- combiner.setAccumulator(getCombinerAccumulator());
- conf.set(Parameters.COMBINER_IMPL_PATH, combinerPath.toString());
- job.setCombinerClass(DelegatingCombiner.class);
- DistributedCacheHelper.writeObject(conf, combiner, combinerPath);
- }
-
- if (!job.waitForCompletion(true))
- {
- _log.error("Job failed! Quitting...");
- throw new RuntimeException("Job failed");
- }
-
- // The job succeeded: complete the report for this iteration.
- report.jobId = job.getJobID().toString();
- report.jobName = job.getJobName();
- report.countersPath = job.getCountersPath();
- report.outputPath = outputPath;
-
- _reports.add(report);
-
- applyRetention();
-
- if (!planner.getNeedsAnotherPass())
- {
- break;
- }
-
- // Another pass is needed: clear this pass's temporary paths before planning again.
- cleanup();
-
- iterations++;
- }
- }
-
- /**
- * Removes all but the most recent outputs under the output path, keeping as many
- * as the retention count allows. Does nothing when no retention count is set.
- *
- * @throws IOException
- */
- private void applyRetention() throws IOException
- {
- Integer retentionCount = getRetentionCount();
- if (retentionCount == null)
- {
- // No retention configured; keep every output.
- return;
- }
-
- PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), retentionCount);
- }
-
- /**
- * Configures the output date range for a processing component.
- * The begin and end dates of the given range are converted to epoch milliseconds
- * and handed to the component as a new {@link DateRange}. A missing begin date
- * becomes time zero and a missing end date becomes {@link Long#MAX_VALUE}, so the
- * corresponding side of the range is left effectively unbounded.
- *
- * @param conf configuration (not used by this method)
- * @param dateRange output date range
- * @param proc processor that receives the output date range
- */
- private static void configureOutputDateRange(Configuration conf, DateRange dateRange, DateRangeConfigurable proc)
- {
- Calendar cal = Calendar.getInstance(PathUtils.timeZone);
- long beginTime = 0L;
- long endTime = Long.MAX_VALUE;
-
- if (dateRange.getBeginDate() != null)
- {
- cal.setTime(dateRange.getBeginDate());
- beginTime = cal.getTimeInMillis();
- }
-
- if (dateRange.getEndDate() != null)
- {
- cal.setTime(dateRange.getEndDate());
- // Capture the end bound; without this assignment the range would always
- // extend to Long.MAX_VALUE regardless of the requested end date.
- endTime = cal.getTimeInMillis();
- }
-
- proc.setOutputDateRange(new DateRange(new Date(beginTime),new Date(endTime)));
- }
-
- /**
- * Removes all temporary paths registered with the file cleaner.
- * Does nothing if the cleaner has not been created yet.
- *
- * @throws IOException
- */
- private void cleanup() throws IOException
- {
- if (_garbage == null)
- {
- // Cleaner not created yet, so there is nothing to remove.
- return;
- }
-
- _garbage.clean();
- }
-
- /**
- * Summary of one iteration of the job: the job's identity, the inputs it
- * processed, the previous output it reused (if any), and the output it produced.
- *
- * @author "Matthew Hayes"
- *
- */
- public static class Report
- {
- private String jobName;
- private String jobId;
- private Path countersPath;
- private DatePath outputPath;
- private DatePath reusedOutput;
- private List<DatePath> inputFiles = new ArrayList<DatePath>();
- private List<DatePath> oldInputFiles = new ArrayList<DatePath>();
-
- /**
- * Returns the name of the job.
- *
- * @return job name
- */
- public String getJobName()
- {
- return this.jobName;
- }
-
- /**
- * Returns the ID of the job.
- *
- * @return job ID
- */
- public String getJobId()
- {
- return this.jobId;
- }
-
- /**
- * Returns the location of the counters file, if one was written.
- *
- * @return counters path
- */
- public Path getCountersPath()
- {
- return this.countersPath;
- }
-
- /**
- * Returns the path of the output that the job produced.
- *
- * @return output path
- */
- public DatePath getOutputPath()
- {
- return this.outputPath;
- }
-
- /**
- * Returns the previous output that was reused, if one was reused.
- *
- * @return reused output path
- */
- public DatePath getReusedOutput()
- {
- return this.reusedOutput;
- }
-
- /**
- * Returns the new input files that were processed, i.e. the files that fall
- * within the desired date range. The returned list is a read-only view.
- *
- * @return input files
- */
- public List<DatePath> getInputFiles()
- {
- List<DatePath> readOnlyInputs = Collections.unmodifiableList(this.inputFiles);
- return readOnlyInputs;
- }
-
- /**
- * Returns the old input files that were processed, i.e. the files that fall
- * before the desired date range and were subtracted from the reused output.
- * The returned list is a read-only view.
- *
- * @return old input files
- */
- public List<DatePath> getOldInputFiles()
- {
- List<DatePath> readOnlyOldInputs = Collections.unmodifiableList(this.oldInputFiles);
- return readOnlyOldInputs;
- }
- }
-}