You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:48 UTC

[10/14] DATAFU-44: Migrate Hourglass to Gradle

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
deleted file mode 100644
index 03d36f3..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * A Hadoop combiner which delegates to an implementation read from the distributed cache.
- * The {@link ObjectReducer} delegate is loaded in setup() from the path stored in the job
- * configuration under {@link Parameters#COMBINER_IMPL_PATH}; reduce() and cleanup() forward to it.
- * @author "Matthew Hayes"
- */
-public class DelegatingCombiner extends Reducer<Object, Object, Object, Object> 
-{
-  private ObjectReducer processor; // the delegate; only assigned in setup() when the context is non-null
-  
-  @Override
-  protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    super.setup(context);
-    
-    if (context != null) // NOTE(review): with a null context the delegate stays null, so reduce()/cleanup() would throw NullPointerException
-    {
-      Configuration conf = context.getConfiguration();
-      
-      String path = conf.get(Parameters.COMBINER_IMPL_PATH); // NOTE(review): not checked for a missing setting before new Path(path) - confirm jobs always set it
-      
-      this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
-      this.processor.setContext(context); // hand the task context to the delegate before any reduce() call
-    }
-  }
-  
-  @Override
-  protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    this.processor.reduce(key, values, context); // pure pass-through to the delegate
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException,
-      InterruptedException
-  {
-    this.processor.close(); // NOTE(review): super.cleanup(context) is not called here, unlike setup() which calls super.setup(context)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
deleted file mode 100644
index d170cf5..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * A Hadoop mapper which delegates to an implementation read from the distributed cache.
- * The {@link ObjectMapper} delegate is loaded in setup() from the path stored in the job
- * configuration under {@link Parameters#MAPPER_IMPL_PATH}; map() and cleanup() forward to it.
- * @author "Matthew Hayes"
- */
-public class DelegatingMapper extends Mapper<Object, Object, Object, Object> 
-{
-  private ObjectMapper processor; // the delegate; only assigned in setup() when the context is non-null
-  
-  @Override
-  protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    super.setup(context);
-    
-    if (context != null) // NOTE(review): with a null context the delegate stays null, so map()/cleanup() would throw NullPointerException
-    {
-      Configuration conf = context.getConfiguration();
-      
-      String path = conf.get(Parameters.MAPPER_IMPL_PATH); // NOTE(review): not checked for a missing setting before new Path(path) - confirm jobs always set it
-      
-      this.processor = (ObjectMapper)DistributedCacheHelper.readObject(conf, new Path(path));
-      this.processor.setContext(context); // hand the task context to the delegate before any map() call
-    }
-  }
-  
-  @Override
-  protected void map(Object key, Object value, Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    this.processor.map(key, context); // only the key is forwarded; the value argument is not passed to the delegate
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException,
-      InterruptedException
-  {
-    this.processor.close(); // NOTE(review): super.cleanup(context) is not called here, unlike setup() which calls super.setup(context)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
deleted file mode 100644
index 4ddf918..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * A Hadoop reducer which delegates to an implementation read from the distributed cache.
- * The {@link ObjectReducer} delegate is loaded in setup() from the path stored in the job
- * configuration under {@link Parameters#REDUCER_IMPL_PATH}; reduce() and cleanup() forward to it.
- * @author "Matthew Hayes"
- */
-public class DelegatingReducer extends Reducer<Object, Object, Object, Object> 
-{
-  private ObjectReducer processor; // the delegate; only assigned in setup() when the context is non-null
-  
-  @Override
-  protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    super.setup(context);
-    
-    if (context != null) // NOTE(review): with a null context the delegate stays null, so reduce()/cleanup() would throw NullPointerException
-    {
-      Configuration conf = context.getConfiguration();
-      
-      String path = conf.get(Parameters.REDUCER_IMPL_PATH); // NOTE(review): not checked for a missing setting before new Path(path) - confirm jobs always set it
-      
-      this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
-      this.processor.setContext(context); // hand the task context to the delegate before any reduce() call
-    }
-  }
-  
-  @Override
-  protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
-  {
-    this.processor.reduce(key, values, context); // pure pass-through to the delegate
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException,
-      InterruptedException
-  {
-    this.processor.close(); // NOTE(review): super.cleanup(context) is not called here, unlike setup() which calls super.setup(context)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
deleted file mode 100644
index bdb626a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Methods for working with the Hadoop distributed cache.
- * writeObject() serializes an object to a file and registers it as a cache file; readObject()
- * loads it back from the local cache copy. Both use Java object serialization.
- * @author "Matthew Hayes"
- */
-public class DistributedCacheHelper 
-{
-  /**
-   * Deserializes an object from the local distributed-cache copy of a file.
-   * The first local cache file whose path string ends with the string form of {@code path} is read.
-   * @param conf Hadoop configuration
-   * @param path Path to deserialize from
-   * @return Deserialized object
-   * @throws IOException if listing the cache files or reading the local file fails; a missing cache entry or an unknown class raises RuntimeException instead
-   */
-  public static Object readObject(Configuration conf, org.apache.hadoop.fs.Path path) throws IOException
-  {
-    String localPath = null;
-    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf); // NOTE(review): not null-checked before the loop - confirm it cannot be null when no cache files are registered
-    for (Path localCacheFile : localCacheFiles)
-    {
-      if (localCacheFile.toString().endsWith(path.toString())) // suffix match on the string forms, not path equality
-      {
-        localPath = localCacheFile.toString();
-        break;
-      }
-    }
-    if (localPath == null)
-    {
-      throw new RuntimeException("Could not find " + path + " in local cache");
-    }
-    FileInputStream inputStream = new FileInputStream(new File(localPath));
-    ObjectInputStream objStream = new ObjectInputStream(inputStream); // NOTE(review): constructed outside the try, so inputStream is not closed if this constructor throws
-    
-    try
-    {
-      try {
-        return objStream.readObject();
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException(e); // checked exception rethrown unchecked, with the cause preserved
-      }
-    }
-    finally
-    {
-      objStream.close();
-      inputStream.close();
-    }
-  }
-  
-  /**
-   * Serializes an object to a path in HDFS and adds the file to the distributed cache.
-   * The file system is obtained from {@code conf}; the cache entry is recorded in {@code conf} as well.
-   * @param conf Hadoop configuration
-   * @param obj Object to serialize
-   * @param path Path to serialize object to
-   * @throws IOException if creating or writing the file fails
-   */
-  public static void writeObject(Configuration conf, Object obj, org.apache.hadoop.fs.Path path) throws IOException
-  {
-    FileSystem fs = FileSystem.get(conf);
-    FSDataOutputStream outputStream = fs.create(path, true); // second argument presumably requests overwrite - confirm against FileSystem.create
-    ObjectOutputStream objStream = new ObjectOutputStream(outputStream);
-    objStream.writeObject(obj); // NOTE(review): no try/finally here - both streams stay open if this throws
-    objStream.close();
-    outputStream.close();
-    DistributedCache.addCacheFile(path.toUri(), conf); // registered only after the file has been fully written and closed
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
deleted file mode 100644
index ea1ca2e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.MapContext;
-
-/**
- * Defines the interface for a mapper implementation that {@link DelegatingMapper} delegates to.
- * Subclasses implement {@link #map}; storage of the task context and the close() hook are
- * inherited from {@link ObjectProcessor}.
- * @author "Matthew Hayes"
- */
-public abstract class ObjectMapper extends ObjectProcessor {
-  public abstract void map(Object input, // the single input object for this call
-      MapContext<Object,Object,Object,Object> context) throws IOException,InterruptedException; // the map context of the running task
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
deleted file mode 100644
index f33c790..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * Base class for {@link ObjectMapper} and {@link ObjectReducer}.
- * Holds the task context supplied through setContext() and provides an empty close() that
- * subclasses may override.
- * @author "Matthew Hayes"
- */
-public abstract class ObjectProcessor
-{
-  private TaskInputOutputContext<Object,Object,Object,Object> context; // null until setContext() is called
-  
-  public TaskInputOutputContext<Object,Object,Object,Object> getContext() // returns the stored context as-is
-  {
-    return this.context;
-  }
-  
-  public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context) // stores the context without validation
-  {    
-    this.context = context;
-  }
-  
-  public void close() throws IOException, InterruptedException // empty default implementation
-  {    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
deleted file mode 100644
index 9340019..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-/**
- * Defines the interface for combiner and reducer implementations that {@link DelegatingCombiner} and
- * {@link DelegatingReducer} delegate to.
- * Subclasses implement {@link #reduce}; context storage and close() come from {@link ObjectProcessor}.
- * @author "Matthew Hayes"
- *
- */
-public abstract class ObjectReducer extends ObjectProcessor {
-  public abstract void reduce(Object key, // the key shared by all of the values
-      Iterable<Object> values, // the values grouped under that key
-      ReduceContext<Object,Object,Object,Object> context) throws IOException,InterruptedException; // the reduce context of the running task
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
deleted file mode 100644
index 65d3fd9..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-/**
- * Parameters used by the jobs to pass configuration settings 
- * to the mappers, combiners, and reducers.
- * Each constant is a configuration key whose value is used as the path of a serialized implementation.
- * @author "Matthew Hayes"
- *
- */
-public class Parameters 
-{
-  public static final String MAPPER_IMPL_PATH = "hourglass.mapper.impl.path"; // read by DelegatingMapper.setup()
-  public static final String REDUCER_IMPL_PATH = "hourglass.reducer.impl.path"; // read by DelegatingReducer.setup()
-  public static final String COMBINER_IMPL_PATH = "hourglass.combiner.impl.path"; // read by DelegatingCombiner.setup()
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
deleted file mode 100644
index 53d6a90..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-import datafu.hourglass.model.Accumulator;
-
-/**
- * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * intermediate value.
- * </p>
- * The key is written back unchanged; only the values for a key are folded into one intermediate value.
- * @author "Matthew Hayes"
- *
- */
-public class PartitioningCombiner extends ObjectReducer implements Serializable 
-{
-  private Accumulator<GenericRecord,GenericRecord> accumulator; // must be set through setAccumulator() before reduce() runs
-  
-  @SuppressWarnings("unchecked")
-  public void reduce(Object keyObj, // expected to be an AvroKey<GenericRecord>; it is cast when written
-                      Iterable<Object> values, // each element is cast to AvroValue<GenericRecord>
-                      ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {           
-    Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-        
-    if (acc == null)
-    {
-      throw new RuntimeException("No accumulator set for combiner!");
-    }
-    
-    acc.cleanup(); // presumably resets state left over from the previous key - confirm against Accumulator.cleanup()
-                    
-    long accumulatedCount = 0;    // number of values fed to the accumulator for this key
-    for (Object valueObj : values)
-    {       
-      AvroValue<GenericRecord> value = (AvroValue<GenericRecord>)valueObj;
-      acc.accumulate(value.datum());
-      accumulatedCount++;
-    }
-    
-    if (accumulatedCount > 0) // nothing is written for a key that had no values
-    {
-      GenericRecord intermediateValue = acc.getFinal();
-      if (intermediateValue != null) // a null result suppresses output for this key
-      {
-        context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
-      }
-    }
-  }
-  
-  /**
-   * Gets the accumulator used to perform aggregation. 
-   * 
-   * @return The accumulator, or null if none has been set
-   */
-  public Accumulator<GenericRecord,GenericRecord> getAccumulator()
-  {
-    return accumulator;
-  }
-  
-  /**
-   * Sets the accumulator used to perform aggregation. 
-   * 
-   * @param acc The accumulator
-   */
-  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    accumulator = acc;
-  }  
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
deleted file mode 100644
index 43b7130..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
- * map operation, which produces key and intermediate value pairs from the input.
- * The input to the mapper is assumed to be partitioned by day.
- * Each key produced by {@link datafu.hourglass.model.Mapper} is tagged with the time for the partition
- * that the input came from.  This enables the combiner and reducer to preserve the partitions.
- * </p>
- * The collector, last split and last time are transient; they are rebuilt or recomputed on first use.
- * @author "Matthew Hayes"
- *
- */
-public class PartitioningMapper extends ObjectMapper implements Serializable
-{
-  private transient MapCollector _mapCollector; // created lazily by getMapCollector()
-  private transient FileSplit _lastSplit; // split seen on the previous map() call
-  private transient long _lastTime; // partition time computed for _lastSplit
-  private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper; // the map implementation that map() delegates to
-  private PartitionPreservingSchemas _schemas; // supplies the map output key schema for the collector
-  
-  @SuppressWarnings("unchecked")
-  @Override
-  public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {  
-    long time;
-    
-    if (_lastSplit == context.getInputSplit()) // same split instance as the previous call: reuse the cached time
-    {
-      time = _lastTime;
-    }
-    else
-    {
-      _lastSplit = (FileSplit)context.getInputSplit();      
-      time = PathUtils.getDateForNestedDatedPath((_lastSplit).getPath().getParent()).getTime(); // date is derived from the parent directory of the split's file
-      _lastTime = time;
-    }
-    
-    getMapCollector().setContext(context);
-    
-    // Set the time, representing the time range this data was derived from.
-    // The key is tagged with this time.
-    getMapCollector().setTime(time);
-    
-    try
-    {
-      AvroKey<GenericRecord> input = (AvroKey<GenericRecord>)inputObj;
-      getMapper().map(input.datum(),getMapCollector());
-    }
-    catch (InterruptedException e) // NOTE(review): rethrown as IOException although map() itself declares InterruptedException
-    {          
-      throw new IOException(e);
-    }
-  }
-  
-  /**
-   * Gets the mapper.
-   * 
-   * @return mapper, or null if none has been set
-   */
-  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
-  {
-    return _mapper;
-  }
-  
-  /**
-   * Sets the mapper.
-   * 
-   * @param mapper the map implementation that map() delegates to
-   */
-  public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
-  {
-    _mapper = mapper;
-  }
-  
-  /**
-   * Sets the Avro schemas.
-   * 
-   * @param schemas the schemas from which the map output key schema is taken
-   */
-  public void setSchemas(PartitionPreservingSchemas schemas)
-  {
-    _schemas = schemas;
-  }
-  
-  /**
-   * Gets the Avro schemas.
-   * 
-   * @return schemas, or null if none have been set
-   */
-  public PartitionPreservingSchemas getSchemas()
-  {
-    return _schemas;
-  }
-  
-  @Override
-  public void setContext(
-      TaskInputOutputContext<Object, Object, Object, Object> context) 
-  {
-    super.setContext(context);
-        
-    if (_mapper instanceof Configurable) // pass the job configuration on to mappers that accept one
-    {
-      ((Configurable)_mapper).setConf(context.getConfiguration());
-    }
-  }
-
-  private MapCollector getMapCollector()
-  {
-    if (_mapCollector == null) // first use: build the collector from the current schemas
-    {
-      _mapCollector = new MapCollector(getSchemas());
-    }
-    
-    return _mapCollector;
-  }
-  
-  /**
-   * A {@link KeyValueCollector} that writes to {@link MapContext} and tags each mapped key with the time for the partition
-   * it was derived from.  This keeps the data partitioned so that the reducer may process each partition independently.
-   * One wrapper record is reused for every collected key; setTime() and collect() overwrite its fields.
-   * @author "Matthew Hayes"
-   *
-   */
-  private class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
-  {
-    private MapContext<Object,Object,Object,Object> context; // set on every map() call before collect() can run
-    private GenericRecord wrappedKey; // reusable key wrapper with "time" and "value" fields
-      
-    public MapCollector(PartitionPreservingSchemas schemas)
-    {
-      this.wrappedKey = new GenericData.Record(schemas.getMapOutputKeySchema());
-    }
-    
-    public void setContext(MapContext<Object,Object,Object,Object> context)
-    {
-      this.context = context;
-    }
-    
-    public void setTime(long time)
-    {
-      this.wrappedKey.put("time", time);
-    }
-    
-    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
-    {        
-      wrappedKey.put("value", key);  // the mapper's key becomes the "value" field of the wrapper
-      context.write(new AvroKey<GenericRecord>(wrappedKey),new AvroValue<GenericRecord>(value));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
deleted file mode 100644
index d6726ac..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.mapreduce.ReduceContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- * 
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * output value.
- * </p>
- * 
- * <p>
- * The input key is assumed to have time and value fields.  The value here is the true key,
- * and the time represents the input partition the data was derived from.  The true key is
- * used as the key in the reducer output and the time is dropped. 
- * This reducer uses multiple outputs; the time is used to determine which output to write to,
- * where the named outputs have the form yyyyMMdd derived from the time. 
- * </p>
- * The multiple-outputs handle is created in setContext() and released in close(); it and the
- * time-to-name cache are transient fields.
- * @author "Matthew Hayes"
- */
-public class PartitioningReducer extends ObjectReducer implements Serializable 
-{  
-  private transient AvroMultipleOutputs _multipleOutputs; // created in setContext(), closed and cleared in close()
-  private transient Map<Long,String> _timeToNamedOutput; // cache of formatted output names, created lazily
-  private PartitionPreservingSchemas _schemas; // supplies the reduce output schema
-  private Accumulator<GenericRecord,GenericRecord> accumulator; // must be set through setAccumulator() before reduce() runs
-  
-  @SuppressWarnings("unchecked")
-  public void reduce(Object keyObj, // expected to be an AvroKey<GenericRecord> with "time" and "value" fields
-                     Iterable<Object> values, // each element is cast to AvroValue<GenericRecord>
-                     ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
-  {  
-    Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-    
-    if (acc == null)
-    {
-      throw new RuntimeException("No accumulator set for reducer!");
-    }
-    
-    acc.cleanup(); // presumably resets state left over from the previous key - confirm against Accumulator.cleanup()
-            
-    Long keyTime = null;
-    
-    GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
-    
-    keyTime = (Long)key.get("time"); // partition time tag; NOTE(review): a null here fails later when unboxed in getNamedOutput()
-    key = (GenericRecord)key.get("value"); // unwrap the true key
-    
-    long accumulatedCount = 0;    // number of values fed to the accumulator for this key
-    for (Object valueObj : values)
-    {       
-      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum(); 
-      acc.accumulate(value);      
-      accumulatedCount++;      
-    }
-    
-    if (accumulatedCount > 0) // nothing is written for a key that had no values
-    {
-      GenericRecord outputValue = acc.getFinal();               
-      if (outputValue != null) // a null result suppresses output for this key
-      {                    
-        GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
-        output.put("key", key);
-        output.put("value", outputValue);
-        
-        // write output in directories corresponding to each day
-        String namedOutput = getNamedOutput(keyTime);
-        if (_multipleOutputs == null)
-        {
-          throw new RuntimeException("No multiple outputs set");
-        }
-        _multipleOutputs.write(namedOutput, new AvroKey<GenericRecord>(output), (AvroValue<GenericRecord>)null); // the whole record goes in the key; the value is null
-      }
-    }
-  }
-  
-  @Override
-  public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
-  {           
-    super.setContext(context);
-    
-    // ... and we also write the final output to multiple directories
-    _multipleOutputs = new AvroMultipleOutputs(context);
-  }
-  
-  /**
-   * Sets the accumulator used to perform aggregation. 
-   * 
-   * @param acc The accumulator
-   */
-  public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
-  {
-    accumulator = acc;
-  }
-  
-  /**
-   * Gets the accumulator used to perform aggregation. 
-   * 
-   * @return The accumulator, or null if none has been set
-   */
-  public Accumulator<GenericRecord,GenericRecord> getAccumulator()
-  {
-    return accumulator;
-  }
-  
-  /**
-   * Sets the Avro schemas.
-   * 
-   * @param schemas the schemas from which the reduce output schema is taken
-   */
-  public void setSchemas(PartitionPreservingSchemas schemas)
-  {
-    _schemas = schemas;
-  }
-  
-  /**
-   * Gets the Avro schemas
-   * 
-   * @return schemas, or null if none have been set
-   */
-  public PartitionPreservingSchemas getSchemas()
-  {
-    return _schemas;
-  }
-
-  @Override
-  public void close() throws IOException, InterruptedException
-  {
-    super.close();
-    
-    if (_multipleOutputs != null) // close once, then clear so a repeated close() does nothing
-    {
-      _multipleOutputs.close();
-      _multipleOutputs = null;
-    }
-  }
-  
-  private String getNamedOutput(Long time) // maps a partition time to its named output, caching the formatted name
-  {
-    if (_timeToNamedOutput == null)
-    {
-      _timeToNamedOutput = new HashMap<Long,String>();
-    }
-    String namedOutput = _timeToNamedOutput.get(time);
-    if (namedOutput == null)
-    {
-      namedOutput = PathUtils.datedPathFormat.format(new Date(time)); // time is unboxed here, so a null time throws NullPointerException
-      _timeToNamedOutput.put(time, namedOutput);
-    }
-    return namedOutput;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
deleted file mode 100644
index 5ee265b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Implementations of mappers, combiners, and reducers used by incremental jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.mapreduce;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java b/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
deleted file mode 100644
index 4f7ac7e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.Serializable;
-
-/**
- * Collects a sequence of values and produces one value as a result.
- * 
- * <p>
- * Implementations are serializable, because this interface extends {@link Serializable}.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- * @param <In> Input value type
- * @param <Out> Output value type
- */
-public interface Accumulator<In,Out> extends Serializable
-{
-  /**
-   * Accumulate another value.
-   *  
-   * @param value Value to accumulate
-   */
-  void accumulate(In value);
-  
-  /**
-   * Get the output value corresponding to all input values accumulated so far.
-   * 
-   * <p>
-   * This may return null to indicate no record should be written.
-   * </p>
-   * 
-   * @return Output value, or null if no record should be written
-   */
-  Out getFinal();
-  
-  /**
-   * Resets the internal state so that all values accumulated so far are forgotten.
-   * 
-   * <p>
-   * NOTE(review): presumably this lets one instance be reused for a new sequence of
-   * values; confirm against the code that drives the accumulator.
-   * </p>
-   */
-  void cleanup();
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java b/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
deleted file mode 100644
index 215dd8d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.IOException;
-
-/**
- * Provided to an instance of {@link Mapper} to collect key-value pairs.
- * 
- * @author "Matthew Hayes"
- *
- * @param <K> Key type
- * @param <V> Value type
- */
-public interface KeyValueCollector<K,V>
-{
-  /**
-   * Collects a single key-value pair.
-   * 
-   * @param key Key to be collected
-   * @param value Value to be collected
-   * @throws IOException declared so that implementations may propagate I/O failures
-   * @throws InterruptedException declared so that implementations may propagate interruption
-   */
-  void collect(K key,V value)  throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java b/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
deleted file mode 100644
index 7640ae5..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Maps an input record to one or more key-value pairs.
- * 
- * <p>
- * Implementations are serializable, because this interface extends {@link Serializable}.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- * @param <In> Input type
- * @param <OutKey> Output key type
- * @param <OutVal> Output value type
- */
-public interface Mapper<In,OutKey,OutVal> extends Serializable
-{
-  /**
-   * Maps an input record to one or more key-value pairs.
-   * Each pair is emitted by passing it to the supplied collector.
-   * 
-   * @param input Input value
-   * @param collector Collects output key-value pairs
-   * @throws IOException declared so that failures from the collector may be propagated
-   * @throws InterruptedException declared so that interruption from the collector may be propagated
-   */
-  void map(In input, KeyValueCollector<OutKey,OutVal> collector)  throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java b/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
deleted file mode 100644
index 0c27865..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.Serializable;
-
-/**
- * Merges two values together.
- * 
- * <p>
- * Implementations are serializable, because this interface extends {@link Serializable}.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- * @param <T> Value type
- */
-public interface Merger<T> extends Serializable
-{
-  /**
-   * Merges two values of the same type into one.
-   * 
-   * <p>
-   * This may return null to indicate no record should be written.
-   * </p>
-   * 
-   * @param left Left value
-   * @param right Right value
-   * @return Merged result, or null if no record should be written
-   */
-  T merge(T left, T right);
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
deleted file mode 100644
index 984c1fb..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Interfaces which define the incremental processing model.
- */
-package datafu.hourglass.model;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
deleted file mode 100644
index 8d8246c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.mapred.Pair;
-import org.apache.commons.lang.NullArgumentException;
-
-/**
- * Generates the Avro schemas used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derivations.
- *
- * <p>
- * The parsed {@link Schema} objects are held in transient fields. The JSON text of the
- * key, intermediate value, output value and input schemas is kept in string maps so the
- * object can be serialized, and each schema is parsed or generated lazily on first access.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingSchemas implements Serializable
-{
-  private static String DATED_INTERMEDIATE_VALUE_SCHEMA_NAME = "DatedMapValue";
-  private static String KEY_SCHEMA = "key.schema";
-  private static String INTERMEDIATE_VALUE_SCHEMA = "intermediate.value.schema";
-  private static String OUTPUT_VALUE_SCHEMA = "output.value.schema";
-
-  private final String _outputSchemaName;
-  private final String _outputSchemaNamespace;
-  private transient Schema _keySchema;
-  private transient Schema _intermediateValueSchema;
-  private transient Schema _outputValueSchema;
-
-  // generated schemas
-  private transient Schema _mapOutputSchema;
-  private transient Schema _dateIntermediateValueSchema;
-  private transient Schema _mapOutputValueSchema;
-  private transient Schema _reduceOutputSchema;
-  private transient Map<String,Schema> _mapInputSchemas;
-
-  //schemas are stored here so the object can be serialized
-  private Map<String,String> conf;
-
-  // JSON text of each input schema, under the same keys as the map given to the constructor
-  private Map<String,String> _inputSchemas;
-
-  /**
-   * Captures the JSON form of the task schemas and of every input schema.
-   *
-   * @param schemas key, intermediate value and output value schemas
-   * @param inputSchemas input schemas; the String keys are reused unchanged by {@link #getMapInputSchemas()}
-   * @param outputSchemaName name of the record schema returned by {@link #getReduceOutputSchema()}
-   * @param outputSchemaNamespace namespace of the generated record schemas
-   * @throws NullArgumentException if any argument is null
-   */
-  public PartitionCollapsingSchemas(TaskSchemas schemas, Map<String,Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
-  {
-    if (schemas == null)
-    {
-      throw new NullArgumentException("schemas");
-    }
-    if (inputSchemas == null)
-    {
-      // Message now names the actual parameter (was "inputSchema").
-      throw new NullArgumentException("inputSchemas");
-    }
-    if (outputSchemaName == null)
-    {
-      throw new NullArgumentException("outputSchemaName");
-    }
-    // Previously compared outputSchemaName to outputSchemaNamespace by reference, which
-    // never detected a null namespace and rejected callers passing the same String twice.
-    if (outputSchemaNamespace == null)
-    {
-      throw new NullArgumentException("outputSchemaNamespace");
-    }
-    _outputSchemaName = outputSchemaName;
-    _outputSchemaNamespace = outputSchemaNamespace;
-
-    conf = new HashMap<String,String>();
-    conf.put(KEY_SCHEMA, schemas.getKeySchema().toString());
-    conf.put(INTERMEDIATE_VALUE_SCHEMA, schemas.getIntermediateValueSchema().toString());
-    conf.put(OUTPUT_VALUE_SCHEMA, schemas.getOutputValueSchema().toString());
-
-    _inputSchemas = new HashMap<String,String>();
-    for (Entry<String,Schema> schema : inputSchemas.entrySet())
-    {
-      _inputSchemas.put(schema.getKey(), schema.getValue().toString());
-    }
-  }
-
-  /**
-   * Gets the map input schemas, one per input schema given to the constructor.
-   * Each is a union of the input schema (flattened if it is itself a union) and the
-   * reduce output schema.
-   *
-   * @return unmodifiable view of the map input schemas
-   */
-  public Map<String,Schema> getMapInputSchemas()
-  {
-    if (_mapInputSchemas == null)
-    {
-      _mapInputSchemas = new HashMap<String,Schema>();
-
-      for (Entry<String,String> schemaPair : _inputSchemas.entrySet())
-      {
-        Schema schema = new Schema.Parser().parse(schemaPair.getValue());
-
-        List<Schema> mapInputSchemas = new ArrayList<Schema>();
-
-        // A union input contributes its member types rather than being nested as a union.
-        if (schema.getType() == Type.UNION)
-        {
-          mapInputSchemas.addAll(schema.getTypes());
-        }
-        else
-        {
-          mapInputSchemas.add(schema);
-        }
-
-        // feedback from output (optional)
-        mapInputSchemas.add(getReduceOutputSchema());
-
-        _mapInputSchemas.put(schemaPair.getKey(), Schema.createUnion(mapInputSchemas));
-      }
-    }
-    return Collections.unmodifiableMap(_mapInputSchemas);
-  }
-
-  /**
-   * Gets the map output schema: an Avro pair of the map output key and value schemas.
-   *
-   * @return map output schema
-   */
-  public Schema getMapOutputSchema()
-  {
-    if (_mapOutputSchema == null)
-    {
-      _mapOutputSchema = Pair.getPairSchema(getMapOutputKeySchema(),
-                                            getMapOutputValueSchema());
-    }
-    return _mapOutputSchema;
-  }
-
-  /**
-   * Gets the key schema, parsed from its stored JSON on first access.
-   *
-   * @return key schema
-   */
-  public Schema getKeySchema()
-  {
-    if (_keySchema == null)
-    {
-      _keySchema = new Schema.Parser().parse(conf.get(KEY_SCHEMA));
-    }
-    return _keySchema;
-  }
-
-  /**
-   * Gets the map output key schema, which is the key schema itself.
-   *
-   * @return map output key schema
-   */
-  public Schema getMapOutputKeySchema()
-  {
-    return getKeySchema();
-  }
-
-  /**
-   * Gets the reduce output schema: a record with a "key" field of the key schema and a
-   * "value" field of the output value schema, named by the output schema name and namespace.
-   *
-   * @return reduce output schema
-   */
-  public Schema getReduceOutputSchema()
-  {
-    if (_reduceOutputSchema == null)
-    {
-      _reduceOutputSchema = Schema.createRecord(_outputSchemaName, null, _outputSchemaNamespace, false);
-      List<Field> fields = Arrays.asList(new Field("key",getKeySchema(), null, null),
-                                         new Field("value", getOutputValueSchema(), null, null));
-      _reduceOutputSchema.setFields(fields);
-    }
-    return _reduceOutputSchema;
-  }
-
-  /**
-   * Gets the dated intermediate value schema: a record with a "value" field of the
-   * intermediate value schema and a "time" field of type long.
-   *
-   * @return dated intermediate value schema
-   */
-  public Schema getDatedIntermediateValueSchema()
-  {
-    if (_dateIntermediateValueSchema == null)
-    {
-      _dateIntermediateValueSchema = Schema.createRecord(DATED_INTERMEDIATE_VALUE_SCHEMA_NAME, null, _outputSchemaNamespace, false);
-      List<Field> intermediateValueFields = Arrays.asList(new Field("value", getIntermediateValueSchema(), null, null),
-                                                         new Field("time", Schema.create(Type.LONG), null, null));
-      _dateIntermediateValueSchema.setFields(intermediateValueFields);
-    }
-    return _dateIntermediateValueSchema;
-  }
-
-  /**
-   * Gets the output value schema, parsed from its stored JSON on first access.
-   *
-   * @return output value schema
-   */
-  public Schema getOutputValueSchema()
-  {
-    if (_outputValueSchema == null)
-    {
-      _outputValueSchema = new Schema.Parser().parse(conf.get(OUTPUT_VALUE_SCHEMA));
-    }
-    return _outputValueSchema;
-  }
-
-  /**
-   * Gets the intermediate value schema, parsed from its stored JSON on first access.
-   *
-   * @return intermediate value schema
-   */
-  public Schema getIntermediateValueSchema()
-  {
-    if (_intermediateValueSchema == null)
-    {
-      _intermediateValueSchema = new Schema.Parser().parse(conf.get(INTERMEDIATE_VALUE_SCHEMA));
-    }
-    return _intermediateValueSchema;
-  }
-
-  /**
-   * Gets the map output value schema: a union of the intermediate value schema, the dated
-   * intermediate value schema and the output value schema.
-   *
-   * @return map output value schema
-   */
-  public Schema getMapOutputValueSchema()
-  {
-    if (_mapOutputValueSchema == null)
-    {
-      List<Schema> unionSchemas = new ArrayList<Schema>();
-
-      unionSchemas.add(getIntermediateValueSchema());
-
-      // intermediate values tagged with the date
-      unionSchemas.add(getDatedIntermediateValueSchema());
-
-      // feedback from output of second pass; skipped when an equal schema is already
-      // in the list so the same schema is not added to the union twice
-      if (!unionSchemas.contains(getOutputValueSchema()))
-      {
-        unionSchemas.add(getOutputValueSchema());
-      }
-
-      _mapOutputValueSchema = Schema.createUnion(unionSchemas);
-    }
-    return _mapOutputValueSchema;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
deleted file mode 100644
index d4f3bd2..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.mapred.Pair;
-
-/**
- * Generates the Avro schemas used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derivations.
- *
- * <p>
- * Parsed {@link Schema} objects live in transient fields. The JSON text of the schemas is
- * kept in string maps so the object can be serialized, and each schema is parsed or
- * generated lazily the first time it is requested.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingSchemas implements Serializable
-{
-  private static String DATED_INCREMENTAL_KEY_SCHEMA_NAME = "DatedMapKey";
-
-  private static String KEY_SCHEMA = "key.schema";
-  private static String INTERMEDIATE_VALUE_SCHEMA = "intermediate.value.schema";
-  private static String OUTPUT_VALUE_SCHEMA = "output.value.schema";
-
-  private transient Schema _keySchema;
-  private transient Schema _intermediateValueSchema;
-  private transient Schema _outputValueSchema;
-  private final String _outputSchemaName;
-  private final String _outputSchemaNamespace;
-
-  // generated schemas
-  private transient Schema _reduceOutputSchema;
-  private transient Schema _mapOutputKeySchema;
-  private transient Schema _mapOutputSchema;
-  private transient Map<String,Schema> _inputSchemasParsed;
-
-  // schemas are stored here so the object can be serialized
-  private Map<String,String> conf;
-
-  // JSON text of each input schema, under the same keys as the map given to the constructor
-  private Map<String,String> _inputSchemas;
-
-  /**
-   * Captures the JSON form of the task schemas and of every input schema.
-   *
-   * @param schemas key, intermediate value and output value schemas
-   * @param inputSchemas input schemas; the String keys are reused unchanged by {@link #getMapInputSchemas()}
-   * @param outputSchemaName name of the record schema returned by {@link #getReduceOutputSchema()}
-   * @param outputSchemaNamespace namespace of the generated record schemas
-   */
-  public PartitionPreservingSchemas(TaskSchemas schemas, Map<String,Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
-  {
-    this._outputSchemaName = outputSchemaName;
-    this._outputSchemaNamespace = outputSchemaNamespace;
-
-    Map<String,String> taskSchemaJson = new HashMap<String,String>();
-    taskSchemaJson.put(KEY_SCHEMA, schemas.getKeySchema().toString());
-    taskSchemaJson.put(INTERMEDIATE_VALUE_SCHEMA, schemas.getIntermediateValueSchema().toString());
-    taskSchemaJson.put(OUTPUT_VALUE_SCHEMA, schemas.getOutputValueSchema().toString());
-    this.conf = taskSchemaJson;
-
-    Map<String,String> inputSchemaJson = new HashMap<String,String>();
-    for (Entry<String,Schema> input : inputSchemas.entrySet())
-    {
-      inputSchemaJson.put(input.getKey(), input.getValue().toString());
-    }
-    this._inputSchemas = inputSchemaJson;
-  }
-
-  /**
-   * Gets the map input schemas, parsed from the stored JSON on first access.
-   *
-   * @return map input schemas, under the keys given to the constructor
-   */
-  public Map<String,Schema> getMapInputSchemas()
-  {
-    if (_inputSchemasParsed != null)
-    {
-      return _inputSchemasParsed;
-    }
-
-    _inputSchemasParsed = new HashMap<String,Schema>();
-    for (Entry<String,String> input : _inputSchemas.entrySet())
-    {
-      Schema parsed = new Schema.Parser().parse(input.getValue());
-      _inputSchemasParsed.put(input.getKey(), parsed);
-    }
-    return _inputSchemasParsed;
-  }
-
-  /**
-   * Gets the map output schema: an Avro pair of the map output key and value schemas.
-   *
-   * @return map output schema
-   */
-  public Schema getMapOutputSchema()
-  {
-    if (_mapOutputSchema != null)
-    {
-      return _mapOutputSchema;
-    }
-
-    Schema outputKey = getMapOutputKeySchema();
-    Schema outputValue = getMapOutputValueSchema();
-    _mapOutputSchema = Pair.getPairSchema(outputKey, outputValue);
-    return _mapOutputSchema;
-  }
-
-  /**
-   * Gets the reduce output schema: a record with a "key" field of the key schema and a
-   * "value" field of the output value schema.
-   *
-   * @return reduce output schema
-   */
-  public Schema getReduceOutputSchema()
-  {
-    if (_reduceOutputSchema != null)
-    {
-      return _reduceOutputSchema;
-    }
-
-    _reduceOutputSchema = Schema.createRecord(_outputSchemaName, null, _outputSchemaNamespace, false);
-    Field keyField = new Field("key",getKeySchema(), null, null);
-    Field valueField = new Field("value", getOutputValueSchema(), null, null);
-    _reduceOutputSchema.setFields(Arrays.asList(keyField, valueField));
-    return _reduceOutputSchema;
-  }
-
-  /**
-   * Gets the output value schema, parsed from its stored JSON on first access.
-   *
-   * @return output value schema
-   */
-  public Schema getOutputValueSchema()
-  {
-    if (_outputValueSchema != null)
-    {
-      return _outputValueSchema;
-    }
-
-    _outputValueSchema = new Schema.Parser().parse(conf.get(OUTPUT_VALUE_SCHEMA));
-    return _outputValueSchema;
-  }
-
-  /**
-   * Gets the key schema, parsed from its stored JSON on first access.
-   *
-   * @return key schema
-   */
-  public Schema getKeySchema()
-  {
-    if (_keySchema != null)
-    {
-      return _keySchema;
-    }
-
-    _keySchema = new Schema.Parser().parse(conf.get(KEY_SCHEMA));
-    return _keySchema;
-  }
-
-  /**
-   * Gets the map output key schema: a record with a "value" field of the key schema and a
-   * "time" field of type long.
-   *
-   * @return map output key schema
-   */
-  public Schema getMapOutputKeySchema()
-  {
-    if (_mapOutputKeySchema != null)
-    {
-      return _mapOutputKeySchema;
-    }
-
-    _mapOutputKeySchema = Schema.createRecord(DATED_INCREMENTAL_KEY_SCHEMA_NAME, null, _outputSchemaNamespace, false);
-    // key needs the time included with it for partitioning
-    Field valueField = new Field("value", getKeySchema(), null, null);
-    Field timeField = new Field("time", Schema.create(Type.LONG), null, null);
-    _mapOutputKeySchema.setFields(Arrays.asList(valueField, timeField));
-    return _mapOutputKeySchema;
-  }
-
-  /**
-   * Gets the intermediate value schema, parsed from its stored JSON on first access.
-   *
-   * @return intermediate value schema
-   */
-  public Schema getIntermediateValueSchema()
-  {
-    if (_intermediateValueSchema != null)
-    {
-      return _intermediateValueSchema;
-    }
-
-    _intermediateValueSchema = new Schema.Parser().parse(conf.get(INTERMEDIATE_VALUE_SCHEMA));
-    return _intermediateValueSchema;
-  }
-
-  /**
-   * Gets the map output value schema, which is the intermediate value schema itself.
-   *
-   * @return map output value schema
-   */
-  public Schema getMapOutputValueSchema()
-  {
-    return getIntermediateValueSchema();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
deleted file mode 100644
index 829755e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import org.apache.avro.Schema;
-
-/**
- * Holds the three Avro schemas of a job: key, intermediate value, and output value.
- *
- * <p>
- * The mapper and combiner output key-value pairs conforming to the key and intermediate
- * value schemas defined here.
- * The reducer outputs key-value pairs conforming to the key and output value schemas.
- * </p>
- *
- * <p>
- * Instances are created through {@link Builder}; all three schemas are required.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class TaskSchemas
-{
-  private Schema keySchema;
-  private Schema intermediateValueSchema;
-  private Schema outputValueSchema;
-
-  private TaskSchemas(Schema keySchema, Schema intermediateValueSchema, Schema outputValueSchema)
-  {
-    // Checked in this order: key, intermediate value, output value.
-    this.keySchema = requirePresent(keySchema, "missing key schema");
-    this.intermediateValueSchema = requirePresent(intermediateValueSchema, "missing intermediate value schema");
-    this.outputValueSchema = requirePresent(outputValueSchema, "missing output value schema");
-  }
-
-  // Returns the schema unchanged, or fails with the given message when it is null.
-  private static Schema requirePresent(Schema schema, String message)
-  {
-    if (schema == null)
-    {
-      throw new IllegalArgumentException(message);
-    }
-    return schema;
-  }
-
-  /**
-   * @return the key schema
-   */
-  public Schema getKeySchema()
-  {
-    return this.keySchema;
-  }
-
-  /**
-   * @return the intermediate value schema
-   */
-  public Schema getIntermediateValueSchema()
-  {
-    return this.intermediateValueSchema;
-  }
-
-  /**
-   * @return the output value schema
-   */
-  public Schema getOutputValueSchema()
-  {
-    return this.outputValueSchema;
-  }
-
-  /**
-   * Fluent builder for {@link TaskSchemas}. Each setter returns this builder.
-   */
-  public static class Builder
-  {
-    private Schema keySchema;
-    private Schema intermediateValueSchema;
-    private Schema outputValueSchema;
-
-    public Builder setKeySchema(Schema schema)
-    {
-      this.keySchema = schema;
-      return this;
-    }
-
-    public Builder setIntermediateValueSchema(Schema schema)
-    {
-      this.intermediateValueSchema = schema;
-      return this;
-    }
-
-    public Builder setOutputValueSchema(Schema schema)
-    {
-      this.outputValueSchema = schema;
-      return this;
-    }
-
-    /**
-     * Creates the schemas from the values set so far.
-     *
-     * @return the task schemas
-     * @throws IllegalArgumentException if any of the three schemas was not set
-     */
-    public TaskSchemas build()
-    {
-      return new TaskSchemas(this.keySchema, this.intermediateValueSchema, this.outputValueSchema);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
deleted file mode 100644
index 3a6d2b8..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Classes that help manage the Avro schemas used by the jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.schemas;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test.sh
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test.sh b/contrib/hourglass/test.sh
deleted file mode 100755
index f017e58..0000000
--- a/contrib/hourglass/test.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/env bash
-
-echo $$ > test.pid
-ant test
-rm test.pid

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java b/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
deleted file mode 100644
index 166a5db..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-
-
-import datafu.hourglass.jobs.PartitionCollapsingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-/**
- * Given an input event with field "id" stored in daily directories according to yyyy/MM/dd, 
- * incrementally counts the number of events, grouped by each distinct value. 
- * New days will be merged with the previous output.
- * 
- * <p>
- * The class is serializable because the anonymous mapper and accumulator created in
- * {@link #run(Configuration, String, String)} implement serializable interfaces.
- * </p>
- * 
- * @author "Matthew Hayes"
- *
- */
-public class CountById extends Configured implements NamedTool, Serializable
-{
-  /**
-   * Command-line entry point. Expects exactly two arguments: the input path and the output path.
-   * 
-   * @param args input path followed by output path
-   * @return 0 on success; 1 on wrong argument count or when the job throws an
-   *         IOException, InterruptedException or ClassNotFoundException
-   */
-  @Override
-  public int run(String[] args) throws Exception
-  {
-    if (args.length != 2)
-    {
-      System.err.printf("%s   %s\n",getName(),getDescription());
-      System.err.println("Usage: <input> <output>");
-      return 1;
-    }
-    
-    try
-    {
-      run(super.getConf(), args[0], args[1]);
-      return 0;
-    }
-    catch (IOException e)
-    {
-      e.printStackTrace();
-    }
-    catch (InterruptedException e)
-    {
-      e.printStackTrace();
-      // Restore the interrupt status so code further up the stack can still observe it.
-      Thread.currentThread().interrupt();
-    }
-    catch (ClassNotFoundException e)
-    {
-      e.printStackTrace();
-    }
-    return 1;
-  }
-  
-  /**
-   * Configures and runs the incremental count job.
-   * 
-   * @param conf Hadoop configuration handed to the job
-   * @param inputPath path of the input data
-   * @param outputPath path where the output is written
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  public void run(Configuration conf, String inputPath, String outputPath) throws IOException, InterruptedException, ClassNotFoundException
-  {
-    PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(Examples.class);
-    
-    job.setConf(conf);
-    
-    final String namespace = "com.example";
-    
-    // Key record: a single long field "member_id".
-    final Schema keySchema = Schema.createRecord("Key",null,namespace,false);
-    keySchema.setFields(Arrays.asList(new Field("member_id",Schema.create(Type.LONG),null,null)));
-    final String keySchemaString = keySchema.toString(true);
-    
-    System.out.println("Key schema: " + keySchemaString);
-    
-    // Value record: a single int field "count". Used for both intermediate and output values.
-    final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
-    valueSchema.setFields(Arrays.asList(new Field("count",Schema.create(Type.INT),null,null)));
-    final String valueSchemaString = valueSchema.toString(true);
-    
-    System.out.println("Value schema: " + valueSchemaString);
-    
-    job.setKeySchema(keySchema);
-    job.setIntermediateValueSchema(valueSchema);
-    job.setOutputValueSchema(valueSchema);
-    
-    job.setInputPaths(Arrays.asList(new Path(inputPath)));
-    job.setOutputPath(new Path(outputPath));
-    job.setReusePreviousOutput(true);
-    // NOTE(review): presumably the number of previous outputs to retain; confirm against the job class.
-    job.setRetentionCount(3);
-    
-    job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>() 
-    {
-      // Schemas are transient and re-parsed lazily from the captured JSON strings.
-      private transient Schema kSchema;
-      private transient Schema vSchema;
-      
-      @Override
-      public void map(GenericRecord input,
-                      KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
-          InterruptedException
-      {
-        if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        // Emit (member_id = input "id", count = 1) for every input record.
-        GenericRecord key = new GenericData.Record(kSchema);
-        key.put("member_id", input.get("id"));
-        GenericRecord value = new GenericData.Record(vSchema);
-        value.put("count", 1);
-        collector.collect(key,value);
-      }      
-    });
-    
-    job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>() 
-    {
-      // Running total of the "count" fields accumulated since the last cleanup().
-      private transient int count;
-      private transient Schema vSchema;
-      
-      @Override
-      public void accumulate(GenericRecord value)
-      {
-        this.count += (Integer)value.get("count");
-      }
-
-      @Override
-      public GenericRecord getFinal()
-      {
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        GenericRecord output = new GenericData.Record(vSchema);
-        output.put("count", count);
-        return output;
-      }
-
-      @Override
-      public void cleanup()
-      {
-        this.count = 0;
-      }      
-    });
-    
-    // The same accumulator serves as combiner, since intermediate and output values share one schema.
-    job.setCombinerAccumulator(job.getReducerAccumulator());
-    job.setUseCombiner(true);
-    
-    job.run();
-  }
-
-  @Override
-  public String getName()
-  {
-    return "countbyid";
-  }
-
-  @Override
-  public String getDescription()
-  {
-    return "incrementally counts by id";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java b/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
deleted file mode 100644
index eeaf8cc..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
-
-import datafu.hourglass.jobs.PartitionCollapsingIncrementalJob;
-import datafu.hourglass.jobs.PartitionPreservingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-/**
- * Given an input event with field "id" stored in daily directories according to yyyy/MM/dd, 
- * estimates the cardinality of the values over the past 30 days.
- * 
- * @author "Matthew Hayes"
- *
- */
-public class EstimateCardinality extends Configured implements NamedTool, Serializable
-{  
-  @Override
-  public int run(String[] args) throws Exception
-  {
-    if (args.length != 4)
-    {
-      System.out.println("Usage: <input> <intermediate> <output> <num-days>");
-      System.exit(1);
-    }
-    
-    try
-    {
-      run(super.getConf(), args[0], args[1], args[2], Integer.parseInt(args[3]));
-    }
-    catch (IOException e)
-    {
-      e.printStackTrace();
-      System.exit(1);
-    }
-    catch (InterruptedException e)
-    {
-      e.printStackTrace();
-      System.exit(1);
-    }
-    catch (ClassNotFoundException e)
-    {
-      e.printStackTrace();
-      System.exit(1);
-    }
-    return 0;
-  }
-  
-  public void run(Configuration conf, String inputPath, String intermediatePath, String outputPath, int numDays) throws IOException, InterruptedException, ClassNotFoundException
-  {
-    PartitionPreservingIncrementalJob job1 = new PartitionPreservingIncrementalJob(Examples.class);    
-    job1.setConf(conf);
-    
-    final String namespace = "com.example";
-    
-    final Schema keySchema = Schema.createRecord("Stat",null,namespace,false);
-    keySchema.setFields(Arrays.asList(new Field("name", Schema.create(Type.STRING), "the type of statistic", null)));
-    final String keySchemaString = keySchema.toString(true);
-    
-    // data is either an int (the member ID), or bytes from the estimator
-    Schema dataSchema = Schema.createUnion(Arrays.asList(Schema.create(Type.LONG),Schema.create(Type.BYTES)));
-    
-    final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
-    valueSchema.setFields(Arrays.asList(new Field("data", dataSchema, "the data, either a member ID or bytes from the estimator", null),
-                                         new Field("count", Schema.create(Type.LONG), "the number of elements", null)));
-    final String valueSchemaString = valueSchema.toString(true);
-    
-    Mapper<GenericRecord,GenericRecord,GenericRecord> mapper = new Mapper<GenericRecord,GenericRecord,GenericRecord>() {
-      private transient Schema kSchema;
-      private transient Schema vSchema;
-      
-      @Override
-      public void map(GenericRecord input,
-                      KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
-          InterruptedException
-      {
-        if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        GenericRecord key = new GenericData.Record(kSchema);
-        key.put("name", "member_count");
-        GenericRecord value = new GenericData.Record(vSchema);
-        value.put("data",input.get("id")); // member id
-        value.put("count", 1L);             // just a single member
-        collector.collect(key,value);        
-      }      
-    };
-    
-    Accumulator<GenericRecord,GenericRecord> accumulator = new Accumulator<GenericRecord,GenericRecord>() {
-      private transient HyperLogLogPlus estimator;
-      private transient Schema vSchema;
-      
-      @Override
-      public void accumulate(GenericRecord value)
-      {
-        if (estimator == null) estimator = new HyperLogLogPlus(20);
-        Object data = value.get("data");
-        if (data instanceof Long)
-        {
-          estimator.offer(data);
-        }
-        else if (data instanceof ByteBuffer)
-        {
-          ByteBuffer bytes = (ByteBuffer)data;
-          HyperLogLogPlus newEstimator;
-          try
-          {
-            newEstimator = HyperLogLogPlus.Builder.build(bytes.array());
-            estimator = (HyperLogLogPlus)estimator.merge(newEstimator);
-          }
-          catch (IOException e)
-          {
-            throw new RuntimeException(e);
-          }
-          catch (CardinalityMergeException e)
-          {
-            throw new RuntimeException(e);
-          }      
-        }
-      }
-
-      @Override
-      public GenericRecord getFinal()
-      {
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        GenericRecord output = new GenericData.Record(vSchema);
-        try
-        {
-          ByteBuffer bytes = ByteBuffer.wrap(estimator.getBytes());
-          output.put("data", bytes);
-          output.put("count", estimator.cardinality());
-        }
-        catch (IOException e)
-        {
-          throw new RuntimeException(e);
-        }
-        return output;
-      }
-
-      @Override
-      public void cleanup()
-      {
-        estimator = null;
-      }      
-    };
-    
-    job1.setKeySchema(keySchema);
-    job1.setIntermediateValueSchema(valueSchema);
-    job1.setOutputValueSchema(valueSchema);    
-    job1.setInputPaths(Arrays.asList(new Path(inputPath)));
-    job1.setOutputPath(new Path(intermediatePath));
-    job1.setMapper(mapper);
-    job1.setCombinerAccumulator(accumulator);
-    job1.setReducerAccumulator(accumulator);
-    job1.setNumDays(numDays);
-    
-    job1.run();
-    
-    PartitionCollapsingIncrementalJob job2 = new PartitionCollapsingIncrementalJob(Examples.class);    
-    job2.setConf(conf);
-    
-    Mapper<GenericRecord,GenericRecord,GenericRecord> identity = new Mapper<GenericRecord,GenericRecord,GenericRecord>()
-    {
-      @Override
-      public void map(GenericRecord record, KeyValueCollector<GenericRecord,GenericRecord> context) throws IOException,
-          InterruptedException
-      {
-        context.collect((GenericRecord)record.get("key"), (GenericRecord)record.get("value"));
-      }
-    };
-    
-    job2.setNumDays(30);
-    job2.setKeySchema(keySchema);
-    job2.setIntermediateValueSchema(valueSchema);
-    job2.setOutputValueSchema(valueSchema);    
-    job2.setInputPaths(Arrays.asList(new Path(intermediatePath)));
-    job2.setOutputPath(new Path(outputPath));
-    job2.setMapper(identity);
-    job2.setCombinerAccumulator(accumulator);
-    job2.setReducerAccumulator(accumulator);
-    job2.setNumDays(numDays);
-    
-    job2.run();
-  }
-
-  @Override
-  public String getName()
-  {
-    return "cardinality";
-  }
-
-  @Override
-  public String getDescription()
-  {
-    return "estimates cardinality of IDs";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java b/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
deleted file mode 100644
index 8df7539..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-* 
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-* 
-* http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.test.Schemas;
-import datafu.hourglass.test.PartitionCollapsingTests;
-import datafu.hourglass.test.TestBase;
-import datafu.hourglass.test.util.DailyTrackingWriter;
-
-@Test(groups="pcl")
-public class Examples extends TestBase
-{
-  private Logger _log = Logger.getLogger(PartitionCollapsingTests.class);
-    
-  private static final Schema EVENT_SCHEMA;
-  
-  private GenericRecord _record;
-  private DailyTrackingWriter _eventWriter;
-  
-  static
-  {    
-    EVENT_SCHEMA = Schemas.createRecordSchema(Examples.class, "Event",
-                                              new Field("id", Schema.create(Type.LONG), null, null));
-    
-    System.out.println("Event schema: " + EVENT_SCHEMA.toString(true));
-  }
-  
-  public Examples() throws IOException
-  {
-    super();
-  }
-  
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    super.beforeClass();
-  }
-  
-  @AfterClass
-  public void afterClass() throws Exception
-  {
-    super.afterClass();
-  }
-  
-  @BeforeMethod
-  public void beforeMethod(Method method) throws IOException
-  {
-    _log.info("*** Running " + method.getName());
-    
-    _log.info("*** Cleaning input and output paths");  
-    getFileSystem().delete(new Path("/data"), true);
-    getFileSystem().delete(new Path("/output"), true);
-    getFileSystem().mkdirs(new Path("/data"));
-    getFileSystem().mkdirs(new Path("/output"));
-               
-    _record = new GenericData.Record(EVENT_SCHEMA);    
-    _eventWriter = new DailyTrackingWriter(new Path("/data/event"),EVENT_SCHEMA,getFileSystem());
-  }
-  
-  @Test
-  public void countByMember() throws IOException, InterruptedException, ClassNotFoundException
-  {          
-    // setup
-    openDayForEvent(2013, 3, 15);        
-    storeIds(1,1,1);        
-    storeIds(2);
-    storeIds(3,3);    
-    closeDayForEvent();
-    
-    openDayForEvent(2013, 3, 16);        
-    storeIds(1,1);        
-    storeIds(2,2);
-    storeIds(3);    
-    closeDayForEvent();
-            
-    // run    
-    new CountById().run(createJobConf(),"/data/event","/output");
-    
-    // verify
-    
-    checkOutputFolderCount(new Path("/output"), 1);
-    
-    HashMap<Long,Integer> counts = loadOutputCounts(new Path("/output"), "20130316");
-    
-    checkSize(counts,3);    
-    checkIdCount(counts,1,5);
-    checkIdCount(counts,2,3);
-    checkIdCount(counts,3,3);
-
-    // more data
-    openDayForEvent(2013, 3, 17);        
-    storeIds(1,1);        
-    storeIds(2,2,2);
-    storeIds(3,3);    
-    closeDayForEvent();
-    
-    // run    
-    new CountById().run(createJobConf(),"/data/event","/output");
-    
-    counts = loadOutputCounts(new Path("/output"), "20130317");
-    
-    checkSize(counts,3);    
-    checkIdCount(counts,1,7);
-    checkIdCount(counts,2,6);
-    checkIdCount(counts,3,5);
-  }
-  
-  @Test
-  public void estimateNumMembers() throws IOException, InterruptedException, ClassNotFoundException
-  {     
-    openDayForEvent(2013, 3, 1);  
-    // lots of members logged in this day
-    for (int i=1; i<=10000; i++) 
-    {
-      storeIds(i);   
-    } 
-    closeDayForEvent();
-    
-    // but only a handful logged in the remaining days
-    for (int i=2; i<=30; i++) 
-    {
-      openDayForEvent(2013, 3, i);        
-      storeIds(1,2,3,4,5);    
-      closeDayForEvent();
-    }
-        
-    // run    
-    new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
-    
-    // verify    
-    checkIntermediateFolderCount(new Path("/output/daily"), 30);
-    checkOutputFolderCount(new Path("/output/summary"), 1);
-    Assert.assertTrue(Math.abs(10000L - loadMemberCount(new Path("/output/summary"),"20130330").longValue())/10000.0 < 0.005);
-
-    // more data
-    openDayForEvent(2013, 3, 31);        
-    storeIds(6,7,8,9,10);
-    closeDayForEvent();
-    
-    // run    
-    new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
-    
-    // verify    
-    checkIntermediateFolderCount(new Path("/output/daily"), 31);
-    checkOutputFolderCount(new Path("/output/summary"), 1);
-    Assert.assertEquals(loadMemberCount(new Path("/output/summary"),"20130331").longValue(),10L);
-  }
-  
-  private void openDayForEvent(int year, int month, int day) throws IOException
-  {
-    System.out.println(String.format("start day: %04d %02d %02d",year,month,day));
-    _eventWriter.open(year, month, day);
-  }
-  
-  private void closeDayForEvent() throws IOException
-  {
-    _eventWriter.close();
-  }
-  
-  private void storeIds(long... ids) throws IOException
-  {
-    for (long id : ids)
-    {
-      storeId(id);
-    }
-  }
-    
-  private void storeId(long id) throws IOException
-  {
-    _record.put("id", id);  
-    System.out.println("record: " + _record.toString());
-    _eventWriter.append(_record);
-  }
-  
-  private void checkIdCount(HashMap<Long,Integer> counts, long id, long count)
-  {
-    Assert.assertTrue(counts.containsKey(id));
-    Assert.assertEquals(counts.get(id).intValue(), count);
-  }
-  
-  private void checkSize(HashMap<Long,Integer> counts, int expectedSize)
-  {
-    if (counts.size() != expectedSize)
-    {
-      StringBuilder sb = new StringBuilder("Expected count " + expectedSize + " does not match actual " + counts.size() + ", contents:\n");
-      List<Long> keys = new ArrayList<Long>(counts.keySet());
-      Collections.sort(keys);
-      for (Long k : keys)
-      {
-        sb.append(k.toString() + " => " + counts.get(k).toString() + "\n");
-      }
-      Assert.fail(sb.toString());
-    }
-  }
-  
-  private void checkOutputFolderCount(Path path, int expectedCount) throws IOException
-  {
-    Assert.assertEquals(countOutputFolders(path),expectedCount,"Found: " + listOutputFolders(path));
-  }
-  
-  private void checkIntermediateFolderCount(Path path, int expectedCount) throws IOException
-  {
-    Assert.assertEquals(countIntermediateFolders(path),expectedCount,"Found: " + listIntermediateFolders(path));
-  }
-  
-  private int countIntermediateFolders(Path path) throws IOException
-  {
-    FileSystem fs = getFileSystem();
-    return fs.globStatus(new Path(path,"*/*/*"),PathUtils.nonHiddenPathFilter).length;
-  }
-  
-  private int countOutputFolders(Path path) throws IOException
-  {
-    FileSystem fs = getFileSystem();
-    return fs.listStatus(path,PathUtils.nonHiddenPathFilter).length;
-  }
-  
-  private String listOutputFolders(Path path) throws IOException
-  {
-    StringBuilder sb = new StringBuilder();
-    for (FileStatus stat : getFileSystem().listStatus(path,PathUtils.nonHiddenPathFilter))
-    {
-      sb.append(stat.getPath().getName());
-      sb.append(",");
-    }
-    return sb.toString();
-  }
-  
-  private String listIntermediateFolders(Path path) throws IOException
-  {
-    StringBuilder sb = new StringBuilder();
-    for (FileStatus stat : getFileSystem().globStatus(new Path(path,"*/*/*"),PathUtils.nonHiddenPathFilter))
-    {
-      sb.append(stat.getPath().getName());
-      sb.append(",");
-    }
-    return sb.toString();
-  }
-  
-  private Long loadMemberCount(Path path, String timestamp) throws IOException
-  {
-    FileSystem fs = getFileSystem();
-    Assert.assertTrue(fs.exists(new Path(path, timestamp)));
-    for (FileStatus stat : fs.globStatus(new Path(path,timestamp + "/*.avro")))
-    {
-      _log.info(String.format("found: %s (%d bytes)",stat.getPath(),stat.getLen()));
-      FSDataInputStream is = fs.open(stat.getPath());
-      DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
-      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, reader);
-      
-      try
-      {
-        GenericRecord r = dataFileStream.next();
-        Long count = (Long)((GenericRecord)r.get("value")).get("count");   
-        Assert.assertNotNull(count);       
-        System.out.println("found count: " + count);
-        return count;
-      }
-      finally
-      {
-        dataFileStream.close();
-      }
-    }
-    throw new RuntimeException("found no data");
-  }
-  
-  private HashMap<Long,Integer> loadOutputCounts(Path path, String timestamp) throws IOException
-  {
-    HashMap<Long,Integer> counts = new HashMap<Long,Integer>();
-    FileSystem fs = getFileSystem();
-    Assert.assertTrue(fs.exists(new Path(path, timestamp)));
-    for (FileStatus stat : fs.globStatus(new Path(path,timestamp + "/*.avro")))
-    {
-      _log.info(String.format("found: %s (%d bytes)",stat.getPath(),stat.getLen()));
-      FSDataInputStream is = fs.open(stat.getPath());
-      DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
-      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, reader);
-      
-      try
-      {
-        while (dataFileStream.hasNext())
-        {          
-          GenericRecord r = dataFileStream.next();
-          _log.info("found: " + r.toString());
-          Long memberId = (Long)((GenericRecord)r.get("key")).get("member_id");
-          Assert.assertNotNull(memberId);
-          Integer count = (Integer)((GenericRecord)r.get("value")).get("count");   
-          Assert.assertNotNull(count);     
-          Assert.assertFalse(counts.containsKey(memberId));
-          counts.put(memberId, count);
-        }
-      }
-      finally
-      {
-        dataFileStream.close();
-      }
-    }
-    return counts;
-  }
-}