You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:45 UTC
[07/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java b/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
deleted file mode 100644
index 73215d1..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/jobs/counting/PartitionPreservingIncrementalCountJob.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.jobs.counting;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-
-import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-/**
- * Base class for test jobs that count keys with a partition-preserving incremental job.
- *
- * <p>Subclasses provide an {@link AbstractCounter} via {@link #getCounter()}, which emits keys
- * together with counts. Intermediate and output values are Avro records with a single
- * {@code long} field named {@code count}; combining and reducing are delegated to
- * {@code CountAccumulator}.
- */
-public abstract class PartitionPreservingIncrementalCountJob extends AbstractPartitionPreservingIncrementalJob
-{
-  public PartitionPreservingIncrementalCountJob(String name, Properties props) throws IOException
-  {
-    super(name, props);
-  }
-
-  /** @return the Avro record name used for intermediate (mapper/combiner) values */
-  public String getIntermediateValueSchemaName()
-  {
-    return getClass().getName() + "IntermediateValue";
-  }
-
-  /** @return the Avro record name used for output (reducer) values */
-  public String getOutputValueSchemaName()
-  {
-    return getClass().getName() + "OutputValue";
-  }
-
-  @Override
-  public String getOutputSchemaName()
-  {
-    return getClass().getName() + "Output";
-  }
-
-  @Override
-  public String getOutputSchemaNamespace()
-  {
-    return getClass().getPackage().getName();
-  }
-
-  /**
-   * Configures the job and stores the intermediate value schema (as JSON) under
-   * {@link AbstractCounter#COUNT_VALUE_SCHEMA_PARAM}, where
-   * {@link AbstractCounter#setConf(Configuration)} parses it back.
-   */
-  @Override
-  public void config(Configuration conf)
-  {
-    super.config(conf);
-    conf.set(AbstractCounter.COUNT_VALUE_SCHEMA_PARAM, getIntermediateValueSchema().toString());
-  }
-
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
-  {
-    return new CountAccumulator(getOutputValueSchema());
-  }
-
-  @Override
-  public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
-  {
-    return new CountAccumulator(getIntermediateValueSchema());
-  }
-
-  @Override
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return getCounter();
-  }
-
-  /** @return the mapper that reports the keys to be counted */
-  protected abstract AbstractCounter getCounter();
-
-  @Override
-  protected Schema getIntermediateValueSchema()
-  {
-    return getCountSchema(getIntermediateValueSchemaName());
-  }
-
-  @Override
-  protected Schema getOutputValueSchema()
-  {
-    return getCountSchema(getOutputValueSchemaName());
-  }
-
-  /**
-   * Builds a record schema containing a single {@code long} field named {@code count}.
-   *
-   * @param name the record name
-   * @return the new record schema
-   */
-  protected Schema getCountSchema(String name)
-  {
-    Schema s = Schema.createRecord(name, null, getOutputSchemaNamespace(), false);
-    List<Field> fields = Arrays.asList(new Field("count", Schema.create(Type.LONG), null, null));
-    s.setFields(fields);
-    return s;
-  }
-
-  /**
-   * Mapper base class for counting jobs. Subclasses implement
-   * {@link #count(GenericRecord, CountWriter)} and report keys through the supplied writer.
-   *
-   * <p>{@link #setConf(Configuration)} must be called with a non-null configuration that
-   * contains {@link #COUNT_VALUE_SCHEMA_PARAM} before {@link #map} is invoked.
-   */
-  protected static abstract class AbstractCounter implements Mapper<GenericRecord,GenericRecord,GenericRecord>, Configurable
-  {
-    /** Configuration key holding the JSON form of the count value schema. */
-    public static final String COUNT_VALUE_SCHEMA_PARAM = "incremental.count.task.value.schema";
-
-    private transient Schema _valueSchema;
-    private CountWriterImpl writer;
-    private Configuration conf;
-
-    public AbstractCounter()
-    {
-      _valueSchema = null;
-    }
-
-    @Override
-    public Configuration getConf()
-    {
-      return this.conf;
-    }
-
-    /**
-     * Stores the configuration and, when it is non-null, parses the count value schema from it
-     * and prepares the writer used by {@link #map}.
-     *
-     * @throws IllegalStateException if {@code conf} is non-null but lacks {@link #COUNT_VALUE_SCHEMA_PARAM}
-     */
-    @Override
-    public void setConf(Configuration conf)
-    {
-      this.conf = conf;
-
-      if (conf != null)
-      {
-        String schemaJson = conf.get(COUNT_VALUE_SCHEMA_PARAM);
-        if (schemaJson == null)
-        {
-          // Fail with a descriptive error rather than an NPE from inside the schema parser.
-          throw new IllegalStateException("Missing configuration parameter " + COUNT_VALUE_SCHEMA_PARAM);
-        }
-        _valueSchema = new Schema.Parser().parse(schemaJson);
-        this.writer = new CountWriterImpl();
-      }
-    }
-
-    /** Subclass hook: inspect {@code record} and report zero or more keys to {@code writer}. */
-    protected abstract void count(GenericRecord record, CountWriter writer) throws IOException, InterruptedException;
-
-    /**
-     * Routes {@code record} to {@link #count(GenericRecord, CountWriter)}, with the writer bound
-     * to {@code context}.
-     *
-     * @throws IllegalStateException if the counter has not been configured yet
-     */
-    @Override
-    public void map(GenericRecord record, KeyValueCollector<GenericRecord,GenericRecord> context) throws IOException, InterruptedException
-    {
-      if (this.writer == null)
-      {
-        throw new IllegalStateException("setConf must be called with a non-null Configuration before map");
-      }
-      this.writer.setContext(context);
-      count(record, this.writer);
-    }
-
-    /**
-     * Forwards reported counts to the current collector.
-     *
-     * <p>NOTE(review): the same value record is reused for every collect call; this assumes
-     * the collector copies or serializes it before the next call — confirm.
-     */
-    private class CountWriterImpl implements CountWriter
-    {
-      private KeyValueCollector<GenericRecord,GenericRecord> context;
-      private final GenericRecord value;
-
-      public CountWriterImpl()
-      {
-        value = new GenericData.Record(_valueSchema);
-      }
-
-      public void setContext(KeyValueCollector<GenericRecord,GenericRecord> context)
-      {
-        this.context = context;
-      }
-
-      @Override
-      public void count(GenericRecord key) throws IOException, InterruptedException
-      {
-        count(key, 1L);
-      }
-
-      @Override
-      public void count(GenericRecord key, long count) throws IOException, InterruptedException
-      {
-        value.put("count", count);
-        context.collect(key, value);
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java b/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
deleted file mode 100644
index 34c05be..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/util/DailyTrackingWriter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Test helper that writes Avro records into daily directories laid out as
- * {@code <outputPath>/yyyy/MM/dd/part-00000.avro}. Only one day may be open at a time:
- * call {@link #open}, then {@link #append} any number of times, then {@link #close}.
- */
-public class DailyTrackingWriter
-{
-  private final Path _outputPath;
-  private final Schema _schema;
-  private final FileSystem _fs;
-
-  // Writer for the currently open day, or null when no day is open.
-  private DataFileWriter<GenericRecord> _dataWriter;
-  // Stream underlying _dataWriter; null whenever _dataWriter is null.
-  private OutputStream _outputStream;
-
-  /**
-   * @param outputPath root directory under which the daily directories are created
-   * @param schema schema of the records to be written
-   * @param fs file system to write to
-   */
-  public DailyTrackingWriter(Path outputPath, Schema schema, FileSystem fs)
-  {
-    _outputPath = outputPath;
-    _schema = schema;
-    _fs = fs;
-  }
-
-  /**
-   * Creates {@code yyyy/MM/dd/part-00000.avro} under the output path and starts an Avro data
-   * file in it.
-   *
-   * @throws IllegalStateException if a day is already open
-   * @throws IOException if the file cannot be created or the Avro header cannot be written;
-   *         in that case no day is left open and the file stream is closed
-   */
-  public void open(int year, int month, int day) throws IOException
-  {
-    if (_dataWriter != null)
-    {
-      throw new IllegalStateException("Already have data writer");
-    }
-
-    Path path = new Path(_outputPath, String.format("%04d/%02d/%02d", year, month, day));
-    OutputStream outputStream = _fs.create(new Path(path, "part-00000.avro"));
-    DataFileWriter<GenericRecord> dataWriter =
-        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
-    try
-    {
-      dataWriter.create(_schema, outputStream);
-    }
-    catch (IOException e)
-    {
-      // Don't leak the freshly created file stream if the header can't be written.
-      closeQuietly(outputStream);
-      throw e;
-    }
-    catch (RuntimeException e)
-    {
-      closeQuietly(outputStream);
-      throw e;
-    }
-
-    // Publish state only once the writer is fully initialized, so a failed open can be retried.
-    _outputStream = outputStream;
-    _dataWriter = dataWriter;
-  }
-
-  /**
-   * Appends a record to the currently open day.
-   *
-   * @throws IllegalStateException if no day is open
-   */
-  public void append(GenericRecord record) throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new IllegalStateException("No data writer");
-    }
-    _dataWriter.append(record);
-  }
-
-  /**
-   * Closes the currently open day. The writer state is reset even if closing fails,
-   * so another day can be opened afterwards.
-   *
-   * @throws IllegalStateException if no day is open
-   */
-  public void close() throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new IllegalStateException("No data writer");
-    }
-    DataFileWriter<GenericRecord> dataWriter = _dataWriter;
-    OutputStream outputStream = _outputStream;
-    _dataWriter = null;
-    _outputStream = null;
-    try
-    {
-      dataWriter.close();
-    }
-    finally
-    {
-      // Ensure the file stream is released even if the Avro writer fails to close.
-      outputStream.close();
-    }
-  }
-
-  /** Closes {@code stream}, ignoring failures so the caller's original exception propagates. */
-  private static void closeQuietly(OutputStream stream)
-  {
-    try
-    {
-      stream.close();
-    }
-    catch (IOException ignored)
-    {
-      // best effort: the exception that triggered this cleanup is the one worth reporting
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java b/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
deleted file mode 100644
index c9ea8f9..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/test/util/TimestampDataWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.test.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Test helper that writes Avro records into daily directories laid out as
- * {@code <outputPath>/yyyyMMdd/part-00000.avro}. Only one day may be open at a time:
- * call {@link #open}, then {@link #append} any number of times, then {@link #close}.
- */
-public class TimestampDataWriter
-{
-  private final Path _outputPath;
-  private final Schema _schema;
-  private final FileSystem _fs;
-
-  // Writer for the currently open day, or null when no day is open.
-  private DataFileWriter<GenericRecord> _dataWriter;
-  // Stream underlying _dataWriter; null whenever _dataWriter is null.
-  private OutputStream _outputStream;
-
-  /**
-   * @param outputPath root directory under which the daily directories are created
-   * @param schema schema of the records to be written
-   * @param fs file system to write to
-   */
-  public TimestampDataWriter(Path outputPath, Schema schema, FileSystem fs)
-  {
-    _outputPath = outputPath;
-    _schema = schema;
-    _fs = fs;
-  }
-
-  /**
-   * Creates {@code yyyyMMdd/part-00000.avro} under the output path and starts an Avro data
-   * file in it.
-   *
-   * @throws IllegalStateException if a day is already open
-   * @throws IOException if the file cannot be created or the Avro header cannot be written;
-   *         in that case no day is left open and the file stream is closed
-   */
-  public void open(int year, int month, int day) throws IOException
-  {
-    if (_dataWriter != null)
-    {
-      throw new IllegalStateException("Already have data writer");
-    }
-
-    Path path = new Path(_outputPath, String.format("%04d%02d%02d", year, month, day));
-    OutputStream outputStream = _fs.create(new Path(path, "part-00000.avro"));
-    DataFileWriter<GenericRecord> dataWriter =
-        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
-    try
-    {
-      dataWriter.create(_schema, outputStream);
-    }
-    catch (IOException e)
-    {
-      // Don't leak the freshly created file stream if the header can't be written.
-      closeQuietly(outputStream);
-      throw e;
-    }
-    catch (RuntimeException e)
-    {
-      closeQuietly(outputStream);
-      throw e;
-    }
-
-    // Publish state only once the writer is fully initialized, so a failed open can be retried.
-    _outputStream = outputStream;
-    _dataWriter = dataWriter;
-  }
-
-  /**
-   * Appends a record to the currently open day.
-   *
-   * @throws IllegalStateException if no day is open
-   */
-  public void append(GenericRecord record) throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new IllegalStateException("No data writer");
-    }
-    _dataWriter.append(record);
-  }
-
-  /**
-   * Closes the currently open day. The writer state is reset even if closing fails,
-   * so another day can be opened afterwards.
-   *
-   * @throws IllegalStateException if no day is open
-   */
-  public void close() throws IOException
-  {
-    if (_dataWriter == null)
-    {
-      throw new IllegalStateException("No data writer");
-    }
-    DataFileWriter<GenericRecord> dataWriter = _dataWriter;
-    OutputStream outputStream = _outputStream;
-    _dataWriter = null;
-    _outputStream = null;
-    try
-    {
-      dataWriter.close();
-    }
-    finally
-    {
-      // Ensure the file stream is released even if the Avro writer fails to close.
-      outputStream.close();
-    }
-  }
-
-  /** Closes {@code stream}, ignoring failures so the caller's original exception propagates. */
-  private static void closeQuietly(OutputStream stream)
-  {
-    try
-    {
-      stream.close();
-    }
-    catch (IOException ignored)
-    {
-      // best effort: the exception that triggered this cleanup is the one worth reporting
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test_in_background.sh
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test_in_background.sh b/contrib/hourglass/test_in_background.sh
deleted file mode 100755
index a89b531..0000000
--- a/contrib/hourglass/test_in_background.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-
-echo "Starting test"
-nohup sh test.sh > test.log &
-sleep 1
-echo "Process ID:"
-cat test.pid
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/.gitignore
----------------------------------------------------------------------
diff --git a/datafu-hourglass/.gitignore b/datafu-hourglass/.gitignore
new file mode 100644
index 0000000..685bfd2
--- /dev/null
+++ b/datafu-hourglass/.gitignore
@@ -0,0 +1 @@
+test-logs/
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/README.md
----------------------------------------------------------------------
diff --git a/datafu-hourglass/README.md b/datafu-hourglass/README.md
new file mode 100644
index 0000000..31d2361
--- /dev/null
+++ b/datafu-hourglass/README.md
@@ -0,0 +1,312 @@
+# DataFu: Hourglass
+
+Hourglass is a framework for incrementally processing partitioned data sets in Hadoop.
+
+## Quick Start Example
+
+Let's walk through a use case where Hourglass is helpful. Suppose that we have a website that tracks a particular event,
+and for each event a member ID is recorded. These events are collected and stored in HDFS in Avro under paths having the
+format /data/event/yyyy/MM/dd. Suppose for this example our Avro schema is:
+
+```json
+{
+ "type" : "record",
+ "name" : "ExampleEvent",
+ "namespace" : "datafu.hourglass.test",
+ "fields" : [ {
+ "name" : "id",
+ "type" : "long",
+ "doc" : "ID"
+ } ]
+}
+```
+
+Suppose that the goal is to count how many times this event has occurred per member over the entire history and produce
+a daily report summarizing these counts. One solution is to simply consume all data under /data/event each day and
+aggregate by member ID. However, most of the data is the same day to day. The only difference is that each day a new
+day of data appears in HDFS. So while this solution works, it is wasteful. Wouldn't it be better if we could merge
+the previous result with the new data? With Hourglass you can.
+
+To continue our example, let's say there are two days of data currently available, 2013/03/15 and 2013/03/16, and that
+their contents are:
+
+```
+2013/03/15:
+{"id": 1}
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 3}
+{"id": 3}
+
+2013/03/16:
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 2}
+{"id": 3}
+```
+
+Let's aggregate the counts by member ID using Hourglass. To perform the aggregation we will use `PartitionCollapsingIncrementalJob`,
+which essentially takes a partitioned data set like the one we have and collapses all the partitions together into a single output.
+
+```Java
+PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(Example.class);
+```
+
+Next we will define the schemas for the key and value used by the job. The key affects how data is grouped in the reducer when
+we perform the aggregation. In this case it will be the member ID. The value is the piece of data being aggregated, which will
+be an integer representing the count in this case. Hourglass uses Avro for its data types. Let's define the schemas:
+
+```Java
+final String namespace = "com.example";
+
+final Schema keySchema = Schema.createRecord("Key",null,namespace,false);
+keySchema.setFields(Arrays.asList(new Field("member_id",Schema.create(Type.LONG),null,null)));
+final String keySchemaString = keySchema.toString(true);
+
+final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
+valueSchema.setFields(Arrays.asList(new Field("count",Schema.create(Type.INT),null,null)));
+final String valueSchemaString = valueSchema.toString(true);
+```
+
+This produces schemas having the following representation:
+
+```json
+{
+ "type" : "record",
+ "name" : "Key",
+ "namespace" : "com.example",
+ "fields" : [ {
+ "name" : "member_id",
+ "type" : "long"
+ } ]
+}
+
+{
+ "type" : "record",
+ "name" : "Value",
+ "namespace" : "com.example",
+ "fields" : [ {
+ "name" : "count",
+ "type" : "int"
+ } ]
+}
+```
+
+Now we can tell the job what our schemas are. Hourglass allows two different value types. One is the intermediate value type
+that is output by the mapper and combiner. The other is the output value type, the output of the reducer. In this case we
+will use the same value type for each.
+
+```Java
+job.setKeySchema(keySchema);
+job.setIntermediateValueSchema(valueSchema);
+job.setOutputValueSchema(valueSchema);
+```
+
+Next we will tell Hourglass where to find the data, where to write the data, and that we want to reuse the previous output.
+
+```Java
+job.setInputPaths(Arrays.asList(new Path("/data/event")));
+job.setOutputPath(new Path("/output"));
+job.setReusePreviousOutput(true);
+```
+
+Now let's get into some application logic. The mapper will produce a key-value pair from each record consisting of
+the member ID and a count, which for each input record will just be `1`.
+
+```Java
+job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>()
+{
+ private transient Schema kSchema;
+ private transient Schema vSchema;
+
+ @Override
+ public void map(GenericRecord input,
+ KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
+ InterruptedException
+ {
+ if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
+ if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
+ GenericRecord key = new GenericData.Record(kSchema);
+ key.put("member_id", input.get("id"));
+ GenericRecord value = new GenericData.Record(vSchema);
+ value.put("count", 1);
+ collector.collect(key,value);
+ }
+});
+```
+
+An accumulator is responsible for aggregating this data. Records will be grouped by member and then passed to the accumulator
+one-by-one. The accumulator keeps a running count and adds each input count to it. When all data has been passed to it,
+the `getFinal()` method is called, which returns the output record containing the count.
+
+```Java
+job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>()
+{
+ private transient int count;
+ private transient Schema vSchema;
+
+ @Override
+ public void accumulate(GenericRecord value)
+ {
+ this.count += (Integer)value.get("count");
+ }
+
+ @Override
+ public GenericRecord getFinal()
+ {
+ if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
+ GenericRecord output = new GenericData.Record(vSchema);
+ output.put("count", count);
+ return output;
+ }
+
+ @Override
+ public void cleanup()
+ {
+ this.count = 0;
+ }
+});
+```
+
+Since the intermediate and output values have the same schema, the accumulator can also be used for the combiner,
+so let's indicate that we want it to be used for that:
+
+```Java
+job.setCombinerAccumulator(job.getReducerAccumulator());
+job.setUseCombiner(true);
+```
+
+Finally, we run the job.
+
+```Java
+job.run();
+```
+
+When we inspect the output we find that the counts match what we expect:
+
+```
+{"key": {"member_id": 1}, "value": {"count": 5}}
+{"key": {"member_id": 2}, "value": {"count": 3}}
+{"key": {"member_id": 3}, "value": {"count": 3}}
+```
+
+Now suppose that a new day of data becomes available:
+
+```
+2013/03/17:
+{"id": 1}
+{"id": 1}
+{"id": 2}
+{"id": 2}
+{"id": 2}
+{"id": 3}
+{"id": 3}
+```
+
+Let's run the job again.
+Since Hourglass already has a result for the previous day, it consumes the new day of input and the previous output, rather
+than all the input data it already processed. The previous output is passed to the accumulator implementation where it is
+aggregated with the new data. This produces the output we expect:
+
+```json
+{"key": {"member_id": 1}, "value": {"count": 7}}
+{"key": {"member_id": 2}, "value": {"count": 6}}
+{"key": {"member_id": 3}, "value": {"count": 5}}
+```
+
+In this example we only have a few days of input data, so the impact of incrementally processing the new data is small.
+However, as the size of the input data grows, the benefit of incrementally processing data becomes very significant.
+
+## Motivation
+
+Data sets that are temporal in nature very often are stored in such a way that each directory corresponds to a separate
+time range. For example, one convention could be to divide the data by day. One benefit of a partitioning scheme such
+as this is that it makes it possible to consume a subset of the data for specific time ranges, instead of consuming
+the entire data set.
+
+Very often computations on data such as this are performed daily over sliding windows. For example, a metric of interest
+may be the last time each member logged into the site. The most straightforward implementation is to consume all login events
+across all days. This is inefficient, however, since the input data is mostly the same from day to day.
+A more efficient solution is to merge the previous output with new data since the last run. As a result there is less
+data to process. Another metric of interest may be the number of pages viewed per member over the last 30 days.
+A straightforward implementation is to consume the page view data over the last 30 days each time the job runs.
+However, again, the input data is mostly the same day-to-day.
+Instead, given the previous output of the job, the new output can be produced by adding the new data and subtracting the old data.
+
+Although these incremental jobs are conceptually easy to understand, the implementations can be complex.
+Hourglass defines an easy-to-use programming model and provides jobs for incrementally processing partitioned data as just described.
+It handles the underlying details and complexity of an incremental system so that programmers can focus on
+application logic.
+
+## Capabilities
+
+Hourglass uses Avro for input, intermediate, and output data. Input data must be partitioned by day according to the
+naming convention yyyy/MM/dd. Joining multiple inputs is supported.
+
+Hourglass provides two types of jobs: partition-preserving and partition-collapsing. A *partition-preserving* job
+consumes input data partitioned by day and produces output data partitioned by day. This is equivalent to running a
+MapReduce job for each individual day of input data, but much more efficient. It compares the input data against
+the existing output data and only processes input data with no corresponding output. A *partition-collapsing* job
+consumes input data partitioned by day and produces a single output. What distinguishes this job from a standard
+MapReduce job is that it can reuse the previous output. This enables it to process data much more efficiently.
+Rather than consuming all input data on each run, it can consume only the new data since the previous run and
+merge it with the previous output. Since the partition-preserving job outputs partitioned data, the two jobs
+can be chained together.
+
+Given these two jobs, processing data over sliding windows can be done much more efficiently. There are two types
+of sliding windows that are of particular interest that can be implemented using Hourglass.
+A *fixed-start* sliding window has a start date that remains the same over multiple runs and an end date that is
+flexible, where the end date advances forward as new input data becomes available.
+A *fixed-length* sliding window has a window length that remains the same over multiple runs and flexible start
+and end dates, where the start and end advance forward as new input data becomes available.
+Hourglass makes defining these sliding windows easy.
+
+An example of a fixed-start sliding window problem is computing the last login time for all members of a website
+using a login event that records the login time. This could be solved efficiently by using a partition-collapsing
+job, which is capable of reusing the previous output and merging it with new login data as it arrives.
+
+An example of a fixed-length sliding window problem is computing the pages viewed per member over the last 30 days
+using a page-view event. This could also be solved efficiently using a partition-collapsing job, which is
+capable of reusing the previous output and merging it with the new page-view data while subtracting off the old
+page-view data.
+
+For some fixed-length sliding window problems, it is not possible to subtract off the oldest day of data.
+For example, suppose the goal is to estimate the distinct number of members who have logged into a website in the last
+30 days using a login event that records the member ID. A HyperLogLog counter could be used to estimate the cardinality.
+The internal data for this counter could be serialized to bytes and stored as output alongside the estimated count.
+However, although multiple HyperLogLog counters can be merged together, they cannot be subtracted or unmerged.
+In other words the operation is not reversible. So a partition-collapsing job by itself could not be used.
+However we could chain together a partition-preserving and partition-collapsing job. The first job would estimate cardinality
+per day and store the value with the counter's byte representation. The second job would merge the per-day counters
+together to produce the estimate over the full 30 day window. This makes the computation extremely efficient.
+
+## Programming Model
+
+To implement an incremental job, a developer must specify Avro schemas for the key, intermediate value, and output value types.
+The key and intermediate value types are used for the output of the mapper and an optional combiner. The key and output value
+types are used for the output of the reducer. The input schemas are automatically determined by the job by inspecting the
+input data.
+
+A developer must also define their application logic through interfaces that are based on the MapReduce programming model.
+The mapper is defined through a *Mapper* interface, which given a record produces zero or more key-value pairs.
+The key-value pairs must conform to the key and intermediate value schemas just mentioned.
+
+Reduce is defined through an *Accumulator* interface, which is passed all the records for a given key and then returns
+the final result. Both the combiner and reducer use an Accumulator for the reduce functionality.
+This is similar to the standard reduce function of MapReduce. The key difference is that
+no more than one record can be returned. The input records to the Accumulator are of the intermediate value type;
+the output is of the output value type.
+
+If the intermediate and output value types are the same then the Accumulator can
+naturally be used to merge the new input data with the previous output. However if they are different then a class implementing
+the *Merge* interface must be provided. Merge is a binary operation on two records of the output value type that returns a
+record as a result.
+
+The other case where an implementation of Merge must be provided is when output is reused for a partition-collapsing job
+over a fixed-length sliding window. Merge in this case is used to essentially subtract old output from the current output.
+
+## Contribute
+
+The source code is available under the Apache 2.0 license. Contributions are welcome.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/build.gradle
----------------------------------------------------------------------
diff --git a/datafu-hourglass/build.gradle b/datafu-hourglass/build.gradle
new file mode 100644
index 0000000..9cc2b99
--- /dev/null
+++ b/datafu-hourglass/build.gradle
@@ -0,0 +1,123 @@
+apply plugin: 'java'
+apply plugin: 'license'
+apply plugin: 'maven-publish'
+
+import groovy.xml.MarkupBuilder
+
+buildscript {
+  repositories {
+    mavenCentral()
+  }
+  dependencies {
+  }
+}
+
+cleanEclipse {
+  doLast {
+    delete ".apt_generated"
+    delete ".settings"
+    delete ".factorypath"
+    delete "bin"
+  }
+}
+
+task sourcesJar(type: Jar) {
+  description 'Creates the sources jar'
+
+  classifier = 'sources'
+  from sourceSets.main.allJava
+}
+
+task javadocJar(type: Jar, dependsOn: javadoc) {
+  description 'Creates the javadoc jar'
+
+  classifier = 'javadoc'
+  from javadoc.destinationDir
+}
+
+artifacts {
+  archives sourcesJar
+  archives javadocJar
+  archives jar
+}
+
+// Note: alternate way to publish: https://github.com/Netflix/gradle-template
+
+publishing {
+  publications {
+    mavenJava(MavenPublication) {
+      artifact sourcesJar
+      artifact javadocJar
+      artifact jar
+
+      pom.withXml {
+        asNode().appendNode("packaging","jar")
+        asNode().appendNode("name","Apache DataFu Hourglass")
+        asNode().appendNode("description","A framework for incrementally processing data in Hadoop.")
+        asNode().appendNode("url","http://datafu.incubator.apache.org/")
+
+        def licenseNode = asNode().appendNode("licenses").appendNode("license")
+        licenseNode.appendNode("name","The Apache Software License, Version 2.0")
+        licenseNode.appendNode("url","http://www.apache.org/licenses/LICENSE-2.0.txt")
+        licenseNode.appendNode("distribution","repo")
+
+        def dependenciesNode = asNode().appendNode("dependencies")
+        // appends one <dependency> element with the given coordinates
+        def addDependency = { groupId, artifactId, version ->
+          def dependency = dependenciesNode.appendNode("dependency")
+          dependency.appendNode("groupId", groupId)
+          dependency.appendNode("artifactId", artifactId)
+          dependency.appendNode("version", version)
+        }
+
+        // Must match the "core dependencies" declared in the dependencies block below;
+        // org.json:json was previously missing, leaving the published pom incomplete.
+        addDependency("log4j", "log4j", "$log4jVersion")
+        addDependency("org.json", "json", "$jsonVersion")
+        addDependency("org.apache.avro", "avro", "$avroVersion")
+        addDependency("org.apache.avro", "avro-mapred", "$avroVersion")
+        addDependency("org.apache.avro", "avro-compiler", "$avroVersion")
+      }
+    }
+  }
+}
+
+// create tasks to automatically add the license header
+license {
+  header rootProject.file('HEADER')
+  skipExistingHeaders = true
+}
+
+dependencies {
+  // core dependencies, listed as dependencies in the generated pom
+  compile "log4j:log4j:$log4jVersion"
+  compile "org.json:json:$jsonVersion"
+  compile "org.apache.avro:avro:$avroVersion"
+  compile "org.apache.avro:avro-mapred:$avroVersion"
+  compile "org.apache.avro:avro-compiler:$avroVersion"
+
+  // needed for compilation and testing, not listed as dependencies in the pom
+  compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-tools:$hadoopVersion"
+
+  // needed for testing, not listed as dependencies in the pom
+  testCompile "org.apache.hadoop:hadoop-test:$hadoopVersion"
+  testCompile "com.clearspring.analytics:stream:$streamVersion"
+  testCompile "javax.ws.rs:jsr311-api:$jsr311Version"
+  testCompile "org.slf4j:slf4j-log4j12:$slf4jVersion"
+  testCompile "commons-io:commons-io:$commonsIoVersion"
+  testCompile "org.apache.avro:avro-tools:$avroVersion"
+  testCompile "org.testng:testng:$testngVersion"
+}
+
+
+test {
+  // enable TestNG support (default is JUnit)
+  useTestNG()
+
+  maxHeapSize = "2G"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/changes.md
----------------------------------------------------------------------
diff --git a/datafu-hourglass/changes.md b/datafu-hourglass/changes.md
new file mode 100644
index 0000000..28c78a0
--- /dev/null
+++ b/datafu-hourglass/changes.md
@@ -0,0 +1,22 @@
+# 0.1.3
+
+Fixes:
+
+* Fix bug reusing output in execution planner. It now subtracts off the correct old inputs.
+* When reusing output, execution planner now considers whether it is better than not reusing.
+
+# 0.1.2
+
+Fixes:
+
+* Correctly load schema for input paths.
+
+# 0.1.1
+
+Additions:
+
+* Jobs now support joins on inconsistent schemas (Issue #67)
+
+# 0.1.0
+
+Initial release.
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/overview.html
----------------------------------------------------------------------
diff --git a/datafu-hourglass/overview.html b/datafu-hourglass/overview.html
new file mode 100644
index 0000000..f976ab9
--- /dev/null
+++ b/datafu-hourglass/overview.html
@@ -0,0 +1,3 @@
+<body>
+A framework for incrementally processing data in Hadoop.
+</body>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
new file mode 100644
index 0000000..0136ee9
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroDateRangeMetadata.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import datafu.hourglass.fs.DateRange;
+import datafu.hourglass.fs.PathUtils;
+
+/**
+ * Manages the storage and retrieval of date ranges in the metadata of Avro files.
+ * This is used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} so that when reusing previous
+ * output it can determine the date range the data corresponds to.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroDateRangeMetadata
+{
+ public static String METADATA_DATE_START = "hourglass.date.start";
+ public static String METADATA_DATE_END = "hourglass.date.end";
+
+ /**
+ * Reads the date range from the metadata stored in an Avro file.
+ *
+ * @param fs file system to access path
+ * @param path path to get date range for
+ * @return date range
+ * @throws IOException
+ */
+ public static DateRange getOutputFileDateRange(FileSystem fs, Path path) throws IOException
+ {
+ path = fs.listStatus(path, PathUtils.nonHiddenPathFilter)[0].getPath();
+ FSDataInputStream dataInputStream = fs.open(path);
+ DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+ DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
+
+ try
+ {
+ return new DateRange(new Date(Long.parseLong(dataFileStream.getMetaString(METADATA_DATE_START))),
+ new Date(Long.parseLong(dataFileStream.getMetaString(METADATA_DATE_END))));
+ }
+ finally
+ {
+ dataFileStream.close();
+ dataInputStream.close();
+ }
+ }
+
+ /**
+ * Updates the Hadoop configuration so that the Avro files which are written have date range
+ * information stored in the metadata. This should be used in conjunction with
+ * {@link AvroKeyValueWithMetadataRecordWriter}.
+ *
+ * @param conf configuration to store date range in
+ * @param dateRange date range
+ */
+ public static void configureOutputDateRange(Configuration conf, DateRange dateRange)
+ {
+ // store the date range in the output file's metadata
+ conf.set(AvroKeyValueWithMetadataRecordWriter.TEXT_PREFIX + METADATA_DATE_START, Long.toString(dateRange.getBeginDate().getTime()));
+ conf.set(AvroKeyValueWithMetadataRecordWriter.TEXT_PREFIX + METADATA_DATE_END, Long.toString(dateRange.getEndDate().getTime()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
new file mode 100644
index 0000000..4574c69
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataOutputFormat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.mapreduce.AvroOutputFormatBase;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat that writes key/value pairs into Avro container files.
+ *
+ * <p>An Avro container file holds records rather than key/value pairs, so every pair is
+ * stored as a generic record with the two fields {@code key} and {@code value}.</p>
+ *
+ * <p>Keys and values may be Avro data wrapped in <code>AvroKey</code> / <code>AvroValue</code>,
+ * or basic Writables such as IntWritable and Text, which are converted to the matching
+ * Avro types.</p>
+ *
+ * <p>The job configuration is passed to {@link AvroKeyValueWithMetadataRecordWriter}, which
+ * copies entries prefixed with {@link AvroKeyValueWithMetadataRecordWriter#TEXT_PREFIX} into
+ * the file metadata.</p>
+ *
+ * @param <K> The type of key. If an Avro type, it must be wrapped in an <code>AvroKey</code>.
+ * @param <V> The type of value. If an Avro type, it must be wrapped in an <code>AvroValue</code>.
+ */
+public class AvroKeyValueWithMetadataOutputFormat<K, V> extends AvroOutputFormatBase<K, V> {
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
+    AvroDatumConverterFactory factory = new AvroDatumConverterFactory(context.getConfiguration());
+
+    // The job declares the output classes; the casts restore the generic parameters.
+    Class<K> keyClass = (Class<K>) context.getOutputKeyClass();
+    Class<V> valueClass = (Class<V>) context.getOutputValueClass();
+
+    AvroDatumConverter<K, ?> keyConverter = factory.create(keyClass);
+    AvroDatumConverter<V, ?> valueConverter = factory.create(valueClass);
+
+    return new AvroKeyValueWithMetadataRecordWriter<K, V>(
+        keyConverter,
+        valueConverter,
+        getCompressionCodec(context),
+        getAvroFileOutputStream(context),
+        context.getConfiguration());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
new file mode 100644
index 0000000..4511d24
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyValueWithMetadataRecordWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes key/value pairs to an Avro container file, embedding text metadata taken from
+ * the Hadoop configuration.
+ *
+ * <p>Every record in the container file is a generic record with two fields, {@code key}
+ * and {@code value}. Inputs may be basic Writables (e.g. Text, IntWritable) or AvroKey /
+ * AvroValue wrappers; the supplied converters turn them into Avro data before writing.</p>
+ *
+ * <p>Any configuration entry whose name starts with {@link #TEXT_PREFIX} is written to the
+ * file metadata under the name with that prefix removed.</p>
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public class AvroKeyValueWithMetadataRecordWriter<K, V> extends RecordWriter<K, V> {
+  /** The configuration key prefix for a text output metadata. */
+  public static final String TEXT_PREFIX = "avro.meta.text.";
+
+  /** Underlying Avro container-file writer. */
+  private final DataFileWriter<GenericRecord> mAvroFileWriter;
+
+  /** Schema of the key/value generic records written to the file. */
+  private final Schema mKeyValuePairSchema;
+
+  /** Single record instance, refilled on every call to {@link #write}. */
+  private final AvroKeyValue<Object, Object> mOutputRecord;
+
+  /** Turns incoming keys into Avro data. */
+  private final AvroDatumConverter<K, ?> mKeyConverter;
+
+  /** Turns incoming values into Avro data. */
+  private final AvroDatumConverter<V, ?> mValueConverter;
+
+  /**
+   * Creates the container file on {@code outputStream}.
+   *
+   * @param keyConverter converts keys to Avro data; its writer schema becomes the key schema
+   * @param valueConverter converts values to Avro data; its writer schema becomes the value schema
+   * @param compressionCodec codec used for the container file
+   * @param outputStream destination of the container file
+   * @param conf configuration scanned for {@link #TEXT_PREFIX} metadata entries
+   * @throws IOException if the container file cannot be created
+   */
+  public AvroKeyValueWithMetadataRecordWriter(AvroDatumConverter<K, ?> keyConverter,
+      AvroDatumConverter<V, ?> valueConverter, CodecFactory compressionCodec,
+      OutputStream outputStream, Configuration conf) throws IOException {
+    mKeyConverter = keyConverter;
+    mValueConverter = valueConverter;
+
+    Schema keySchema = keyConverter.getWriterSchema();
+    Schema valueSchema = valueConverter.getWriterSchema();
+    mKeyValuePairSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+    DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(
+        new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
+    fileWriter.setCodec(compressionCodec);
+    // Metadata must be set before create() writes the file header.
+    copyTextMetadata(conf, fileWriter);
+    fileWriter.create(mKeyValuePairSchema, outputStream);
+    mAvroFileWriter = fileWriter;
+
+    mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
+  }
+
+  /** Copies every {@link #TEXT_PREFIX}-prefixed configuration entry into the writer's metadata. */
+  private static void copyTextMetadata(Configuration conf, DataFileWriter<GenericRecord> writer) {
+    for (Entry<String, String> property : conf) {
+      String name = property.getKey();
+      if (!name.startsWith(TEXT_PREFIX)) {
+        continue;
+      }
+      writer.setMeta(name.substring(TEXT_PREFIX.length()), property.getValue());
+    }
+  }
+
+  /**
+   * Gets the writer schema for the key/value pair generic record.
+   *
+   * @return The writer schema used for entries of the Avro container file.
+   */
+  public Schema getWriterSchema() {
+    return mKeyValuePairSchema;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(K key, V value) throws IOException {
+    Object keyDatum = mKeyConverter.convert(key);
+    mOutputRecord.setKey(keyDatum);
+    Object valueDatum = mValueConverter.convert(value);
+    mOutputRecord.setValue(valueDatum);
+    mAvroFileWriter.append(mOutputRecord.get());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    mAvroFileWriter.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
new file mode 100644
index 0000000..b37c559
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataOutputFormat.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroOutputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat for writing Avro container files.
+ *
+ * <p>Since Avro container files only contain records (not key/value pairs), this output
+ * format ignores the value.</p>
+ *
+ * <p>The job configuration is passed to the record writer. The default factory creates an
+ * {@link AvroKeyWithMetadataRecordWriter}, which copies configuration entries prefixed with
+ * {@link AvroKeyWithMetadataRecordWriter#TEXT_PREFIX} into the file metadata.</p>
+ *
+ * @param <T> The (java) type of the Avro data to write.
+ */
+public class AvroKeyWithMetadataOutputFormat<T> extends AvroOutputFormatBase<AvroKey<T>, NullWritable> {
+  /** A factory for creating record writers. */
+  private final RecordWriterFactory<T> mRecordWriterFactory;
+
+  /**
+   * Constructor.
+   */
+  public AvroKeyWithMetadataOutputFormat() {
+    this(new RecordWriterFactory<T>());
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param recordWriterFactory A factory for creating record writers.
+   */
+  protected AvroKeyWithMetadataOutputFormat(RecordWriterFactory<T> recordWriterFactory) {
+    mRecordWriterFactory = recordWriterFactory;
+  }
+
+  /**
+   * A factory for creating record writers.
+   *
+   * @param <T> The java type of the avro record to write.
+   */
+  protected static class RecordWriterFactory<T> {
+    /**
+     * Creates a new record writer instance.
+     *
+     * @param writerSchema The writer schema for the records to write.
+     * @param compressionCodec The compression type for the writer file.
+     * @param outputStream The target output stream for the records.
+     * @param conf The job configuration, from which text metadata entries are copied.
+     * @return a record writer targeting {@code outputStream}
+     * @throws IOException if the record writer cannot be opened
+     */
+    protected RecordWriter<AvroKey<T>, NullWritable> create(
+        Schema writerSchema, CodecFactory compressionCodec, OutputStream outputStream, Configuration conf)
+        throws IOException {
+      return new AvroKeyWithMetadataRecordWriter<T>(writerSchema, compressionCodec, outputStream, conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    // Get the writer schema.
+    Schema writerSchema = AvroJob.getOutputKeySchema(conf);
+    if (null == writerSchema) {
+      throw new IOException(
+          "AvroKeyWithMetadataOutputFormat requires an output schema. Use AvroJob.setOutputKeySchema().");
+    }
+
+    return mRecordWriterFactory.create(
+        writerSchema, getCompressionCodec(context), getAvroFileOutputStream(context), conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
new file mode 100644
index 0000000..0e61d87
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroKeyWithMetadataRecordWriter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes Avro records to an Avro container file output stream.
+ *
+ * @param <T> The Java type of the Avro data to write.
+ */
+public class AvroKeyWithMetadataRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
+ /** A writer for the Avro container file. */
+ private final DataFileWriter<T> mAvroFileWriter;
+
+ /** The configuration key prefix for a text output metadata. */
+ public static final String TEXT_PREFIX = "avro.meta.text.";
+
+ /**
+ * Constructor.
+ *
+ * @param writerSchema The writer schema for the records in the Avro container file.
+ * @param compressionCodec A compression codec factory for the Avro container file.
+ * @param outputStream The output stream to write the Avro container file to.
+ * @throws IOException If the record writer cannot be opened.
+ */
+ public AvroKeyWithMetadataRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
+ OutputStream outputStream, Configuration conf) throws IOException {
+ // Create an Avro container file and a writer to it.
+ mAvroFileWriter = new DataFileWriter<T>(new ReflectDatumWriter<T>(writerSchema));
+ mAvroFileWriter.setCodec(compressionCodec);
+
+ for (Entry<String,String> e : conf)
+ {
+ if (e.getKey().startsWith(TEXT_PREFIX))
+ mAvroFileWriter.setMeta(e.getKey().substring(TEXT_PREFIX.length()),
+ e.getValue());
+ }
+
+ mAvroFileWriter.create(writerSchema, outputStream);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+ mAvroFileWriter.append(record.datum());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ mAvroFileWriter.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
new file mode 100644
index 0000000..733826f
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsKeyInputFormat.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyRecordReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A MapReduce InputFormat that can handle Avro container files and multiple inputs.
+ * The input schema is determined from the split: the job configuration holds a mapping
+ * from input path to schema, which is looked up via {@link AvroMultipleInputsUtil}.
+ *
+ * <p>Keys are AvroKey wrapper objects that contain the Avro data. Since Avro
+ * container files store only records (not key/value pairs), the value from
+ * this InputFormat is a NullWritable.</p>
+ *
+ * @param <T> Type of the Avro data read
+ */
+public class AvroMultipleInputsKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable>
+{
+  /** {@inheritDoc} */
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
+  {
+    Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
+    if (readerSchema != null)
+    {
+      return new AvroKeyRecordReader<T>(readerSchema);
+    }
+    // No schema is registered for a path prefix of this split.
+    throw new RuntimeException("Could not determine input schema");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
new file mode 100644
index 0000000..8afe192
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/AvroMultipleInputsUtil.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Helper methods for dealing with multiple Avro input schemas. A mapping is stored in the configuration
+ * that maps each input path to its corresponding schema. Methods in this class help with loading and
+ * storing these schema mappings.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class AvroMultipleInputsUtil
+{
+ private static final Logger _log = Logger.getLogger(AvroMultipleInputsUtil.class);
+
+ private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.multiple.inputs.keys";
+
+ /**
+ * Gets the schema for a particular input split.
+ *
+ * @param conf configuration to get schema from
+ * @param split input split to get schema for
+ * @return schema
+ */
+ public static Schema getInputKeySchemaForSplit(Configuration conf, InputSplit split)
+ {
+ String path = ((FileSplit)split).getPath().toString();
+ _log.info("Determining schema for input path " + path);
+ JSONObject schemas;
+ try
+ {
+ schemas = getInputSchemas(conf);
+ }
+ catch (JSONException e1)
+ {
+ throw new RuntimeException(e1);
+ }
+ Schema schema = null;
+ if (schemas != null)
+ {
+ for (String key : JSONObject.getNames(schemas))
+ {
+ _log.info("Checking against " + key);
+ if (path.startsWith(key))
+ {
+ try
+ {
+ schema = new Schema.Parser().parse(schemas.getString(key));
+ _log.info("Input schema found: " + schema.toString(true));
+ break;
+ }
+ catch (JSONException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ if (schema == null)
+ {
+ _log.info("Could not determine input schema");
+ }
+ return schema;
+ }
+
+ /**
+ * Sets the job input key schema for a path.
+ *
+ * @param job The job to configure.
+ * @param schema The input key schema.
+ * @param path the path to set the schema for
+ */
+ public static void setInputKeySchemaForPath(Job job, Schema schema, String path)
+ {
+ JSONObject schemas;
+ try
+ {
+ schemas = getInputSchemas(job.getConfiguration());
+ schemas.put(path, schema.toString());
+ }
+ catch (JSONException e)
+ {
+ throw new RuntimeException(e);
+ }
+ job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schemas.toString());
+ }
+
+ /**
+ * Get a mapping from path to input schema.
+ *
+ * @param conf
+ * @return mapping from path to input schem
+ * @throws JSONException
+ */
+ private static JSONObject getInputSchemas(Configuration conf) throws JSONException
+ {
+ JSONObject schemas;
+
+ String schemasJson = conf.get(CONF_INPUT_KEY_SCHEMA);
+
+ if (schemasJson == null)
+ {
+ schemas = new JSONObject();
+ }
+ else
+ {
+ schemas = new JSONObject(schemasJson);
+ }
+
+ return schemas;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
new file mode 100644
index 0000000..e6beae2
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/CombinedAvroKeyInputFormat.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyRecordReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.log4j.Logger;
+
+import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
+
+/**
+ * A combined input format for reading Avro data.
+ *
+ * <p>Reads {@link CombineFileSplit}s, delegating each file in a split to a
+ * {@link CombinedAvroKeyRecordReader} that uses the reader schema from
+ * {@link AvroJob#getInputKeySchema}.</p>
+ *
+ * @author "Matthew Hayes"
+ *
+ * @param <T> Type of data to be read
+ */
+public class CombinedAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable>
+{
+  private static final Logger LOG = Logger.getLogger(CombinedAvroKeyInputFormat.class);
+
+  /**
+   * Reads a single file, selected by index, out of a {@link CombineFileSplit}.
+   *
+   * <p>{@link CombineFileRecordReader} instantiates this class reflectively, so the constructor
+   * signature {@code (CombineFileSplit, TaskAttemptContext, Integer)} must be kept as is.</p>
+   *
+   * @param <T> Type of data to be read
+   */
+  public static class CombinedAvroKeyRecordReader<T> extends AvroKeyRecordReader<T>
+  {
+    private CombineFileSplit inputSplit;
+    private Integer idx;
+
+    public CombinedAvroKeyRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer idx)
+    {
+      super(AvroJob.getInputKeySchema(context.getConfiguration()));
+      this.inputSplit = inputSplit;
+      this.idx = idx;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+        InterruptedException
+    {
+      this.inputSplit = (CombineFileSplit)inputSplit;
+
+      // Narrow the combined split down to the single file at index idx.
+      FileSplit fileSplit = new FileSplit(this.inputSplit.getPath(idx),
+                                          this.inputSplit.getOffset(idx),
+                                          this.inputSplit.getLength(idx),
+                                          this.inputSplit.getLocations());
+
+      super.initialize(fileSplit, context);
+    }
+  }
+
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException
+  {
+    Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
+    if (null == readerSchema) {
+      LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
+      LOG.info("Using a reader schema equal to the writer schema.");
+    }
+
+    // A class literal of a generic class is raw and cannot be converted directly to the
+    // parameterized Class type CombineFileRecordReader expects, hence the detour via Object.
+    @SuppressWarnings("unchecked")
+    Class<? extends RecordReader<AvroKey<T>, NullWritable>> readerClass =
+        (Class<? extends RecordReader<AvroKey<T>, NullWritable>>) (Object) CombinedAvroKeyRecordReader.class;
+    return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context, readerClass);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
new file mode 100644
index 0000000..2ae23ab
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/avro/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Input and output formats for using Avro in incremental Hadoop jobs.
+ * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
+ */
+package datafu.hourglass.avro;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
new file mode 100644
index 0000000..8229574
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DatePath.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a path and the corresponding date that is associated with it.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class DatePath implements Comparable<DatePath>
+{
+ private static final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+
+ private final Date date;
+ private final Path path;
+
+ static
+ {
+ timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+ public DatePath(Date date, Path path)
+ {
+ this.date = date;
+ this.path = path;
+ }
+
+ public Date getDate() { return this.date; }
+ public Path getPath() { return this.path; }
+
+ public static DatePath createDatedPath(Path parent, Date date)
+ {
+ return new DatePath(date,new Path(parent,PathUtils.datedPathFormat.format(date)));
+ }
+
+ public static DatePath createNestedDatedPath(Path parent, Date date)
+ {
+ return new DatePath(date,new Path(parent,PathUtils.nestedDatedPathFormat.format(date)));
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("[date=%s, path=%s]",timestampFormat.format(this.date), this.path.toString());
+ }
+
+ @Override
+ public int compareTo(DatePath o)
+ {
+ return this.date.compareTo(o.date);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((date == null) ? 0 : date.hashCode());
+ result = prime * result + ((path == null) ? 0 : path.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ DatePath other = (DatePath) obj;
+ if (date == null)
+ {
+ if (other.date != null)
+ return false;
+ }
+ else if (!date.equals(other.date))
+ return false;
+ if (path == null)
+ {
+ if (other.path != null)
+ return false;
+ }
+ else if (!path.equals(other.path))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
new file mode 100644
index 0000000..3cac627
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/DateRange.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.util.Date;
+
/**
 * A date range, consisting of a start and end date.
 *
 * <p>{@link Date} is mutable, so the dates are defensively copied on construction and on
 * access. This keeps an instance unchanged even if callers later modify the Date objects
 * they passed in or received. Null dates are permitted and stay null.</p>
 *
 * @author "Matthew Hayes"
 *
 */
public class DateRange
{
  private final Date _beginDate;
  private final Date _endDate;

  /**
   * @param beginDate start of the range (copied; may be null)
   * @param endDate end of the range (copied; may be null)
   */
  public DateRange(Date beginDate, Date endDate)
  {
    _beginDate = copyOf(beginDate);
    _endDate = copyOf(endDate);
  }

  /**
   * @return a copy of the start date, or null if none was given
   */
  public Date getBeginDate()
  {
    return copyOf(_beginDate);
  }

  /**
   * @return a copy of the end date, or null if none was given
   */
  public Date getEndDate()
  {
    return copyOf(_endDate);
  }

  // Null-safe defensive copy of a mutable Date.
  private static Date copyOf(Date date)
  {
    return (date == null) ? null : new Date(date.getTime());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
new file mode 100644
index 0000000..25e66ea
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.hourglass.fs;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A collection of utility methods for dealing with files and paths.
+ *
+ * @author "Matthew Hayes"
+ *
+ */
+public class PathUtils
+{
+ private static Logger _log = Logger.getLogger(PathUtils.class);
+
+ public final static TimeZone timeZone = TimeZone.getTimeZone("UTC");
+ public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat("yyyyMMdd");
+ public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat("yyyy/MM/dd");
+ private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");
+ private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");
+
+ /**
+ * Filters out paths starting with "." and "_".
+ */
+ public static final PathFilter nonHiddenPathFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path)
+ {
+ String s = path.getName().toString();
+ return !s.startsWith(".") && !s.startsWith("_");
+ }
+ };
+
+ static
+ {
+ datedPathFormat.setTimeZone(timeZone);
+ nestedDatedPathFormat.setTimeZone(timeZone);
+ }
+
+ /**
+ * Delete all but the last N days of paths matching the "yyyyMMdd" format.
+ *
+ * @param fs
+ * @param path
+ * @param retentionCount
+ * @throws IOException
+ */
+ public static void keepLatestDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
+ {
+ LinkedList<DatePath> outputs = new LinkedList<DatePath>(PathUtils.findDatedPaths(fs, path));
+
+ while (outputs.size() > retentionCount)
+ {
+ DatePath toDelete = outputs.removeFirst();
+ _log.info(String.format("Removing %s",toDelete.getPath()));
+ fs.delete(toDelete.getPath(),true);
+ }
+ }
+
+ /**
+ * Delete all but the last N days of paths matching the "yyyy/MM/dd" format.
+ *
+ * @param fs
+ * @param path
+ * @param retentionCount
+ * @throws IOException
+ */
+ public static void keepLatestNestedDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
+ {
+ List<DatePath> outputPaths = PathUtils.findNestedDatedPaths(fs, path);
+
+ if (outputPaths.size() > retentionCount)
+ {
+ Collections.sort(outputPaths);
+ _log.info(String.format("Applying retention range policy"));
+ for (DatePath output : outputPaths.subList(0, outputPaths.size() - retentionCount))
+ {
+ _log.info(String.format("Removing %s because it is outside retention range",output.getPath()));
+ fs.delete(output.getPath(),true);
+ }
+ }
+ }
+
+ /**
+ * List all paths matching the "yyyy/MM/dd" format under a given path.
+ *
+ * @param fs file system
+ * @param input path to search under
+ * @return paths
+ * @throws IOException
+ */
+ public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) throws IOException
+ {
+ List<DatePath> inputDates = new ArrayList<DatePath>();
+
+ FileStatus[] pathsStatus = fs.globStatus(new Path(input,"*/*/*"), nonHiddenPathFilter);
+
+ if (pathsStatus == null)
+ {
+ return inputDates;
+ }
+
+ for (FileStatus pathStatus : pathsStatus)
+ {
+ Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
+ if (matcher.matches())
+ {
+ String datePath = matcher.group(2);
+ Date date;
+ try
+ {
+ date = nestedDatedPathFormat.parse(datePath);
+ }
+ catch (ParseException e)
+ {
+ continue;
+ }
+
+ Calendar cal = Calendar.getInstance(timeZone);
+
+ cal.setTimeInMillis(date.getTime());
+
+ inputDates.add(new DatePath(cal.getTime(), pathStatus.getPath()));
+ }
+ }
+
+ return inputDates;
+ }
+
+ /**
+ * List all paths matching the "yyyyMMdd" format under a given path.
+ *
+ * @param fs file system
+ * @param path path to search under
+ * @return paths
+ * @throws IOException
+ */
+ public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
+ {
+ FileStatus[] outputPaths = fs.listStatus(path, nonHiddenPathFilter);
+
+ List<DatePath> outputs = new ArrayList<DatePath>();
+
+ if (outputPaths != null)
+ {
+ for (FileStatus outputPath : outputPaths)
+ {
+ Date date;
+ try
+ {
+ date = datedPathFormat.parse(outputPath.getPath().getName());
+ }
+ catch (ParseException e)
+ {
+ continue;
+ }
+
+ outputs.add(new DatePath(date,outputPath.getPath()));
+ }
+ }
+
+ Collections.sort(outputs);
+
+ return outputs;
+ }
+
+ /**
+ * Gets the schema from a given Avro data file.
+ *
+ * @param fs
+ * @param path
+ * @return The schema read from the data file's metadata.
+ * @throws IOException
+ */
+ public static Schema getSchemaFromFile(FileSystem fs, Path path) throws IOException
+ {
+ FSDataInputStream dataInputStream = fs.open(path);
+ DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+ DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
+ try
+ {
+ return dataFileStream.getSchema();
+ }
+ finally
+ {
+ dataFileStream.close();
+ }
+ }
+
+ /**
+ * Gets the schema for the first Avro file under the given path.
+ *
+ * @param path path to fetch schema for
+ * @return Avro schema
+ * @throws IOException
+ */
+ public static Schema getSchemaFromPath(FileSystem fs, Path path) throws IOException
+ {
+ return getSchemaFromFile(fs,fs.listStatus(path, nonHiddenPathFilter)[0].getPath());
+ }
+
+ /**
+ * Sums the size of all files listed under a given path.
+ *
+ * @param fs file system
+ * @param path path to count bytes for
+ * @return total bytes under path
+ * @throws IOException
+ */
+ public static long countBytes(FileSystem fs, Path path) throws IOException
+ {
+ FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
+ long totalForPath = 0L;
+ for (FileStatus file : files)
+ {
+ totalForPath += file.getLen();
+ }
+ return totalForPath;
+ }
+
+ /**
+ * Gets the date for a path in the "yyyyMMdd" format.
+ *
+ * @param path path to check
+ * @return date for path
+ */
+ public static Date getDateForDatedPath(Path path)
+ {
+ Matcher matcher = timestampPathPattern.matcher(path.toString());
+
+ if (!matcher.matches())
+ {
+ throw new RuntimeException("Unexpected input filename: " + path);
+ }
+
+ try
+ {
+ return PathUtils.datedPathFormat.parse(matcher.group(1));
+ }
+ catch (ParseException e)
+ {
+ throw new RuntimeException("Unexpected input filename: " + path);
+ }
+ }
+
+ /**
+ * Gets the date for a path in the "yyyy/MM/dd" format.
+ *
+ * @param path path to check
+ * @return date
+ */
+ public static Date getDateForNestedDatedPath(Path path)
+ {
+ Matcher matcher = dailyPathPattern.matcher(path.toString());
+
+ if (!matcher.matches())
+ {
+ throw new RuntimeException("Unexpected input filename: " + path);
+ }
+
+ try
+ {
+ return PathUtils.nestedDatedPathFormat.parse(matcher.group(2));
+ }
+ catch (ParseException e)
+ {
+ throw new RuntimeException("Unexpected input filename: " + path);
+ }
+ }
+
+ /**
+ * Gets the root path for a path in the "yyyy/MM/dd" format. This is part of the path preceding the
+ * "yyyy/MM/dd" portion.
+ *
+ * @param path
+ * @return
+ */
+ public static Path getNestedPathRoot(Path path)
+ {
+ Matcher matcher = dailyPathPattern.matcher(path.toString());
+
+ if (!matcher.matches())
+ {
+ throw new RuntimeException("Unexpected input filename: " + path);
+ }
+
+ return new Path(matcher.group(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
new file mode 100644
index 0000000..f1fcdaf
--- /dev/null
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes for working with the file system.
+ * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
+ * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
+ */
+package datafu.hourglass.fs;
\ No newline at end of file