You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:48 UTC
[10/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
deleted file mode 100644
index 03d36f3..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingCombiner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * A Hadoop combiner which delegates to an implementation read from the distributed cache.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DelegatingCombiner extends Reducer<Object, Object, Object, Object>
-{
- private ObjectReducer processor;
-
- @Override
- protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- super.setup(context);
-
- if (context != null)
- {
- Configuration conf = context.getConfiguration();
-
- String path = conf.get(Parameters.COMBINER_IMPL_PATH);
-
- this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
- this.processor.setContext(context);
- }
- }
-
- @Override
- protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- this.processor.reduce(key, values, context);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException
- {
- this.processor.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
deleted file mode 100644
index d170cf5..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingMapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * A Hadoop mapper which delegates to an implementation read from the distributed cache.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DelegatingMapper extends Mapper<Object, Object, Object, Object>
-{
- private ObjectMapper processor;
-
- @Override
- protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- super.setup(context);
-
- if (context != null)
- {
- Configuration conf = context.getConfiguration();
-
- String path = conf.get(Parameters.MAPPER_IMPL_PATH);
-
- this.processor = (ObjectMapper)DistributedCacheHelper.readObject(conf, new Path(path));
- this.processor.setContext(context);
- }
- }
-
- @Override
- protected void map(Object key, Object value, Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- this.processor.map(key, context);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException
- {
- this.processor.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
deleted file mode 100644
index 4ddf918..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DelegatingReducer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * A Hadoop reducer which delegates to an implementation read from the distributed cache.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DelegatingReducer extends Reducer<Object, Object, Object, Object>
-{
- private ObjectReducer processor;
-
- @Override
- protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- super.setup(context);
-
- if (context != null)
- {
- Configuration conf = context.getConfiguration();
-
- String path = conf.get(Parameters.REDUCER_IMPL_PATH);
-
- this.processor = (ObjectReducer)DistributedCacheHelper.readObject(conf, new Path(path));
- this.processor.setContext(context);
- }
- }
-
- @Override
- protected void reduce(Object key, Iterable<Object> values, Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- this.processor.reduce(key, values, context);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException
- {
- this.processor.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
deleted file mode 100644
index bdb626a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Methods for working with the Hadoop distributed cache.
- *
- * @author "Matthew Hayes"
- *
- */
-public class DistributedCacheHelper
-{
- /**
- * Deserializes an object from a path in HDFS.
- *
- * @param conf Hadoop configuration
- * @param path Path to deserialize from
- * @return Deserialized object
- * @throws IOException
- */
- public static Object readObject(Configuration conf, org.apache.hadoop.fs.Path path) throws IOException
- {
- String localPath = null;
- Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
- for (Path localCacheFile : localCacheFiles)
- {
- if (localCacheFile.toString().endsWith(path.toString()))
- {
- localPath = localCacheFile.toString();
- break;
- }
- }
- if (localPath == null)
- {
- throw new RuntimeException("Could not find " + path + " in local cache");
- }
- FileInputStream inputStream = new FileInputStream(new File(localPath));
- ObjectInputStream objStream = new ObjectInputStream(inputStream);
-
- try
- {
- try {
- return objStream.readObject();
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- finally
- {
- objStream.close();
- inputStream.close();
- }
- }
-
- /**
- * Serializes an object to a path in HDFS and adds the file to the distributed cache.
- *
- * @param conf Hadoop configuration
- * @param obj Object to serialize
- * @param path Path to serialize object to
- * @throws IOException
- */
- public static void writeObject(Configuration conf, Object obj, org.apache.hadoop.fs.Path path) throws IOException
- {
- FileSystem fs = FileSystem.get(conf);
- FSDataOutputStream outputStream = fs.create(path, true);
- ObjectOutputStream objStream = new ObjectOutputStream(outputStream);
- objStream.writeObject(obj);
- objStream.close();
- outputStream.close();
- DistributedCache.addCacheFile(path.toUri(), conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
deleted file mode 100644
index ea1ca2e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.MapContext;
-
-/**
- * Defines the interface for a mapper implementation that {@link DelegatingMapper} delegates to.
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class ObjectMapper extends ObjectProcessor {
- public abstract void map(Object input,
- MapContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
deleted file mode 100644
index f33c790..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectProcessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * Base class for {@link ObjectMapper} and {@link ObjectReducer}.
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class ObjectProcessor
-{
- private TaskInputOutputContext<Object,Object,Object,Object> context;
-
- public TaskInputOutputContext<Object,Object,Object,Object> getContext()
- {
- return this.context;
- }
-
- public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
- {
- this.context = context;
- }
-
- public void close() throws IOException, InterruptedException
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
deleted file mode 100644
index 9340019..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/ObjectReducer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-/**
- * Defines the interface for combiner and reducer implementations that {@link DelegatingCombiner} and
- * {@link DelegatingReducer} delegate to.
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class ObjectReducer extends ObjectProcessor {
- public abstract void reduce(Object key,
- Iterable<Object> values,
- ReduceContext<Object,Object,Object,Object> context) throws IOException,InterruptedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
deleted file mode 100644
index 65d3fd9..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/Parameters.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
/**
 * Configuration keys through which the jobs pass settings to the
 * mappers, combiners, and reducers.
 *
 * @author "Matthew Hayes"
 *
 */
public class Parameters
{
  /** Key whose value is the distributed-cache path of the serialized mapper implementation. */
  public static final String MAPPER_IMPL_PATH = "hourglass.mapper.impl.path";

  /** Key whose value is the distributed-cache path of the serialized reducer implementation. */
  public static final String REDUCER_IMPL_PATH = "hourglass.reducer.impl.path";

  /** Key whose value is the distributed-cache path of the serialized combiner implementation. */
  public static final String COMBINER_IMPL_PATH = "hourglass.combiner.impl.path";
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
deleted file mode 100644
index 53d6a90..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningCombiner.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-import datafu.hourglass.model.Accumulator;
-
-/**
- * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * intermediate value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitioningCombiner extends ObjectReducer implements Serializable
-{
- private Accumulator<GenericRecord,GenericRecord> accumulator;
-
- @SuppressWarnings("unchecked")
- public void reduce(Object keyObj,
- Iterable<Object> values,
- ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-
- if (acc == null)
- {
- throw new RuntimeException("No accumulator set for combiner!");
- }
-
- acc.cleanup();
-
- long accumulatedCount = 0;
- for (Object valueObj : values)
- {
- AvroValue<GenericRecord> value = (AvroValue<GenericRecord>)valueObj;
- acc.accumulate(value.datum());
- accumulatedCount++;
- }
-
- if (accumulatedCount > 0)
- {
- GenericRecord intermediateValue = acc.getFinal();
- if (intermediateValue != null)
- {
- context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
- }
- }
- }
-
- /**
- * Gets the accumulator used to perform aggregation.
- *
- * @return The accumulator
- */
- public Accumulator<GenericRecord,GenericRecord> getAccumulator()
- {
- return accumulator;
- }
-
- /**
- * Sets the accumulator used to perform aggregation.
- *
- * @param acc The accumulator
- */
- public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- accumulator = acc;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
deleted file mode 100644
index 43b7130..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningMapper.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
- * map operation, which produces key and intermediate value pairs from the input.
- * The input to the mapper is assumed to be partitioned by day.
- * Each key produced by {@link datafu.hourglass.model.Mapper} is tagged with the time for the partition
- * that the input came from. This enables the combiner and reducer to preserve the partitions.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitioningMapper extends ObjectMapper implements Serializable
-{
- private transient MapCollector _mapCollector;
- private transient FileSplit _lastSplit;
- private transient long _lastTime;
- private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
- private PartitionPreservingSchemas _schemas;
-
- @SuppressWarnings("unchecked")
- @Override
- public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- long time;
-
- if (_lastSplit == context.getInputSplit())
- {
- time = _lastTime;
- }
- else
- {
- _lastSplit = (FileSplit)context.getInputSplit();
- time = PathUtils.getDateForNestedDatedPath((_lastSplit).getPath().getParent()).getTime();
- _lastTime = time;
- }
-
- getMapCollector().setContext(context);
-
- // Set the time, representing the time range this data was derived from.
- // The key is tagged with this time.
- getMapCollector().setTime(time);
-
- try
- {
- AvroKey<GenericRecord> input = (AvroKey<GenericRecord>)inputObj;
- getMapper().map(input.datum(),getMapCollector());
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
-
- /**
- * Gets the mapper.
- *
- * @return mapper
- */
- public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
- {
- return _mapper;
- }
-
- /**
- * Sets the mapper.
- *
- * @param mapper
- */
- public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
- {
- _mapper = mapper;
- }
-
- /**
- * Sets the Avro schemas.
- *
- * @param schemas
- */
- public void setSchemas(PartitionPreservingSchemas schemas)
- {
- _schemas = schemas;
- }
-
- /**
- * Gets the Avro schemas.
- *
- * @return schemas
- */
- public PartitionPreservingSchemas getSchemas()
- {
- return _schemas;
- }
-
- @Override
- public void setContext(
- TaskInputOutputContext<Object, Object, Object, Object> context)
- {
- super.setContext(context);
-
- if (_mapper instanceof Configurable)
- {
- ((Configurable)_mapper).setConf(context.getConfiguration());
- }
- }
-
- private MapCollector getMapCollector()
- {
- if (_mapCollector == null)
- {
- _mapCollector = new MapCollector(getSchemas());
- }
-
- return _mapCollector;
- }
-
- /**
- * A {@see KeyValueCollector} that writes to {@see MapContext} and tags each mapped key with the time for the partition
- * it was derived from. This keeps the data partitioned so that the reducer may process each partition independently.
- *
- * @author "Matthew Hayes"
- *
- */
- private class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
- {
- private MapContext<Object,Object,Object,Object> context;
- private GenericRecord wrappedKey;
-
- public MapCollector(PartitionPreservingSchemas schemas)
- {
- this.wrappedKey = new GenericData.Record(schemas.getMapOutputKeySchema());
- }
-
- public void setContext(MapContext<Object,Object,Object,Object> context)
- {
- this.context = context;
- }
-
- public void setTime(long time)
- {
- this.wrappedKey.put("time", time);
- }
-
- public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
- {
- wrappedKey.put("value", key);
- context.write(new AvroKey<GenericRecord>(wrappedKey),new AvroValue<GenericRecord>(value));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
deleted file mode 100644
index d6726ac..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/PartitioningReducer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.mapreduce.ReduceContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.schemas.PartitionPreservingSchemas;
-
-/**
- * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * output value.
- * </p>
- *
- * <p>
- * The input key is assumed to have time and value fields. The value here is the true key,
- * and the time represents the input partition the data was derived from. The true key is
- * used as the key in the reducer output and the time is dropped.
- * This reducer uses multiple outputs; the time is used to determine which output to write to,
- * where the named outputs have the form yyyyMMdd derived from the time.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitioningReducer extends ObjectReducer implements Serializable
-{
- private transient AvroMultipleOutputs _multipleOutputs;
- private transient Map<Long,String> _timeToNamedOutput;
- private PartitionPreservingSchemas _schemas;
- private Accumulator<GenericRecord,GenericRecord> accumulator;
-
- @SuppressWarnings("unchecked")
- public void reduce(Object keyObj,
- Iterable<Object> values,
- ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-
- if (acc == null)
- {
- throw new RuntimeException("No accumulator set for reducer!");
- }
-
- acc.cleanup();
-
- Long keyTime = null;
-
- GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
-
- keyTime = (Long)key.get("time");
- key = (GenericRecord)key.get("value");
-
- long accumulatedCount = 0;
- for (Object valueObj : values)
- {
- GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
- acc.accumulate(value);
- accumulatedCount++;
- }
-
- if (accumulatedCount > 0)
- {
- GenericRecord outputValue = acc.getFinal();
- if (outputValue != null)
- {
- GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
- output.put("key", key);
- output.put("value", outputValue);
-
- // write output in directories corresponding to each day
- String namedOutput = getNamedOutput(keyTime);
- if (_multipleOutputs == null)
- {
- throw new RuntimeException("No multiple outputs set");
- }
- _multipleOutputs.write(namedOutput, new AvroKey<GenericRecord>(output), (AvroValue<GenericRecord>)null);
- }
- }
- }
-
- @Override
- public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
- {
- super.setContext(context);
-
- // ... and we also write the final output to multiple directories
- _multipleOutputs = new AvroMultipleOutputs(context);
- }
-
- /**
- * Sets the accumulator used to perform aggregation.
- *
- * @param acc The accumulator
- */
- public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- accumulator = acc;
- }
-
- /**
- * Gets the accumulator used to perform aggregation.
- *
- * @return The accumulator
- */
- public Accumulator<GenericRecord,GenericRecord> getAccumulator()
- {
- return accumulator;
- }
-
- /**
- * Sets the Avro schemas.
- *
- * @param schemas
- */
- public void setSchemas(PartitionPreservingSchemas schemas)
- {
- _schemas = schemas;
- }
-
- /**
- * Gets the Avro schemas
- *
- * @return schemas
- */
- public PartitionPreservingSchemas getSchemas()
- {
- return _schemas;
- }
-
- @Override
- public void close() throws IOException, InterruptedException
- {
- super.close();
-
- if (_multipleOutputs != null)
- {
- _multipleOutputs.close();
- _multipleOutputs = null;
- }
- }
-
- private String getNamedOutput(Long time)
- {
- if (_timeToNamedOutput == null)
- {
- _timeToNamedOutput = new HashMap<Long,String>();
- }
- String namedOutput = _timeToNamedOutput.get(time);
- if (namedOutput == null)
- {
- namedOutput = PathUtils.datedPathFormat.format(new Date(time));
- _timeToNamedOutput.put(time, namedOutput);
- }
- return namedOutput;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
deleted file mode 100644
index 5ee265b..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Implementations of mappers, combiners, and reducers used by incremental jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.mapreduce;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java b/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
deleted file mode 100644
index 4f7ac7e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Accumulator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.Serializable;
-
-/**
- * Folds a sequence of input values into a single result value.
- *
- * <p>
- * Values are supplied one at a time through {@link #accumulate(Object)}. The combined
- * result is read with {@link #getFinal()}, and {@link #cleanup()} forgets everything
- * accumulated so far.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- * @param <In> type of the values being accumulated
- * @param <Out> type of the result produced
- */
-public interface Accumulator<In,Out> extends Serializable
-{
-  /**
-   * Adds one more value to the accumulated state.
-   *
-   * @param value the value to accumulate
-   */
-  void accumulate(In value);
-
-  /**
-   * Returns the result for every value accumulated since the last cleanup.
-   *
-   * <p>
-   * A {@code null} return means that no record should be written.
-   * </p>
-   *
-   * @return the accumulated result, possibly {@code null}
-   */
-  Out getFinal();
-
-  /**
-   * Discards the internal state so that all previously accumulated values are forgotten.
-   */
-  void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java b/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
deleted file mode 100644
index 215dd8d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/KeyValueCollector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.IOException;
-
-/**
- * Receives the key-value pairs emitted by a {@link Mapper}.
- *
- * @author "Matthew Hayes"
- *
- * @param <K> type of the keys collected
- * @param <V> type of the values collected
- */
-public interface KeyValueCollector<K,V>
-{
-  /**
-   * Collects a single key-value pair.
-   *
-   * @param key the key to collect
-   * @param value the value to collect
-   * @throws IOException if an I/O error occurs while collecting the pair
-   * @throws InterruptedException if interrupted while collecting the pair
-   */
-  void collect(K key,V value) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java b/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
deleted file mode 100644
index 7640ae5..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Mapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Transforms one input record into one or more key-value pairs.
- *
- * @author "Matthew Hayes"
- *
- * @param <In> type of the input record
- * @param <OutKey> type of the emitted keys
- * @param <OutVal> type of the emitted values
- */
-public interface Mapper<In,OutKey,OutVal> extends Serializable
-{
-  /**
-   * Maps one input record, handing each resulting key-value pair to {@code collector}.
-   *
-   * @param input the input record
-   * @param collector sink that receives the output key-value pairs
-   * @throws IOException if an I/O error occurs while mapping or collecting
-   * @throws InterruptedException if interrupted while mapping or collecting
-   */
-  void map(In input, KeyValueCollector<OutKey,OutVal> collector) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java b/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
deleted file mode 100644
index 0c27865..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/Merger.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.model;
-
-import java.io.Serializable;
-
-/**
- * Combines two values of the same type into one.
- *
- * @author "Matthew Hayes"
- *
- * @param <T> type of the values being merged
- */
-public interface Merger<T> extends Serializable
-{
-  /**
-   * Combines {@code left} and {@code right} into a single value.
-   *
-   * <p>
-   * Returning {@code null} signals that no record should be written.
-   * </p>
-   *
-   * @param left the left-hand value
-   * @param right the right-hand value
-   * @return the merged value, possibly {@code null}
-   */
-  T merge(T left, T right);
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
deleted file mode 100644
index 984c1fb..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/model/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * Interfaces which define the incremental processing model.
- */
-package datafu.hourglass.model;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
deleted file mode 100644
index 8d8246c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionCollapsingSchemas.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.mapred.Pair;
-import org.apache.commons.lang.NullArgumentException;
-
-/**
- * Generates the Avro schemas used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derivations.
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionCollapsingSchemas implements Serializable
-{
- private static String DATED_INTERMEDIATE_VALUE_SCHEMA_NAME = "DatedMapValue";
- private static String KEY_SCHEMA = "key.schema";
- private static String INTERMEDIATE_VALUE_SCHEMA = "intermediate.value.schema";
- private static String OUTPUT_VALUE_SCHEMA = "output.value.schema";
-
- private final String _outputSchemaName;
- private final String _outputSchemaNamespace;
- private transient Schema _keySchema;
- private transient Schema _intermediateValueSchema;
- private transient Schema _outputValueSchema;
-
- // generated schemas
- private transient Schema _mapOutputSchema;
- private transient Schema _dateIntermediateValueSchema;
- private transient Schema _mapOutputValueSchema;
- private transient Schema _reduceOutputSchema;
- private transient Map<String,Schema> _mapInputSchemas;
-
- //schemas are stored here so the object can be serialized
- private Map<String,String> conf;
-
- private Map<String,String> _inputSchemas;
-
- public PartitionCollapsingSchemas(TaskSchemas schemas, Map<String,Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
- {
- if (schemas == null)
- {
- throw new NullArgumentException("schemas");
- }
- if (inputSchemas == null)
- {
- throw new NullArgumentException("inputSchema");
- }
- if (outputSchemaName == null)
- {
- throw new NullArgumentException("outputSchemaName");
- }
- if (outputSchemaName == outputSchemaNamespace)
- {
- throw new NullArgumentException("outputSchemaNamespace");
- }
- _outputSchemaName = outputSchemaName;
- _outputSchemaNamespace = outputSchemaNamespace;
-
- conf = new HashMap<String,String>();
- conf.put(KEY_SCHEMA, schemas.getKeySchema().toString());
- conf.put(INTERMEDIATE_VALUE_SCHEMA, schemas.getIntermediateValueSchema().toString());
- conf.put(OUTPUT_VALUE_SCHEMA, schemas.getOutputValueSchema().toString());
-
- _inputSchemas = new HashMap<String,String>();
- for (Entry<String,Schema> schema : inputSchemas.entrySet())
- {
- _inputSchemas.put(schema.getKey(), schema.getValue().toString());
- }
- }
-
- public Map<String,Schema> getMapInputSchemas()
- {
- if (_mapInputSchemas == null)
- {
- _mapInputSchemas = new HashMap<String,Schema>();
-
- for (Entry<String,String> schemaPair : _inputSchemas.entrySet())
- {
- Schema schema = new Schema.Parser().parse(schemaPair.getValue());
-
- List<Schema> mapInputSchemas = new ArrayList<Schema>();
-
- if (schema.getType() == Type.UNION)
- {
- mapInputSchemas.addAll(schema.getTypes());
- }
- else
- {
- mapInputSchemas.add(schema);
- }
-
- // feedback from output (optional)
- mapInputSchemas.add(getReduceOutputSchema());
-
- _mapInputSchemas.put(schemaPair.getKey(), Schema.createUnion(mapInputSchemas));
- }
-
-
- }
- return Collections.unmodifiableMap(_mapInputSchemas);
- }
-
- public Schema getMapOutputSchema()
- {
- if (_mapOutputSchema == null)
- {
- _mapOutputSchema = Pair.getPairSchema(getMapOutputKeySchema(),
- getMapOutputValueSchema());
- }
- return _mapOutputSchema;
- }
-
- public Schema getKeySchema()
- {
- if (_keySchema == null)
- {
- _keySchema = new Schema.Parser().parse(conf.get(KEY_SCHEMA));
- }
- return _keySchema;
- }
-
- public Schema getMapOutputKeySchema()
- {
- return getKeySchema();
- }
-
- public Schema getReduceOutputSchema()
- {
- if (_reduceOutputSchema == null)
- {
- _reduceOutputSchema = Schema.createRecord(_outputSchemaName, null, _outputSchemaNamespace, false);
- List<Field> fields = Arrays.asList(new Field("key",getKeySchema(), null, null),
- new Field("value", getOutputValueSchema(), null, null));
- _reduceOutputSchema.setFields(fields);
- }
- return _reduceOutputSchema;
- }
-
- public Schema getDatedIntermediateValueSchema()
- {
- if (_dateIntermediateValueSchema == null)
- {
- _dateIntermediateValueSchema = Schema.createRecord(DATED_INTERMEDIATE_VALUE_SCHEMA_NAME, null, _outputSchemaNamespace, false);
- List<Field> intermediateValueFields = Arrays.asList(new Field("value", getIntermediateValueSchema(), null, null),
- new Field("time", Schema.create(Type.LONG), null, null));
- _dateIntermediateValueSchema.setFields(intermediateValueFields);
- }
- return _dateIntermediateValueSchema;
- }
-
- public Schema getOutputValueSchema()
- {
- if (_outputValueSchema == null)
- {
- _outputValueSchema = new Schema.Parser().parse(conf.get(OUTPUT_VALUE_SCHEMA));
- }
- return _outputValueSchema;
- }
-
- public Schema getIntermediateValueSchema()
- {
- if (_intermediateValueSchema == null)
- {
- _intermediateValueSchema = new Schema.Parser().parse(conf.get(INTERMEDIATE_VALUE_SCHEMA));
- }
- return _intermediateValueSchema;
- }
-
- public Schema getMapOutputValueSchema()
- {
- if (_mapOutputValueSchema == null)
- {
- List<Schema> unionSchemas = new ArrayList<Schema>();
-
- unionSchemas.add(getIntermediateValueSchema());
-
- // intermediate values tagged with the date
- unionSchemas.add(getDatedIntermediateValueSchema());
-
- // feedback from output of second pass
- if (!unionSchemas.contains(getOutputValueSchema()))
- {
- unionSchemas.add(getOutputValueSchema());
- }
-
- _mapOutputValueSchema = Schema.createUnion(unionSchemas);
- }
- return _mapOutputValueSchema;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
deleted file mode 100644
index d4f3bd2..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/PartitionPreservingSchemas.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.mapred.Pair;
-
-/**
- * Generates the Avro schemas used by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob} and its derivations.
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingSchemas implements Serializable
-{
- private static String DATED_INCREMENTAL_KEY_SCHEMA_NAME = "DatedMapKey";
-
- private static String KEY_SCHEMA = "key.schema";
- private static String INTERMEDIATE_VALUE_SCHEMA = "intermediate.value.schema";
- private static String OUPUT_VALUE_SCHEMA = "output.value.schema";
-
- private transient Schema _keySchema;
- private transient Schema _intermediateValueSchema;
- private transient Schema _outputValueSchema;
- private final String _outputSchemaName;
- private final String _outputSchemaNamespace;
-
- // generated schemas
- private transient Schema _reduceOutputSchema;
- private transient Schema _mapOutputKeySchema;
- private transient Schema _mapOutputSchema;
- private transient Map<String,Schema> _inputSchemasParsed;
-
- // schemas are stored here so the object can be serialized
- private Map<String,String> conf;
-
- private Map<String,String> _inputSchemas;
-
- public PartitionPreservingSchemas(TaskSchemas schemas, Map<String,Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
- {
- _outputSchemaName = outputSchemaName;
- _outputSchemaNamespace = outputSchemaNamespace;
-
- conf = new HashMap<String,String>();
- conf.put(KEY_SCHEMA, schemas.getKeySchema().toString());
- conf.put(INTERMEDIATE_VALUE_SCHEMA, schemas.getIntermediateValueSchema().toString());
- conf.put(OUPUT_VALUE_SCHEMA, schemas.getOutputValueSchema().toString());
-
- _inputSchemas = new HashMap<String,String>();
- for (Entry<String,Schema> schema : inputSchemas.entrySet())
- {
- _inputSchemas.put(schema.getKey(), schema.getValue().toString());
- }
- }
-
- public Map<String,Schema> getMapInputSchemas()
- {
- if (_inputSchemasParsed == null)
- {
- _inputSchemasParsed = new HashMap<String,Schema>();
- for (Entry<String,String> schema : _inputSchemas.entrySet())
- {
- _inputSchemasParsed.put(schema.getKey(), new Schema.Parser().parse(schema.getValue()));
- }
- }
- return _inputSchemasParsed;
- }
-
- public Schema getMapOutputSchema()
- {
- if (_mapOutputSchema == null)
- {
- _mapOutputSchema = Pair.getPairSchema(getMapOutputKeySchema(),
- getMapOutputValueSchema());
- }
- return _mapOutputSchema;
- }
-
- public Schema getReduceOutputSchema()
- {
- if (_reduceOutputSchema == null)
- {
- _reduceOutputSchema = Schema.createRecord(_outputSchemaName, null, _outputSchemaNamespace, false);
- List<Field> fields = Arrays.asList(new Field("key",getKeySchema(), null, null),
- new Field("value", getOutputValueSchema(), null, null));
- _reduceOutputSchema.setFields(fields);
- }
- return _reduceOutputSchema;
- }
-
- public Schema getOutputValueSchema()
- {
- if (_outputValueSchema == null)
- {
- _outputValueSchema = new Schema.Parser().parse(conf.get(OUPUT_VALUE_SCHEMA));
- }
- return _outputValueSchema;
- }
-
- public Schema getKeySchema()
- {
- if (_keySchema == null)
- {
- _keySchema = new Schema.Parser().parse(conf.get(KEY_SCHEMA));
- }
- return _keySchema;
- }
-
- public Schema getMapOutputKeySchema()
- {
- if (_mapOutputKeySchema == null)
- {
- _mapOutputKeySchema = Schema.createRecord(DATED_INCREMENTAL_KEY_SCHEMA_NAME, null, _outputSchemaNamespace, false);
- // key needs the time included with it for partitioning
- List<Field> incrementalKeyFields = Arrays.asList(new Field("value", getKeySchema(), null, null),
- new Field("time", Schema.create(Type.LONG), null, null));
- _mapOutputKeySchema.setFields(incrementalKeyFields);
- }
- return _mapOutputKeySchema;
- }
-
- public Schema getIntermediateValueSchema()
- {
- if (_intermediateValueSchema == null)
- {
- _intermediateValueSchema = new Schema.Parser().parse(conf.get(INTERMEDIATE_VALUE_SCHEMA));
- }
- return _intermediateValueSchema;
- }
-
- public Schema getMapOutputValueSchema()
- {
- return getIntermediateValueSchema();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
deleted file mode 100644
index 829755e..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/TaskSchemas.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.schemas;
-
-import org.apache.avro.Schema;
-
-/**
- * Contains the Avro schemas for the key, intermediate value, and output value of a job.
- *
- * <p>
- * The mapper and combiner output key-value pairs conforming to the key and intermediate
- * value schemas defined here.
- * The reducer outputs key-value pairs conforming to the key and output value schemas.
- * </p>
- *
- * <p>
- * Instances are created through {@link Builder}. The schema references are fixed at
- * construction and are never null.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public final class TaskSchemas
-{
-  private final Schema keySchema;
-  private final Schema intermediateValueSchema;
-  private final Schema outputValueSchema;
-
-  private TaskSchemas(Schema keySchema, Schema intermediateValueSchema, Schema outputValueSchema)
-  {
-    if (keySchema == null)
-    {
-      throw new IllegalArgumentException("missing key schema");
-    }
-
-    if (intermediateValueSchema == null)
-    {
-      throw new IllegalArgumentException("missing intermediate value schema");
-    }
-
-    if (outputValueSchema == null)
-    {
-      throw new IllegalArgumentException("missing output value schema");
-    }
-
-    this.keySchema = keySchema;
-    this.intermediateValueSchema = intermediateValueSchema;
-    this.outputValueSchema = outputValueSchema;
-  }
-
-  /**
-   * @return the key schema
-   */
-  public Schema getKeySchema()
-  {
-    return keySchema;
-  }
-
-  /**
-   * @return the intermediate value schema (mapper and combiner output values)
-   */
-  public Schema getIntermediateValueSchema()
-  {
-    return intermediateValueSchema;
-  }
-
-  /**
-   * @return the output value schema (reducer output values)
-   */
-  public Schema getOutputValueSchema()
-  {
-    return outputValueSchema;
-  }
-
-  /**
-   * Builds {@link TaskSchemas}. All three schemas must be set before {@link #build()}.
-   */
-  public static class Builder
-  {
-    private Schema keySchema;
-    private Schema intermediateValueSchema;
-    private Schema outputValueSchema;
-
-    /**
-     * @param schema the key schema
-     * @return this builder
-     */
-    public Builder setKeySchema(Schema schema)
-    {
-      keySchema = schema;
-      return this;
-    }
-
-    /**
-     * @param schema the intermediate value schema
-     * @return this builder
-     */
-    public Builder setIntermediateValueSchema(Schema schema)
-    {
-      intermediateValueSchema = schema;
-      return this;
-    }
-
-    /**
-     * @param schema the output value schema
-     * @return this builder
-     */
-    public Builder setOutputValueSchema(Schema schema)
-    {
-      outputValueSchema = schema;
-      return this;
-    }
-
-    /**
-     * Creates the {@link TaskSchemas} from the schemas set so far.
-     *
-     * @return the task schemas
-     * @throws IllegalArgumentException if any of the three schemas has not been set
-     */
-    public TaskSchemas build()
-    {
-      return new TaskSchemas(keySchema,intermediateValueSchema,outputValueSchema);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
deleted file mode 100644
index 3a6d2b8..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/schemas/package-info.java
+++ /dev/null
@@ -1,6 +0,0 @@
-/**
- * Classes that help manage the Avro schemas used by the jobs.
- * These are used internally by {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}.
- */
-package datafu.hourglass.schemas;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test.sh
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test.sh b/contrib/hourglass/test.sh
deleted file mode 100755
index f017e58..0000000
--- a/contrib/hourglass/test.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/env bash
-
-echo $$ > test.pid
-ant test
-rm test.pid
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java b/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
deleted file mode 100644
index 166a5db..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/CountById.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-
-
-import datafu.hourglass.jobs.PartitionCollapsingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-/**
- * Given an input event with field "id" stored in daily directories according to yyyy/MM/dd,
- * incrementally counts the number of events, grouped by each distinct value.
- * New days will be merged with the previous output.
- *
- * @author "Matthew Hayes"
- *
- */
-public class CountById extends Configured implements NamedTool, Serializable
-{
-  /**
-   * Runs the job from the command line.
-   *
-   * @param args exactly two arguments: the input path and the output path
-   * @return 0 on success; 1 if the arguments are wrong or the job throws
-   *         IOException, InterruptedException or ClassNotFoundException
-   */
-  @Override
-  public int run(String[] args) throws Exception
-  {
-    if (args.length != 2)
-    {
-      System.err.printf("%s %s\n",getName(),getDescription());
-      System.err.println("Usage: <input> <output>");
-      return 1;
-    }
-
-    try
-    {
-      run(super.getConf(), args[0], args[1]);
-      return 0;
-    }
-    catch (IOException e)
-    {
-      e.printStackTrace();
-    }
-    catch (InterruptedException e)
-    {
-      e.printStackTrace();
-      // Restore the interrupt flag so callers can still observe the interruption.
-      Thread.currentThread().interrupt();
-    }
-    catch (ClassNotFoundException e)
-    {
-      e.printStackTrace();
-    }
-    return 1;
-  }
-
-  /**
-   * Configures and runs the incremental count-by-id job.
-   *
-   * @param conf Hadoop configuration for the job
-   * @param inputPath input path containing the daily directories
-   * @param outputPath output path for the counts
-   * @throws IOException propagated from configuring or running the job
-   * @throws InterruptedException propagated from running the job
-   * @throws ClassNotFoundException propagated from running the job
-   */
-  public void run(Configuration conf, String inputPath, String outputPath) throws IOException, InterruptedException, ClassNotFoundException
-  {
-    // NOTE(review): passes Examples.class rather than CountById.class; confirm this is intended.
-    PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(Examples.class);
-
-    job.setConf(conf);
-
-    final String namespace = "com.example";
-
-    final Schema keySchema = Schema.createRecord("Key",null,namespace,false);
-    keySchema.setFields(Arrays.asList(new Field("member_id",Schema.create(Type.LONG),null,null)));
-    final String keySchemaString = keySchema.toString(true);
-
-    System.out.println("Key schema: " + keySchemaString);
-
-    final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
-    valueSchema.setFields(Arrays.asList(new Field("count",Schema.create(Type.INT),null,null)));
-    final String valueSchemaString = valueSchema.toString(true);
-
-    System.out.println("Value schema: " + valueSchemaString);
-
-    // the same value schema serves as both the intermediate and the output value schema
-    job.setKeySchema(keySchema);
-    job.setIntermediateValueSchema(valueSchema);
-    job.setOutputValueSchema(valueSchema);
-
-    job.setInputPaths(Arrays.asList(new Path(inputPath)));
-    job.setOutputPath(new Path(outputPath));
-    job.setReusePreviousOutput(true);
-    job.setRetentionCount(3);
-
-    // The mapper and accumulator capture the schema JSON strings and parse them lazily
-    // into transient fields (presumably because Schema itself is not serializable).
-    job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>()
-    {
-      private transient Schema kSchema;
-      private transient Schema vSchema;
-
-      @Override
-      public void map(GenericRecord input,
-                      KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
-          InterruptedException
-      {
-        if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        GenericRecord key = new GenericData.Record(kSchema);
-        key.put("member_id", input.get("id"));
-        GenericRecord value = new GenericData.Record(vSchema);
-        value.put("count", 1);
-        collector.collect(key,value);
-      }
-    });
-
-    job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>()
-    {
-      private transient int count;
-      private transient Schema vSchema;
-
-      @Override
-      public void accumulate(GenericRecord value)
-      {
-        this.count += (Integer)value.get("count");
-      }
-
-      @Override
-      public GenericRecord getFinal()
-      {
-        if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
-        GenericRecord output = new GenericData.Record(vSchema);
-        output.put("count", count);
-        return output;
-      }
-
-      @Override
-      public void cleanup()
-      {
-        this.count = 0;
-      }
-    });
-
-    // summing counts is associative, so the reducer's accumulator also serves as the combiner
-    job.setCombinerAccumulator(job.getReducerAccumulator());
-    job.setUseCombiner(true);
-
-    job.run();
-  }
-
-  @Override
-  public String getName()
-  {
-    return "countbyid";
-  }
-
-  @Override
-  public String getDescription()
-  {
-    return "incrementally counts by id";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java b/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
deleted file mode 100644
index eeaf8cc..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/EstimateCardinality.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
-
-import datafu.hourglass.jobs.PartitionCollapsingIncrementalJob;
-import datafu.hourglass.jobs.PartitionPreservingIncrementalJob;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-
-/**
- * Given an input event with field "id" stored in daily directories according to yyyy/MM/dd,
- * estimates the cardinality of the values over the past 30 days.
- *
- * @author "Matthew Hayes"
- *
- */
-public class EstimateCardinality extends Configured implements NamedTool, Serializable
-{
- @Override
- public int run(String[] args) throws Exception
- {
- if (args.length != 4)
- {
- System.out.println("Usage: <input> <intermediate> <output> <num-days>");
- System.exit(1);
- }
-
- try
- {
- run(super.getConf(), args[0], args[1], args[2], Integer.parseInt(args[3]));
- }
- catch (IOException e)
- {
- e.printStackTrace();
- System.exit(1);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- System.exit(1);
- }
- catch (ClassNotFoundException e)
- {
- e.printStackTrace();
- System.exit(1);
- }
- return 0;
- }
-
- public void run(Configuration conf, String inputPath, String intermediatePath, String outputPath, int numDays) throws IOException, InterruptedException, ClassNotFoundException
- {
- PartitionPreservingIncrementalJob job1 = new PartitionPreservingIncrementalJob(Examples.class);
- job1.setConf(conf);
-
- final String namespace = "com.example";
-
- final Schema keySchema = Schema.createRecord("Stat",null,namespace,false);
- keySchema.setFields(Arrays.asList(new Field("name", Schema.create(Type.STRING), "the type of statistic", null)));
- final String keySchemaString = keySchema.toString(true);
-
- // data is either an int (the member ID), or bytes from the estimator
- Schema dataSchema = Schema.createUnion(Arrays.asList(Schema.create(Type.LONG),Schema.create(Type.BYTES)));
-
- final Schema valueSchema = Schema.createRecord("Value",null,namespace,false);
- valueSchema.setFields(Arrays.asList(new Field("data", dataSchema, "the data, either a member ID or bytes from the estimator", null),
- new Field("count", Schema.create(Type.LONG), "the number of elements", null)));
- final String valueSchemaString = valueSchema.toString(true);
-
- Mapper<GenericRecord,GenericRecord,GenericRecord> mapper = new Mapper<GenericRecord,GenericRecord,GenericRecord>() {
- private transient Schema kSchema;
- private transient Schema vSchema;
-
- @Override
- public void map(GenericRecord input,
- KeyValueCollector<GenericRecord, GenericRecord> collector) throws IOException,
- InterruptedException
- {
- if (kSchema == null) kSchema = new Schema.Parser().parse(keySchemaString);
- if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
- GenericRecord key = new GenericData.Record(kSchema);
- key.put("name", "member_count");
- GenericRecord value = new GenericData.Record(vSchema);
- value.put("data",input.get("id")); // member id
- value.put("count", 1L); // just a single member
- collector.collect(key,value);
- }
- };
-
- Accumulator<GenericRecord,GenericRecord> accumulator = new Accumulator<GenericRecord,GenericRecord>() {
- private transient HyperLogLogPlus estimator;
- private transient Schema vSchema;
-
- @Override
- public void accumulate(GenericRecord value)
- {
- if (estimator == null) estimator = new HyperLogLogPlus(20);
- Object data = value.get("data");
- if (data instanceof Long)
- {
- estimator.offer(data);
- }
- else if (data instanceof ByteBuffer)
- {
- ByteBuffer bytes = (ByteBuffer)data;
- HyperLogLogPlus newEstimator;
- try
- {
- newEstimator = HyperLogLogPlus.Builder.build(bytes.array());
- estimator = (HyperLogLogPlus)estimator.merge(newEstimator);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (CardinalityMergeException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public GenericRecord getFinal()
- {
- if (vSchema == null) vSchema = new Schema.Parser().parse(valueSchemaString);
- GenericRecord output = new GenericData.Record(vSchema);
- try
- {
- ByteBuffer bytes = ByteBuffer.wrap(estimator.getBytes());
- output.put("data", bytes);
- output.put("count", estimator.cardinality());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- return output;
- }
-
- @Override
- public void cleanup()
- {
- estimator = null;
- }
- };
-
- job1.setKeySchema(keySchema);
- job1.setIntermediateValueSchema(valueSchema);
- job1.setOutputValueSchema(valueSchema);
- job1.setInputPaths(Arrays.asList(new Path(inputPath)));
- job1.setOutputPath(new Path(intermediatePath));
- job1.setMapper(mapper);
- job1.setCombinerAccumulator(accumulator);
- job1.setReducerAccumulator(accumulator);
- job1.setNumDays(numDays);
-
- job1.run();
-
- PartitionCollapsingIncrementalJob job2 = new PartitionCollapsingIncrementalJob(Examples.class);
- job2.setConf(conf);
-
- Mapper<GenericRecord,GenericRecord,GenericRecord> identity = new Mapper<GenericRecord,GenericRecord,GenericRecord>()
- {
- @Override
- public void map(GenericRecord record, KeyValueCollector<GenericRecord,GenericRecord> context) throws IOException,
- InterruptedException
- {
- context.collect((GenericRecord)record.get("key"), (GenericRecord)record.get("value"));
- }
- };
-
- job2.setNumDays(30);
- job2.setKeySchema(keySchema);
- job2.setIntermediateValueSchema(valueSchema);
- job2.setOutputValueSchema(valueSchema);
- job2.setInputPaths(Arrays.asList(new Path(intermediatePath)));
- job2.setOutputPath(new Path(outputPath));
- job2.setMapper(identity);
- job2.setCombinerAccumulator(accumulator);
- job2.setReducerAccumulator(accumulator);
- job2.setNumDays(numDays);
-
- job2.run();
- }
-
- @Override
- public String getName()
- {
- return "cardinality";
- }
-
- @Override
- public String getDescription()
- {
- return "estimates cardinality of IDs";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java b/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
deleted file mode 100644
index 8df7539..0000000
--- a/contrib/hourglass/test/java/datafu/hourglass/demo/Examples.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.demo;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.test.Schemas;
-import datafu.hourglass.test.PartitionCollapsingTests;
-import datafu.hourglass.test.TestBase;
-import datafu.hourglass.test.util.DailyTrackingWriter;
-
-@Test(groups="pcl")
-public class Examples extends TestBase
-{
- private Logger _log = Logger.getLogger(PartitionCollapsingTests.class);
-
- private static final Schema EVENT_SCHEMA;
-
- private GenericRecord _record;
- private DailyTrackingWriter _eventWriter;
-
- static
- {
- EVENT_SCHEMA = Schemas.createRecordSchema(Examples.class, "Event",
- new Field("id", Schema.create(Type.LONG), null, null));
-
- System.out.println("Event schema: " + EVENT_SCHEMA.toString(true));
- }
-
- public Examples() throws IOException
- {
- super();
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- super.beforeClass();
- }
-
- @AfterClass
- public void afterClass() throws Exception
- {
- super.afterClass();
- }
-
- @BeforeMethod
- public void beforeMethod(Method method) throws IOException
- {
- _log.info("*** Running " + method.getName());
-
- _log.info("*** Cleaning input and output paths");
- getFileSystem().delete(new Path("/data"), true);
- getFileSystem().delete(new Path("/output"), true);
- getFileSystem().mkdirs(new Path("/data"));
- getFileSystem().mkdirs(new Path("/output"));
-
- _record = new GenericData.Record(EVENT_SCHEMA);
- _eventWriter = new DailyTrackingWriter(new Path("/data/event"),EVENT_SCHEMA,getFileSystem());
- }
-
- @Test
- public void countByMember() throws IOException, InterruptedException, ClassNotFoundException
- {
- // setup
- openDayForEvent(2013, 3, 15);
- storeIds(1,1,1);
- storeIds(2);
- storeIds(3,3);
- closeDayForEvent();
-
- openDayForEvent(2013, 3, 16);
- storeIds(1,1);
- storeIds(2,2);
- storeIds(3);
- closeDayForEvent();
-
- // run
- new CountById().run(createJobConf(),"/data/event","/output");
-
- // verify
-
- checkOutputFolderCount(new Path("/output"), 1);
-
- HashMap<Long,Integer> counts = loadOutputCounts(new Path("/output"), "20130316");
-
- checkSize(counts,3);
- checkIdCount(counts,1,5);
- checkIdCount(counts,2,3);
- checkIdCount(counts,3,3);
-
- // more data
- openDayForEvent(2013, 3, 17);
- storeIds(1,1);
- storeIds(2,2,2);
- storeIds(3,3);
- closeDayForEvent();
-
- // run
- new CountById().run(createJobConf(),"/data/event","/output");
-
- counts = loadOutputCounts(new Path("/output"), "20130317");
-
- checkSize(counts,3);
- checkIdCount(counts,1,7);
- checkIdCount(counts,2,6);
- checkIdCount(counts,3,5);
- }
-
- @Test
- public void estimateNumMembers() throws IOException, InterruptedException, ClassNotFoundException
- {
- openDayForEvent(2013, 3, 1);
- // lots of members logged in this day
- for (int i=1; i<=10000; i++)
- {
- storeIds(i);
- }
- closeDayForEvent();
-
- // but only a handful logged in the remaining days
- for (int i=2; i<=30; i++)
- {
- openDayForEvent(2013, 3, i);
- storeIds(1,2,3,4,5);
- closeDayForEvent();
- }
-
- // run
- new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
-
- // verify
- checkIntermediateFolderCount(new Path("/output/daily"), 30);
- checkOutputFolderCount(new Path("/output/summary"), 1);
- Assert.assertTrue(Math.abs(10000L - loadMemberCount(new Path("/output/summary"),"20130330").longValue())/10000.0 < 0.005);
-
- // more data
- openDayForEvent(2013, 3, 31);
- storeIds(6,7,8,9,10);
- closeDayForEvent();
-
- // run
- new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
-
- // verify
- checkIntermediateFolderCount(new Path("/output/daily"), 31);
- checkOutputFolderCount(new Path("/output/summary"), 1);
- Assert.assertEquals(loadMemberCount(new Path("/output/summary"),"20130331").longValue(),10L);
- }
-
- private void openDayForEvent(int year, int month, int day) throws IOException
- {
- System.out.println(String.format("start day: %04d %02d %02d",year,month,day));
- _eventWriter.open(year, month, day);
- }
-
- private void closeDayForEvent() throws IOException
- {
- _eventWriter.close();
- }
-
- private void storeIds(long... ids) throws IOException
- {
- for (long id : ids)
- {
- storeId(id);
- }
- }
-
- private void storeId(long id) throws IOException
- {
- _record.put("id", id);
- System.out.println("record: " + _record.toString());
- _eventWriter.append(_record);
- }
-
- private void checkIdCount(HashMap<Long,Integer> counts, long id, long count)
- {
- Assert.assertTrue(counts.containsKey(id));
- Assert.assertEquals(counts.get(id).intValue(), count);
- }
-
- private void checkSize(HashMap<Long,Integer> counts, int expectedSize)
- {
- if (counts.size() != expectedSize)
- {
- StringBuilder sb = new StringBuilder("Expected count " + expectedSize + " does not match actual " + counts.size() + ", contents:\n");
- List<Long> keys = new ArrayList<Long>(counts.keySet());
- Collections.sort(keys);
- for (Long k : keys)
- {
- sb.append(k.toString() + " => " + counts.get(k).toString() + "\n");
- }
- Assert.fail(sb.toString());
- }
- }
-
- private void checkOutputFolderCount(Path path, int expectedCount) throws IOException
- {
- Assert.assertEquals(countOutputFolders(path),expectedCount,"Found: " + listOutputFolders(path));
- }
-
- private void checkIntermediateFolderCount(Path path, int expectedCount) throws IOException
- {
- Assert.assertEquals(countIntermediateFolders(path),expectedCount,"Found: " + listIntermediateFolders(path));
- }
-
- private int countIntermediateFolders(Path path) throws IOException
- {
- FileSystem fs = getFileSystem();
- return fs.globStatus(new Path(path,"*/*/*"),PathUtils.nonHiddenPathFilter).length;
- }
-
- private int countOutputFolders(Path path) throws IOException
- {
- FileSystem fs = getFileSystem();
- return fs.listStatus(path,PathUtils.nonHiddenPathFilter).length;
- }
-
- private String listOutputFolders(Path path) throws IOException
- {
- StringBuilder sb = new StringBuilder();
- for (FileStatus stat : getFileSystem().listStatus(path,PathUtils.nonHiddenPathFilter))
- {
- sb.append(stat.getPath().getName());
- sb.append(",");
- }
- return sb.toString();
- }
-
- private String listIntermediateFolders(Path path) throws IOException
- {
- StringBuilder sb = new StringBuilder();
- for (FileStatus stat : getFileSystem().globStatus(new Path(path,"*/*/*"),PathUtils.nonHiddenPathFilter))
- {
- sb.append(stat.getPath().getName());
- sb.append(",");
- }
- return sb.toString();
- }
-
- private Long loadMemberCount(Path path, String timestamp) throws IOException
- {
- FileSystem fs = getFileSystem();
- Assert.assertTrue(fs.exists(new Path(path, timestamp)));
- for (FileStatus stat : fs.globStatus(new Path(path,timestamp + "/*.avro")))
- {
- _log.info(String.format("found: %s (%d bytes)",stat.getPath(),stat.getLen()));
- FSDataInputStream is = fs.open(stat.getPath());
- DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
- DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, reader);
-
- try
- {
- GenericRecord r = dataFileStream.next();
- Long count = (Long)((GenericRecord)r.get("value")).get("count");
- Assert.assertNotNull(count);
- System.out.println("found count: " + count);
- return count;
- }
- finally
- {
- dataFileStream.close();
- }
- }
- throw new RuntimeException("found no data");
- }
-
- private HashMap<Long,Integer> loadOutputCounts(Path path, String timestamp) throws IOException
- {
- HashMap<Long,Integer> counts = new HashMap<Long,Integer>();
- FileSystem fs = getFileSystem();
- Assert.assertTrue(fs.exists(new Path(path, timestamp)));
- for (FileStatus stat : fs.globStatus(new Path(path,timestamp + "/*.avro")))
- {
- _log.info(String.format("found: %s (%d bytes)",stat.getPath(),stat.getLen()));
- FSDataInputStream is = fs.open(stat.getPath());
- DatumReader <GenericRecord> reader = new GenericDatumReader<GenericRecord>();
- DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, reader);
-
- try
- {
- while (dataFileStream.hasNext())
- {
- GenericRecord r = dataFileStream.next();
- _log.info("found: " + r.toString());
- Long memberId = (Long)((GenericRecord)r.get("key")).get("member_id");
- Assert.assertNotNull(memberId);
- Integer count = (Integer)((GenericRecord)r.get("value")).get("count");
- Assert.assertNotNull(count);
- Assert.assertFalse(counts.containsKey(memberId));
- counts.put(memberId, count);
- }
- }
- finally
- {
- dataFileStream.close();
- }
- }
- return counts;
- }
-}