Posted to commits@datafu.apache.org by ja...@apache.org on 2014/05/18 21:59:49 UTC
[11/14] DATAFU-44: Migrate Hourglass to Gradle
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
deleted file mode 100644
index a3fc77d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingExecutionPlanner.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeSet;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.DatePath;
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Execution planner used by {@link AbstractPartitionPreservingIncrementalJob} and its derived classes.
- * This creates a plan to process partitioned input data and produce partitioned output data.
- *
- * <p>
- * To use this class, the input and output paths must be specified. In addition, the desired input date
- * range can be specified through several methods. Then {@link #createPlan()} can be called and the
- * execution plan will be created. The inputs to process will be available from {@link #getInputsToProcess()},
- * the number of reducers to use will be available from {@link #getNumReducers()}, and the input schemas
- * will be available from {@link #getInputSchemas()}.
- * </p>
- *
- * <p>
- * Configuration properties are used to configure a {@link ReduceEstimator} instance. This is used to
- * calculate how many reducers should be used.
- * The number of reducers to use is based on the input data size and the
- * <em>num.reducers.bytes.per.reducer</em> property.
- * Check {@link ReduceEstimator} for more details on how the properties are used.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingExecutionPlanner extends ExecutionPlanner
-{
- private final Logger _log = Logger.getLogger(PartitionPreservingExecutionPlanner.class);
-
- private SortedMap<Date,DatePath> _outputPathsByDate;
- private Map<String,String> _latestInputByPath = new HashMap<String,String>();
- private List<DatePath> _inputsToProcess = new ArrayList<DatePath>();
- private List<Schema> _inputSchemas = new ArrayList<Schema>();
- private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
- private boolean _needAnotherPass;
- private int _numReducers;
- private boolean _planExists;
-
- /**
- * Initializes the execution planner.
- *
- * @param fs file system
- * @param props configuration properties
- */
- public PartitionPreservingExecutionPlanner(FileSystem fs, Properties props)
- {
- super(fs,props);
- }
-
- /**
- * Create the execution plan.
- *
- * @throws IOException
- */
- public void createPlan() throws IOException
- {
- if (_planExists) throw new RuntimeException("Plan already exists");
- _planExists = true;
- loadInputData();
- loadOutputData();
- determineAvailableInputDates();
- determineDateRange();
- determineInputsToProcess();
- determineInputSchemas();
- determineNumReducers();
- }
-
- /**
- * Get the number of reducers to use based on the input data size.
- * Must call {@link #createPlan()} first.
- *
- * @return number of reducers to use
- */
- public int getNumReducers()
- {
- checkPlanExists();
- return _numReducers;
- }
-
- /**
- * Gets the input schemas. Because multiple inputs are allowed, there may be multiple schemas.
- * Must call {@link #createPlan()} first.
- *
- * @return input schemas
- */
- public List<Schema> getInputSchemas()
- {
- checkPlanExists();
- return _inputSchemas;
- }
-
- /**
- * Gets a map from input path to schema. Because multiple inputs are allowed, there may be multiple schemas.
- * Must call {@link #createPlan()} first.
- *
- * @return map from path to input schema
- */
- public Map<String,Schema> getInputSchemasByPath()
- {
- checkPlanExists();
- return _inputSchemasByPath;
- }
-
- /**
- * Gets whether another pass will be required. Because there may be a limit on the number of inputs processed
- * in a single run, multiple runs may be required to process all data in the desired date range.
- * Must call {@link #createPlan()} first.
- *
- * @return true if another pass is required
- */
- public boolean getNeedsAnotherPass()
- {
- checkPlanExists();
- return _needAnotherPass;
- }
-
- /**
- * Gets the inputs which are to be processed.
- * Must call {@link #createPlan()} first.
- *
- * @return inputs to process
- */
- public List<DatePath> getInputsToProcess()
- {
- checkPlanExists();
- return _inputsToProcess;
- }
-
- /**
- * Gets the input dates which are to be processed.
- * Must call {@link #createPlan()} first.
- *
- * @return dates to process
- */
- public List<Date> getDatesToProcess()
- {
- checkPlanExists();
- Set<Date> dates = new TreeSet<Date>();
- for (DatePath dp : _inputsToProcess)
- {
- dates.add(dp.getDate());
- }
- return new ArrayList<Date>(dates);
- }
-
- /**
- * Determines the number of reducers to use based on the input data size.
- * The number of reducers to use is based on the input data size and the
- * <em>num.reducers.bytes.per.reducer</em> property. See {@link ReduceEstimator}
- * for details on reducer estimation.
- *
- * @throws IOException
- */
- private void determineNumReducers() throws IOException
- {
- ReduceEstimator estimator = new ReduceEstimator(getFileSystem(),getProps());
- List<String> inputPaths = new ArrayList<String>();
- for (DatePath input : getInputsToProcess())
- {
- inputPaths.add(input.getPath().toString());
- estimator.addInputPath("input",input.getPath());
- }
- _numReducers = estimator.getNumReducers();
- }
-
- /**
- * Determines the input schemas. There may be multiple input schemas because multiple inputs are allowed.
- * The latest available inputs are used to determine the schema, the assumption being that schemas are
- * backwards-compatible.
- *
- * @throws IOException
- */
- private void determineInputSchemas() throws IOException
- {
- if (_latestInputByPath.size() > 0)
- {
- _log.info("Determining input schemas");
- for (Entry<String,String> entry : _latestInputByPath.entrySet())
- {
- String root = entry.getKey();
- String input = entry.getValue();
- _log.info("Loading schema for " + input);
- Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),new Path(input));
- _inputSchemas.add(schema);
- _inputSchemasByPath.put(root, schema);
- }
- }
- }
-
- /**
- * Determines which input data should be processed. This checks the availability of input data within
- * the desired date range and also checks whether the output already exists. Only inputs with no
- * corresponding output are processed.
- */
- private void determineInputsToProcess()
- {
- _log.info("Determining inputs to process");
- _latestInputByPath.clear();
- int newDataCount = 0;
- Calendar cal = Calendar.getInstance(PathUtils.timeZone);
- for (Date currentDate=getDateRange().getBeginDate(); currentDate.compareTo(getDateRange().getEndDate()) <= 0; )
- {
- if (!_outputPathsByDate.containsKey(currentDate))
- {
- List<DatePath> inputs = getAvailableInputsByDate().get(currentDate);
- if (inputs != null)
- {
- if (getMaxToProcess() != null && newDataCount >= getMaxToProcess())
- {
- // too much data to process in a single run, will require another pass
- _needAnotherPass = true;
- break;
- }
-
- for (DatePath input : inputs)
- {
- _log.info(String.format("Input: %s",input.getPath()));
- _inputsToProcess.add(input);
-
- Path root = PathUtils.getNestedPathRoot(input.getPath());
- _latestInputByPath.put(root.toString(), input.getPath().toString());
- }
-
- newDataCount++;
- }
- else if (isFailOnMissing())
- {
- throw new RuntimeException("missing input data for " + currentDate);
- }
- }
-
- cal.setTime(currentDate);
- cal.add(Calendar.DAY_OF_MONTH, 1);
- currentDate = cal.getTime();
- }
- }
-
- /**
- * Determines what output data already exists. Inputs will not be consumed if the output already exists.
- *
- * @throws IOException
- */
- private void loadOutputData() throws IOException
- {
- _log.info(String.format("Checking output data in %s", getOutputPath()));
- _outputPathsByDate = getDailyData(getOutputPath());
- }
-
- /**
- * Throws an exception if the plan hasn't been created.
- */
- private void checkPlanExists()
- {
- if (!_planExists) throw new RuntimeException("Must call createPlan first");
- }
-}
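For context, a minimal sketch (not part of this commit) of how the planner is driven. The path and date-range setters live on the ExecutionPlanner base class, which is not shown in this diff, so the setter names below are assumptions:

    FileSystem fs = FileSystem.get(new Configuration());
    Properties props = new Properties();
    props.setProperty("num.reducers.bytes.per.reducer", "268435456"); // 256 MB

    PartitionPreservingExecutionPlanner planner =
        new PartitionPreservingExecutionPlanner(fs, props);
    planner.setInputPaths(Arrays.asList(new Path("/data/event"))); // assumed base-class setter
    planner.setOutputPath(new Path("/output/event"));              // assumed base-class setter

    planner.createPlan(); // must be called before any of the getters below

    int reducers = planner.getNumReducers();
    for (DatePath input : planner.getInputsToProcess())
    {
      System.out.println("Will process: " + input.getPath());
    }
    if (planner.getNeedsAnotherPass())
    {
      // more input exists than can be consumed in a single run
    }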
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
deleted file mode 100644
index 488fe7a..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/PartitionPreservingIncrementalJob.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Mapper;
-
-/**
- * A concrete version of {@link AbstractPartitionPreservingIncrementalJob}.
- *
- * This provides an alternative to extending {@link AbstractPartitionPreservingIncrementalJob}.
- * Instead of extending this class and implementing the abstract methods, this concrete version
- * can be used instead. Getters and setters have been provided for the abstract methods.
- *
- * @author "Matthew Hayes"
- *
- */
-public class PartitionPreservingIncrementalJob extends AbstractPartitionPreservingIncrementalJob
-{
- private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
- private Accumulator<GenericRecord,GenericRecord> _combiner;
- private Accumulator<GenericRecord,GenericRecord> _reducer;
- private Schema _keySchema;
- private Schema _intermediateValueSchema;
- private Schema _outputValueSchema;
- private Setup _setup;
-
- /**
- * Initializes the job. The job name is derived from the name of a provided class.
- *
- * @param cls class to base job name on
- * @throws IOException
- */
- public PartitionPreservingIncrementalJob(@SuppressWarnings("rawtypes") Class cls) throws IOException
- {
- setName(cls.getName());
- }
-
- @Override
- public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
- {
- return _mapper;
- }
-
- @Override
- public Accumulator<GenericRecord,GenericRecord> getCombinerAccumulator()
- {
- return _combiner;
- }
-
- @Override
- public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
- {
- return _reducer;
- }
-
- @Override
- protected Schema getKeySchema()
- {
- return _keySchema;
- }
-
- @Override
- protected Schema getIntermediateValueSchema()
- {
- return _intermediateValueSchema;
- }
-
- @Override
- protected Schema getOutputValueSchema()
- {
- return _outputValueSchema;
- }
-
- /**
- * Set the mapper.
- *
- * @param mapper the mapper
- */
- public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
- {
- this._mapper = mapper;
- }
-
- /**
- * Set the accumulator for the combiner.
- *
- * @param combiner accumulator for the combiner
- */
- public void setCombinerAccumulator(Accumulator<GenericRecord,GenericRecord> combiner)
- {
- this._combiner = combiner;
- }
-
- /**
- * Set the accumulator for the reducer.
- *
- * @param reducer accumulator for the reducer
- */
- public void setReducerAccumulator(Accumulator<GenericRecord,GenericRecord> reducer)
- {
- this._reducer = reducer;
- }
-
- /**
- * Sets the Avro schema for the key.
- * <p>
- * This is also used as the key for the map output.
- *
- * @param keySchema key schema
- */
- public void setKeySchema(Schema keySchema)
- {
- this._keySchema = keySchema;
- }
-
- /**
- * Sets the Avro schema for the intermediate value.
- * <p>
- * This is also used for the value for the map output.
- *
- * @param intermediateValueSchema intermediate value schema
- */
- public void setIntermediateValueSchema(Schema intermediateValueSchema)
- {
- this._intermediateValueSchema = intermediateValueSchema;
- }
-
- /**
- * Sets the Avro schema for the output data.
- *
- * @param outputValueSchema output value schema
- */
- public void setOutputValueSchema(Schema outputValueSchema)
- {
- this._outputValueSchema = outputValueSchema;
- }
-
- /**
- * Set callback to provide custom configuration before the job begins execution.
- *
- * @param setup object with callback method
- */
- public void setOnSetup(Setup setup)
- {
- _setup = setup;
- }
-
- @Override
- public void config(Configuration conf)
- {
- super.config(conf);
- if (_setup != null)
- {
- _setup.setup(conf);
- }
- }
-}
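As a usage illustration (not part of this commit), the concrete job is configured entirely through the setters above; the mapper, accumulator, and schema variables here are placeholders, and run() is assumed to be inherited from the abstract base class:

    PartitionPreservingIncrementalJob job = new PartitionPreservingIncrementalJob(MyJob.class);
    job.setKeySchema(keySchema);                  // placeholder Avro schemas
    job.setIntermediateValueSchema(valueSchema);
    job.setOutputValueSchema(valueSchema);
    job.setMapper(myMapper);                      // a datafu.hourglass.model.Mapper implementation
    job.setCombinerAccumulator(countAccumulator); // optional
    job.setReducerAccumulator(countAccumulator);
    job.setOnSetup(new Setup()
    {
      @Override
      public void setup(Configuration conf)
      {
        conf.setInt("incremental.reducers.per.input", 2);
      }
    });
    job.run(); // assumed to be inherited from the abstract base class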
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
deleted file mode 100644
index 9f2f068..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/ReduceEstimator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Estimates the number of reducers needed based on input size.
- *
- * <p>
- * This sums the size of the inputs and uses bytes-per-reducer
- * settings to compute the number of reducers. By default,
- * the bytes-per-reducer is 256 MB. This means that if the
- * total input size is 1 GB, the total number of reducers
- * computed will be 4.
- * </p>
- *
- * <p>
- * The bytes-per-reducer can be configured through properties
- * provided in the constructor. The default bytes-per-reducer
- * can be overridden by setting <em>num.reducers.bytes.per.reducer</em>.
- * For example, if 536870912 (512 MB) is used for this setting,
- * then 2 reducers would be used for 1 GB.
- * </p>
- *
- * <p>
- * The bytes-per-reducer can also be configured separately for
- * different types of inputs. Inputs can be identified by a tag.
- * For example, if an input is tagged with <em>mydata</em>, then
- * the reducers for this input data can be configured with
- * <em>num.reducers.mydata.bytes.per.reducer</em>.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class ReduceEstimator
-{
- private final Logger _log = Logger.getLogger(ReduceEstimator.class);
-
- private final Set<Path> inputPaths = new HashSet<Path>();
- private final Map<Path,String> pathToTag = new HashMap<Path,String>();
- private final Map<String, Long> tagToBytesPerReducer = new HashMap<String,Long>();
- private final FileSystem fs;
-
- private final static String DEFAULT = "default";
- private final static Long DEFAULT_BYTES_PER_REDUCER = 256L*1024L*1024L; // 256 MB
-
- public ReduceEstimator(FileSystem fs, Properties props)
- {
- this.fs = fs;
-
- if (props != null)
- {
- for (Object o : props.keySet())
- {
- String key = (String)o;
- if (key.startsWith("num.reducers."))
- {
- if (key.equals("num.reducers.bytes.per.reducer"))
- {
- tagToBytesPerReducer.put(DEFAULT, Long.parseLong(props.getProperty(key)));
- }
- else
- {
- Pattern p = Pattern.compile("num\\.reducers\\.([a-z]+)\\.bytes\\.per\\.reducer");
- Matcher m = p.matcher(key);
- if (m.matches())
- {
- String tag = m.group(1);
- tagToBytesPerReducer.put(tag, Long.parseLong(props.getProperty(key)));
- }
- else
- {
- throw new RuntimeException("Property not recognized: " + key);
- }
- }
- }
- }
- }
-
- if (!tagToBytesPerReducer.containsKey(DEFAULT))
- {
- long defaultValue = DEFAULT_BYTES_PER_REDUCER;
- _log.info(String.format("No default bytes per reducer set, using %.2f MB",toMB(defaultValue)));
- tagToBytesPerReducer.put(DEFAULT, defaultValue);
- }
- }
-
- public void addInputPath(Path input)
- {
- addInputPath(DEFAULT,input);
- }
-
- public void addInputPath(String tag, Path input)
- {
- if (!inputPaths.contains(input))
- {
- inputPaths.add(input);
- pathToTag.put(input, tag);
- }
- else
- {
- throw new RuntimeException("Already added input: " + input);
- }
- }
-
- public int getNumReducers() throws IOException
- {
- Map<String,Long> bytesPerTag = getTagToInputBytes();
-
- double numReducers = 0.0;
- for (String tag : bytesPerTag.keySet())
- {
- long bytes = bytesPerTag.get(tag);
- _log.info(String.format("Found %d bytes (%.2f GB) for inputs tagged with '%s'",bytes,toGB(bytes),tag));
- Long bytesPerReducer = tagToBytesPerReducer.get(tag);
- if (bytesPerReducer == null)
- {
- bytesPerReducer = tagToBytesPerReducer.get(DEFAULT);
-
- if (bytesPerReducer == null)
- {
- throw new RuntimeException("Could not determine bytes per reducer");
- }
-
- _log.info(String.format("No configured bytes per reducer for '%s', using default value of %.2f MB",tag,toMB(bytesPerReducer)));
- }
- else
- {
- _log.info(String.format("Using configured bytes per reducer for '%s' of %.2f MB",tag,toMB(bytesPerReducer)));
- }
-
- double partialNumReducers = bytes/(double)bytesPerReducer;
-
- _log.info(String.format("Reducers computed for '%s' is %.2f",tag,partialNumReducers));
-
- numReducers += partialNumReducers;
- }
-
- int finalNumReducers = Math.max(1, (int)Math.ceil(numReducers));
-
- _log.info(String.format("Final computed reducers is: %d",finalNumReducers));
-
- return finalNumReducers;
- }
-
- private static double toGB(long bytes)
- {
- return bytes/(1024.0*1024.0*1024.0);
- }
-
- private static double toMB(long bytes)
- {
- return bytes/(1024.0*1024.0);
- }
-
- /**
- * Gets the total number of bytes per tag.
- *
- * @return Map from tag to total bytes
- * @throws IOException
- */
- private Map<String,Long> getTagToInputBytes() throws IOException
- {
- Map<String,Long> result = new HashMap<String,Long>();
- for (Path input : inputPaths)
- {
- long bytes = PathUtils.countBytes(fs, input);
- String tag = pathToTag.get(input);
- if (tag == null) throw new RuntimeException("Could not find tag for input: " + input);
- Long current = result.get(tag);
- if (current == null) current = 0L;
- current += bytes;
- result.put(tag, current);
- }
- return result;
- }
-}
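To illustrate the tag-based configuration described in the class comment above (a sketch with hypothetical paths), note that tags must match the regex [a-z]+ or the constructor rejects the property:

    Properties props = new Properties();
    props.setProperty("num.reducers.bytes.per.reducer", "536870912");        // default rate: 512 MB per reducer
    props.setProperty("num.reducers.mydata.bytes.per.reducer", "134217728"); // 128 MB per reducer for tag "mydata"

    ReduceEstimator estimator = new ReduceEstimator(fs, props);
    estimator.addInputPath(new Path("/data/other"));            // falls under the default rate
    estimator.addInputPath("mydata", new Path("/data/mydata")); // uses the "mydata" rate
    int numReducers = estimator.getNumReducers(); // ceiling of the summed per-tag ratios, at least 1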
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
deleted file mode 100644
index 96ee07d..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/Setup.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Used as a callback by {@link PartitionCollapsingIncrementalJob} and {@link PartitionPreservingIncrementalJob}
- * to provide configuration settings for the Hadoop job.
- *
- * @author "Matthew Hayes"
- *
- */
-public interface Setup
-{
- /**
- * Set custom configuration.
- *
- * @param conf configuration
- */
- void setup(Configuration conf);
-}
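Since Setup is a single-method callback, an inline implementation suffices; the property set below is an arbitrary example:

    Setup setup = new Setup()
    {
      @Override
      public void setup(Configuration conf)
      {
        // any custom Hadoop configuration for the job goes here
        conf.set("mapred.job.queue.name", "default");
      }
    };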
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
deleted file mode 100644
index 0445322..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/StagedOutputJob.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-/**
- * A derivation of {@link Job} that stages its output in another location and only
- * moves it to the final destination if the job completes successfully.
- * It also outputs a counters file to the file system that contains counters fetched from Hadoop
- * and other task statistics.
- */
-public class StagedOutputJob extends Job implements Callable<Boolean>
-{
- private final String _stagingPrefix;
- private final Logger _log;
- private Path _countersPath;
- private Path _countersParentPath;
- private boolean _writeCounters = false;
-
- /**
- * Creates a job which uses a temporary staging location for the output data.
- * The data is only copied to the final output directory on successful completion
- * of the job. This prevents existing output data from being overwritten unless
- * the job completes successfully.
- *
- * @param conf configuration
- * @param jobName job name
- * @param inputPaths input paths
- * @param stagingLocation where to stage output temporarily
- * @param outputPath output path
- * @param log logger
- * @return job
- */
- public static StagedOutputJob createStagedJob(
- Configuration conf,
- String jobName,
- List<String> inputPaths,
- String stagingLocation,
- String outputPath,
- final Logger log)
- {
- final StagedOutputJob retVal;
- try
- {
- retVal = new StagedOutputJob(conf, stagingLocation, log);
- retVal.setJobName(jobName);
- retVal.setJarByClass(getCallersClass());
- FileInputFormat.setInputPathFilter(retVal, HiddenFilePathFilter.class);
- }
- catch (IOException e)
- {
- log.error("IOException when making a job", e);
- throw new RuntimeException(e);
- }
-
- if (inputPaths != null)
- {
- try
- {
- FileInputFormat.setInputPaths(
- retVal,
- StringUtils.join(inputPaths.iterator(),",")
- );
- }
- catch (IOException e)
- {
- log.error("Unable to set up input paths.", e);
- throw new RuntimeException(e);
- }
- }
-
- FileOutputFormat.setOutputPath(retVal, new Path(outputPath));
-
- return retVal;
- }
-
- /**
- * Initializes the job.
- *
- * @param conf configuration
- * @param stagingPrefix where to stage output temporarily
- * @param log logger
- * @throws IOException
- */
- public StagedOutputJob(Configuration conf, String stagingPrefix, Logger log) throws IOException
- {
- super(conf);
- this._stagingPrefix = stagingPrefix;
- this._log = log;
- }
-
- /**
- * Gets the path to store the counters. If this is not set, then by default the counters will be
- * stored in the output directory.
- *
- * @return parent path for counters
- */
- public Path getCountersParentPath()
- {
- return _countersParentPath;
- }
-
- /**
- * Sets the path to store the counters. If this is not set, then by default the counters will be
- * stored in the output directory.
- *
- * @param path parent path for counters
- */
- public void setCountersParentPath(Path path)
- {
- _countersParentPath = path;
- }
-
- /**
- * Gets the path where the counters were written.
- *
- * @return counters path
- */
- public Path getCountersPath()
- {
- return _countersPath;
- }
-
- /**
- * Get whether counters should be written.
- *
- * @return true if counters should be written
- */
- public boolean getWriteCounters()
- {
- return _writeCounters;
- }
-
- /**
- * Sets whether counters should be written.
- *
- * @param writeCounters true if counters should be written
- */
- public void setWriteCounters(boolean writeCounters)
- {
- this._writeCounters = writeCounters;
- }
-
- /**
- * Run the job.
- */
- @Override
- public Boolean call() throws Exception
- {
- try
- {
- boolean success = waitForCompletion(false);
- String jobId = "?";
-
- if (getJobID() != null)
- {
- jobId = String.format("job_%s_%d",getJobID().getJtIdentifier(), getJobID().getId());
- }
-
- if (success)
- {
- _log.info(String.format("Job %s with ID %s succeeded! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
- }
- else
- {
- _log.error(String.format("Job %s with ID %s failed! Tracking URL: %s", getJobName(), jobId, this.getTrackingURL()));
- }
-
- return success;
- }
- catch (Exception e)
- {
- _log.error("Exception: " + e.toString());
- throw new Exception(e);
- }
- }
-
- /**
- * Run the job and wait for it to complete. Output will be temporarily stored under the staging path.
- * If the job is successful it will be moved to the final location.
- */
- @Override
- public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException
- {
- final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
- final Path stagedPath = new Path(String.format("%s/%s/staged", _stagingPrefix, System.currentTimeMillis()));
-
- FileOutputFormat.setOutputPath(
- this,
- stagedPath
- );
-
- final Thread hook = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- killJob();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- });
-
- Runtime.getRuntime().addShutdownHook(hook);
-
- final boolean retVal = super.waitForCompletion(verbose);
- Runtime.getRuntime().removeShutdownHook(hook);
-
- if (retVal)
- {
- FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
-
- fs.mkdirs(actualOutputPath);
-
- _log.info(String.format("Deleting data at old path[%s]", actualOutputPath));
- fs.delete(actualOutputPath, true);
-
- _log.info(String.format("Moving from staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
- boolean renamed = fs.rename(stagedPath, actualOutputPath);
-
- if (renamed && _writeCounters)
- {
- writeCounters(fs);
- }
-
- return renamed;
- }
- else
- {
- FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());
- _log.info(String.format("Job failed, deleting staged path[%s]", stagedPath));
- try
- {
- fs.delete(stagedPath, true);
- }
- catch (IOException e)
- {
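- // best-effort cleanup; ignore a failure to delete the staged output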
- }
- }
-
- _log.warn("retVal was false for some reason...");
- return retVal;
- }
-
- /**
- * Gets the class for the caller.
- *
- * @return caller class
- */
- private static Class<?> getCallersClass()
- {
- StackTraceElement[] stack = Thread.currentThread().getStackTrace();
- boolean foundSelf = false;
- for (StackTraceElement element : stack)
- {
- if (foundSelf &&
- !StagedOutputJob.class.getName().equals(element.getClassName()))
- {
- try
- {
- return Class.forName(element.getClassName());
- }
- catch (ClassNotFoundException e)
- {
- throw new RuntimeException(e);
- }
- }
- else if (StagedOutputJob.class.getName().equals(element.getClassName())
- && "getCallersClass".equals(element.getMethodName()))
- {
- foundSelf = true;
- }
- }
- return StagedOutputJob.class;
- }
-
- /**
- * Writes Hadoop counters and other task statistics to a file in the file system.
- *
- * @param fs file system
- * @throws IOException
- */
- private void writeCounters(final FileSystem fs) throws IOException
- {
- final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
-
- SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
- String suffix = timestampFormat.format(new Date());
-
- if (_countersParentPath != null)
- {
- if (!fs.exists(_countersParentPath))
- {
- _log.info("Creating counter parent path " + _countersParentPath);
- fs.mkdirs(_countersParentPath, FsPermission.valueOf("-rwxrwxr-x"));
- }
- // make the name as unique as possible in this case because this may be a directory
- // where other counter files will be dropped
- _countersPath = new Path(_countersParentPath,".counters." + suffix);
- }
- else
- {
- _countersPath = new Path(actualOutputPath,".counters." + suffix);
- }
-
- _log.info(String.format("Writing counters to %s", _countersPath));
- FSDataOutputStream counterStream = fs.create(_countersPath);
- BufferedOutputStream buffer = new BufferedOutputStream(counterStream,256*1024);
- OutputStreamWriter writer = new OutputStreamWriter(buffer);
- for (String groupName : getCounters().getGroupNames())
- {
- for (Counter counter : getCounters().getGroup(groupName))
- {
- writeAndLog(writer,String.format("%s=%d",counter.getName(),counter.getValue()));
- }
- }
-
- JobID jobID = this.getJobID();
-
- org.apache.hadoop.mapred.JobID oldJobId = new org.apache.hadoop.mapred.JobID(jobID.getJtIdentifier(),jobID.getId());
-
- long minStart = Long.MAX_VALUE;
- long maxFinish = 0;
- long setupStart = Long.MAX_VALUE;
- long cleanupFinish = 0;
- DescriptiveStatistics mapStats = new DescriptiveStatistics();
- DescriptiveStatistics reduceStats = new DescriptiveStatistics();
- boolean success = true;
-
- JobClient jobClient = new JobClient(this.conf);
-
- Map<String,String> taskIdToType = new HashMap<String,String>();
-
- TaskReport[] setupReports = jobClient.getSetupTaskReports(oldJobId);
- if (setupReports.length > 0)
- {
- _log.info("Processing setup reports");
- for (TaskReport report : jobClient.getSetupTaskReports(oldJobId))
- {
- taskIdToType.put(report.getTaskID().toString(),"SETUP");
- if (report.getStartTime() == 0)
- {
- _log.warn("Skipping report with zero start time");
- continue;
- }
- setupStart = Math.min(setupStart, report.getStartTime());
- }
- }
- else
- {
- _log.error("No setup reports");
- }
-
- TaskReport[] mapReports = jobClient.getMapTaskReports(oldJobId);
- if (mapReports.length > 0)
- {
- _log.info("Processing map reports");
- for (TaskReport report : mapReports)
- {
- taskIdToType.put(report.getTaskID().toString(),"MAP");
- if (report.getFinishTime() == 0 || report.getStartTime() == 0)
- {
- _log.warn("Skipping report with zero start or finish time");
- continue;
- }
- minStart = Math.min(minStart, report.getStartTime());
- mapStats.addValue(report.getFinishTime() - report.getStartTime());
- }
- }
- else
- {
- _log.error("No map reports");
- }
-
- TaskReport[] reduceReports = jobClient.getReduceTaskReports(oldJobId);
- if (reduceReports.length > 0)
- {
- _log.info("Processing reduce reports");
- for (TaskReport report : reduceReports)
- {
- taskIdToType.put(report.getTaskID().toString(),"REDUCE");
- if (report.getFinishTime() == 0 || report.getStartTime() == 0)
- {
- _log.warn("Skipping report with zero start or finish time");
- continue;
- }
- maxFinish = Math.max(maxFinish, report.getFinishTime());
- reduceStats.addValue(report.getFinishTime() - report.getStartTime());
- }
- }
- else
- {
- _log.error("No reduce reports");
- }
-
- TaskReport[] cleanupReports = jobClient.getCleanupTaskReports(oldJobId);
- if (cleanupReports.length > 0)
- {
- _log.info("Processing cleanup reports");
- for (TaskReport report : cleanupReports)
- {
- taskIdToType.put(report.getTaskID().toString(),"CLEANUP");
- if (report.getFinishTime() == 0)
- {
- _log.warn("Skipping report with finish time of zero");
- continue;
- }
- cleanupFinish = Math.max(cleanupFinish, report.getFinishTime());
- }
- }
- else
- {
- _log.error("No cleanup reports");
- }
-
- if (minStart == Long.MAX_VALUE)
- {
- _log.error("Could not determine map-reduce start time");
- success = false;
- }
- if (maxFinish == 0)
- {
- _log.error("Could not determine map-reduce finish time");
- success = false;
- }
-
- if (setupStart == Long.MAX_VALUE)
- {
- _log.error("Could not determine setup start time");
- success = false;
- }
- if (cleanupFinish == 0)
- {
- _log.error("Could not determine cleanup finish time");
- success = false;
- }
-
- // Collect statistics on successful/failed/killed task attempts, categorized by setup/map/reduce/cleanup.
- // Unfortunately the job client doesn't have an easier way to get these statistics.
- Map<String,Integer> attemptStats = new HashMap<String,Integer>();
- _log.info("Processing task attempts");
- for (TaskCompletionEvent event : getTaskCompletionEvents(jobClient,oldJobId))
- {
- String type = taskIdToType.get(event.getTaskAttemptId().getTaskID().toString());
- String status = event.getTaskStatus().toString();
-
- String key = String.format("%s_%s_ATTEMPTS",status,type);
- if (!attemptStats.containsKey(key))
- {
- attemptStats.put(key, 0);
- }
- attemptStats.put(key, attemptStats.get(key) + 1);
- }
-
- if (success)
- {
- writeAndLog(writer,String.format("SETUP_START_TIME_MS=%d",setupStart));
- writeAndLog(writer,String.format("CLEANUP_FINISH_TIME_MS=%d",cleanupFinish));
- writeAndLog(writer,String.format("COMPLETE_WALL_CLOCK_TIME_MS=%d",cleanupFinish - setupStart));
-
- writeAndLog(writer,String.format("MAP_REDUCE_START_TIME_MS=%d",minStart));
- writeAndLog(writer,String.format("MAP_REDUCE_FINISH_TIME_MS=%d",maxFinish));
- writeAndLog(writer,String.format("MAP_REDUCE_WALL_CLOCK_TIME_MS=%d",maxFinish - minStart));
-
- writeAndLog(writer,String.format("MAP_TOTAL_TASKS=%d",(long)mapStats.getN()));
- writeAndLog(writer,String.format("MAP_MAX_TIME_MS=%d",(long)mapStats.getMax()));
- writeAndLog(writer,String.format("MAP_MIN_TIME_MS=%d",(long)mapStats.getMin()));
- writeAndLog(writer,String.format("MAP_AVG_TIME_MS=%d",(long)mapStats.getMean()));
- writeAndLog(writer,String.format("MAP_STD_TIME_MS=%d",(long)mapStats.getStandardDeviation()));
- writeAndLog(writer,String.format("MAP_SUM_TIME_MS=%d",(long)mapStats.getSum()));
-
- writeAndLog(writer,String.format("REDUCE_TOTAL_TASKS=%d",(long)reduceStats.getN()));
- writeAndLog(writer,String.format("REDUCE_MAX_TIME_MS=%d",(long)reduceStats.getMax()));
- writeAndLog(writer,String.format("REDUCE_MIN_TIME_MS=%d",(long)reduceStats.getMin()));
- writeAndLog(writer,String.format("REDUCE_AVG_TIME_MS=%d",(long)reduceStats.getMean()));
- writeAndLog(writer,String.format("REDUCE_STD_TIME_MS=%d",(long)reduceStats.getStandardDeviation()));
- writeAndLog(writer,String.format("REDUCE_SUM_TIME_MS=%d",(long)reduceStats.getSum()));
-
- writeAndLog(writer,String.format("MAP_REDUCE_SUM_TIME_MS=%d",(long)mapStats.getSum() + (long)reduceStats.getSum()));
-
- for (Map.Entry<String, Integer> attemptStat : attemptStats.entrySet())
- {
- writeAndLog(writer,String.format("%s=%d",attemptStat.getKey(),attemptStat.getValue()));
- }
- }
-
- writer.close();
- buffer.close();
- counterStream.close();
- }
-
- /**
- * Get all task completion events for a particular job.
- *
- * @param jobClient job client
- * @param jobId job ID
- * @return task completion events
- * @throws IOException
- */
- private List<TaskCompletionEvent> getTaskCompletionEvents(JobClient jobClient, org.apache.hadoop.mapred.JobID jobId) throws IOException
- {
- List<TaskCompletionEvent> events = new ArrayList<TaskCompletionEvent>();
-
- // Tries to use reflection to get access to the getTaskCompletionEvents method from the private jobSubmitClient field.
- // This method has a parameter for the size, which defaults to 10 for the top level methods and can therefore be extremely slow
- // if the goal is to get all events.
-
- Method getTaskCompletionEventsMethod = null;
- Object jobSubmitClient = null;
-
- try
- {
- Field f = JobClient.class.getDeclaredField("jobSubmitClient");
- f.setAccessible(true);
- jobSubmitClient = f.get(jobClient);
-
- if (jobSubmitClient != null)
- {
- getTaskCompletionEventsMethod = jobSubmitClient.getClass().getDeclaredMethod("getTaskCompletionEvents", org.apache.hadoop.mapred.JobID.class,int.class,int.class);
- getTaskCompletionEventsMethod.setAccessible(true);
- }
- }
- catch (NoSuchMethodException e)
- {
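- // reflection is best-effort; on any failure we fall back to the slower public API below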
- }
- catch (SecurityException e)
- {
- }
- catch (NoSuchFieldException e)
- {
- }
- catch (IllegalArgumentException e)
- {
- }
- catch (IllegalAccessException e)
- {
- }
-
- if (getTaskCompletionEventsMethod != null)
- {
- _log.info("Will call getTaskCompletionEvents via reflection since it's faster");
- }
- else
- {
- _log.info("Will call getTaskCompletionEvents via the slow method");
- }
-
- int index = 0;
- while(true)
- {
- TaskCompletionEvent[] currentEvents;
- if (getTaskCompletionEventsMethod != null)
- {
- try
- {
- // grab events, 250 at a time, which is faster than the other method which defaults to 10 at a time (with no override ability)
- currentEvents = (TaskCompletionEvent[])getTaskCompletionEventsMethod.invoke(jobSubmitClient, jobId, index, 250);
- }
- catch (IllegalArgumentException e)
- {
- _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
- getTaskCompletionEventsMethod = null;
- continue;
- }
- catch (IllegalAccessException e)
- {
- _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
- getTaskCompletionEventsMethod = null;
- continue;
- }
- catch (InvocationTargetException e)
- {
- _log.error("Failed to call getTaskCompletionEventsMethod via reflection, switching to slow direct method", e);
- getTaskCompletionEventsMethod = null;
- continue;
- }
- }
- else
- {
- currentEvents = this.getTaskCompletionEvents(index);
- }
- if (currentEvents.length == 0) break;
- for (TaskCompletionEvent event : currentEvents)
- {
- events.add(event);
- }
- index += currentEvents.length;
- }
-
- return events;
- }
-
- private void writeAndLog(OutputStreamWriter writer, String line) throws IOException
- {
- writer.append(line);
- writer.append("\n");
- _log.info(line);
- }
-
- static class HiddenFilePathFilter implements PathFilter
- {
- @Override
- public boolean accept(Path path)
- {
- String name = path.getName();
- return ! name.startsWith("_") &&
- ! name.startsWith(".");
- }
- }
-}
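A short driver sketch (not part of this commit; the class name and paths are hypothetical) showing the staged-output flow, where the final destination is only replaced after the job succeeds:

    Logger log = Logger.getLogger(MyDriver.class);
    StagedOutputJob job = StagedOutputJob.createStagedJob(
        new Configuration(),
        "my-staged-job",
        Arrays.asList("/data/input"), // input paths
        "/tmp/staging",               // temporary staging prefix
        "/data/output",               // final destination, swapped in on success
        log);
    job.setWriteCounters(true);
    job.setCountersParentPath(new Path("/data/counters"));
    boolean success = job.waitForCompletion(true);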
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
deleted file mode 100644
index f8708c8..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimeBasedJob.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.text.ParseException;
-import java.util.Date;
-import java.util.Properties;
-
-import datafu.hourglass.fs.PathUtils;
-
-/**
- * Base class for Hadoop jobs that consume time-partitioned data.
- *
- * <p>
- * This class has the same configuration and methods as {@link AbstractJob}.
- * In addition it also recognizes the following properties:
- * </p>
- *
- * <ul>
- * <li><em>num.days</em> - Number of consecutive days of input data to consume</li>
- * <li><em>days.ago</em> - Number of days to subtract off the end of the consumption window</li>
- * <li><em>start.date</em> - Start date for window in yyyyMMdd format</li>
- * <li><em>end.date</em> - End date for window in yyyyMMdd format</li>
- * </ul>
- *
- * <p>
- * Methods are available as well for setting these configuration parameters.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public abstract class TimeBasedJob extends AbstractJob
-{
- private Integer _numDays;
- private Integer _daysAgo;
- private Date _startDate;
- private Date _endDate;
-
- /**
- * Initializes the job.
- */
- public TimeBasedJob()
- {
- }
-
- /**
- * Initializes the job with a job name and properties.
- *
- * @param name Job name
- * @param props Configuration properties
- */
- public TimeBasedJob(String name, Properties props)
- {
- super(name,props);
- }
-
- @Override
- public void setProperties(Properties props)
- {
- super.setProperties(props);
-
- if (getProperties().get("num.days") != null)
- {
- setNumDays(Integer.parseInt((String)getProperties().get("num.days")));
- }
-
- if (getProperties().get("days.ago") != null)
- {
- setDaysAgo(Integer.parseInt((String)getProperties().get("days.ago")));
- }
-
- if (getProperties().get("start.date") != null)
- {
- try
- {
- // start date treated as inclusive lower bound
- setStartDate(PathUtils.datedPathFormat.parse((String)getProperties().get("start.date")));
- }
- catch (ParseException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
-
- if (getProperties().get("end.date") != null)
- {
- try
- {
- setEndDate(PathUtils.datedPathFormat.parse((String)getProperties().get("end.date")));
- }
- catch (ParseException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
- }
-
- /**
- * Gets the number of consecutive days to process.
- *
- * @return number of days to process
- */
- public Integer getNumDays()
- {
- return _numDays;
- }
-
- /**
- * Sets the number of consecutive days to process.
- *
- * @param numDays number of days to process
- */
- public void setNumDays(Integer numDays)
- {
- this._numDays = numDays;
- }
-
- /**
- * Gets the number of days to subtract off the end of the consumption window.
- *
- * @return Days ago
- */
- public Integer getDaysAgo()
- {
- return _daysAgo;
- }
-
- /**
- * Sets the number of days to subtract off the end of the consumption window.
- *
- * @param daysAgo Days ago
- */
- public void setDaysAgo(Integer daysAgo)
- {
- this._daysAgo = daysAgo;
- }
-
- /**
- * Gets the start date.
- *
- * @return Start date
- */
- public Date getStartDate()
- {
- return _startDate;
- }
-
- /**
- * Sets the start date.
- *
- * @param startDate start date
- */
- public void setStartDate(Date startDate)
- {
- this._startDate = startDate;
- }
-
- /**
- * Gets the end date.
- *
- * @return end date
- */
- public Date getEndDate()
- {
- return _endDate;
- }
-
- /**
- * Sets the end date.
- *
- * @param endDate end date
- */
- public void setEndDate(Date endDate)
- {
- this._endDate = endDate;
- }
-
- @Override
- protected void validate()
- {
- super.validate();
-
- if (_daysAgo != null && _endDate != null)
- {
- throw new IllegalArgumentException("Cannot specify both end date and days ago");
- }
-
- if (_numDays != null && _startDate != null && _endDate != null)
- {
- throw new IllegalArgumentException("Cannot specify num days when both start date and end date are set");
- }
- }
-}
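For reference, the four properties recognized above interact with validate(); a sketch of the two valid ways to express a window:

    Properties props = new Properties();
    // Sliding window: seven consecutive days ending one day before today.
    props.setProperty("num.days", "7");
    props.setProperty("days.ago", "1");
    // Or a fixed window in yyyyMMdd format:
    //   props.setProperty("start.date", "20130101");
    //   props.setProperty("end.date", "20130107");
    // Note: validate() rejects days.ago combined with end.date, and
    // num.days combined with both start.date and end.date.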
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
deleted file mode 100644
index 3a9e917..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/TimePartitioner.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.jobs;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partitioner used by {@link AbstractPartitionPreservingIncrementalJob} to limit the number of named outputs
- * used by each reducer.
- *
- * <p>
- * The purpose of this partitioner is to prevent a proliferation of small files created by {@link AbstractPartitionPreservingIncrementalJob}.
- * This job writes multiple outputs. Each output corresponds to a day of input data. By default records will be distributed across all
- * the reducers. This means that if many input days are consumed, then each reducer will write many outputs. These outputs will typically
- * be small. The problem gets worse as more input data is consumed, as this will cause more reducers to be required.
- * </p>
- *
- * <p>
- * This partitioner solves the problem by limiting how many days of input data will be mapped to each reducer. At the extreme each day of
- * input data could be mapped to only one reducer. This is controlled through the configuration setting <em>incremental.reducers.per.input</em>,
- * which should be set in the Hadoop configuration. Input days are assigned to reducers in a round-robin fashion.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class TimePartitioner extends Partitioner<AvroKey<GenericRecord>,AvroValue<GenericRecord>> implements Configurable
-{
- public static String INPUT_TIMES = "incremental.input.times";
- public static String REDUCERS_PER_INPUT = "incremental.reducers.per.input";
- private static String REDUCE_TASKS = "mapred.reduce.tasks";
-
- private int numReducers;
- private Map<Long,List<Integer>> partitionMapping;
- private Configuration conf;
-
- @Override
- public Configuration getConf()
- {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf)
- {
- this.conf = conf;
-
- if (conf.get(REDUCE_TASKS) == null)
- {
- throw new RuntimeException(REDUCE_TASKS + " is required");
- }
-
- this.numReducers = Integer.parseInt(conf.get(REDUCE_TASKS));
-
- if (conf.get(REDUCERS_PER_INPUT) == null)
- {
- throw new RuntimeException(REDUCERS_PER_INPUT + " is required");
- }
-
- int reducersPerInput = Integer.parseInt(conf.get(REDUCERS_PER_INPUT));
-
- this.partitionMapping = new HashMap<Long,List<Integer>>();
- int partition = 0;
- for (String part : conf.get(INPUT_TIMES).split(","))
- {
- Long day = Long.parseLong(part);
-
- List<Integer> partitions = new ArrayList<Integer>();
- for (int r=0; r<reducersPerInput; r++)
- {
- partitions.add(partition);
- partition = (partition + 1) % this.numReducers;
- }
-
- partitionMapping.put(day,partitions);
- }
- }
-
- @Override
- public int getPartition(AvroKey<GenericRecord> key, AvroValue<GenericRecord> value, int numReduceTasks)
- {
- if (numReduceTasks != this.numReducers)
- {
- throw new RuntimeException("numReduceTasks " + numReduceTasks + " does not match expected " + this.numReducers);
- }
-
- Long time = (Long)key.datum().get("time");
- if (time == null)
- {
- throw new RuntimeException("time is null");
- }
-
- List<Integer> partitions = this.partitionMapping.get(time);
-
- if (partitions == null)
- {
- throw new RuntimeException("Couldn't find partition for " + time);
- }
-
- GenericRecord extractedKey = (GenericRecord)key.datum().get("value");
-
- if (extractedKey == null)
- {
- throw new RuntimeException("extracted key is null");
- }
-
- int partitionIndex = (extractedKey.hashCode() & Integer.MAX_VALUE) % partitions.size();
-
- return partitions.get(partitionIndex);
- }
-}
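To make the round-robin assignment concrete, a sketch of the configuration this partitioner reads (the timestamps are hypothetical; these settings are normally written by the job itself, not by hand):

    Configuration conf = new Configuration();
    conf.set("mapred.reduce.tasks", "8");
    conf.set("incremental.reducers.per.input", "2");
    conf.set("incremental.input.times", "1357776000000,1357862400000,1357948800000");

    // With 8 reducers and 2 reducers per input, days are assigned round-robin:
    //   day 1357776000000 -> partitions {0, 1}
    //   day 1357862400000 -> partitions {2, 3}
    //   day 1357948800000 -> partitions {4, 5}
    // Within a day's partition list, the hash of the extracted key picks the slot.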
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java b/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
deleted file mode 100644
index 010bfd0..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/jobs/package-info.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Incremental Hadoop jobs and some supporting classes.
- *
- * <p>
- * Jobs within this package form the core of the incremental framework implementation.
- * There are two types of incremental jobs: <em>partition-preserving</em> and
- * <em>partition-collapsing</em>.
- * </p>
- *
- * <p>
- * A partition-preserving job consumes input data partitioned by day and produces output data partitioned by day.
- * This is equivalent to running a MapReduce job for each individual day of input data,
- * but much more efficient. It compares the input data against the existing output data and only processes
- * input data with no corresponding output.
- * </p>
- *
- * <p>
- * A partition-collapsing job consumes input data partitioned by day and produces a single output.
- * What distinguishes this job from a standard MapReduce job is that it can reuse the previous output.
- * This enables it to process data much more efficiently. Rather than consuming all input data on each
- * run, it can consume only the new data since the previous run and merges it with the previous output.
- * </p>
- *
- * <p>
- * Partition-preserving and partition-collapsing jobs can be created by extending {@link datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob}
- * and {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob}, respectively, and implementing the necessary methods.
- * Alternatively, there are concrete versions of these classes, {@link datafu.hourglass.jobs.PartitionPreservingIncrementalJob} and
- * {@link datafu.hourglass.jobs.PartitionCollapsingIncrementalJob}, which can be used instead. With these classes, the implementations are provided
- * through setters.
- * </p>
- *
- * <p>
- * Incremental jobs use Avro for input, intermediate, and output data. To implement an incremental job, one must define their schemas.
- * A <em>key schema</em> and <em>intermediate value schema</em> specify the output of the mapper and combiner, which output key-value pairs.
- * The <em>key schema</em> and an <em>output value schema</em> specify the output of the reducer, which outputs a record having key and value
- * fields.
- * </p>
- *
- * <p>
- * An incremental job also requires that implementations of map and reduce be defined, and optionally combine. The map implementation must
- * implement a {@link datafu.hourglass.model.Mapper} interface, which is very similar to the standard map interface in Hadoop.
- * The combine and reduce operations are implemented through an {@link datafu.hourglass.model.Accumulator} interface.
- * This is similar to the standard reduce in Hadoop, however values are provided one-at-a-time rather than by an enumerable list.
 - * Also, an accumulator returns either one value or no value at all (by returning null). That is, the accumulator may not return an arbitrary number of values
- * for the output. This restriction precludes the implementation of certain operations, like flatten, which do not fit well within the
- * incremental programming model.
- * </p>
- */
-package datafu.hourglass.jobs;
\ No newline at end of file
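To ground the model description above, a sketch of a counting Accumulator; the method names (accumulate, getFinal, cleanup) come from the datafu.hourglass.model package, which is not part of this portion of the diff, so treat them as assumptions:

    // Sums a "count" field across values; emits one final record per key.
    public class CountAccumulator implements Accumulator<GenericRecord, GenericRecord>
    {
      private transient long count;
      private final String schemaJson; // schema kept as a string so the accumulator stays serializable

      public CountAccumulator(Schema schema)
      {
        this.schemaJson = schema.toString();
      }

      @Override
      public void accumulate(GenericRecord value)
      {
        count += (Long)value.get("count");
      }

      @Override
      public GenericRecord getFinal()
      {
        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericRecord result = new GenericData.Record(schema);
        result.put("count", count);
        return result;
      }

      @Override
      public void cleanup()
      {
        count = 0L; // reset state between keys
      }
    }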
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
deleted file mode 100644
index b8571a0..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/AvroKeyValueIdentityMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * A mapper that outputs key-value pairs as-is.
- *
- * It assumes the input is an Avro record having "key" and "value" fields,
- * and it outputs those same fields unchanged.
- *
- * @author "Matthew Hayes"
- *
- */
-public class AvroKeyValueIdentityMapper extends Mapper<Object, Object, Object, Object>
-{
- @Override
- protected void map(Object keyObj, Object valueObj, Context context) throws java.io.IOException, java.lang.InterruptedException
- {
- @SuppressWarnings("unchecked")
- GenericRecord input = ((AvroKey<GenericRecord>)keyObj).datum();
- GenericRecord key = (GenericRecord)input.get("key");
- GenericRecord value = (GenericRecord)input.get("value");
- context.write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(value));
- }
-}
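
For reference, a record in the nested key/value shape this identity mapper expects could be constructed as in the following sketch; the schema literal and the "id" and "count" field names are illustrative assumptions.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    // Builds a record in the nested "key"/"value" shape the identity mapper assumes.
    public class KeyValueRecordExample
    {
      public static void main(String[] args)
      {
        Schema schema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"KeyValue\",\"fields\":["
          + "{\"name\":\"key\",\"type\":{\"type\":\"record\",\"name\":\"Key\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}},"
          + "{\"name\":\"value\",\"type\":{\"type\":\"record\",\"name\":\"Value\","
          + "\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}}]}");

        GenericRecord key = new GenericData.Record(schema.getField("key").schema());
        key.put("id", 42L);

        GenericRecord value = new GenericData.Record(schema.getField("value").schema());
        value.put("count", 1L);

        GenericRecord record = new GenericData.Record(schema);
        record.put("key", key);
        record.put("value", value);

        System.out.println(record); // {"key": {"id": 42}, "value": {"count": 1}}
      }
    }
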
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
deleted file mode 100644
index aa8d64c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingCombiner.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.jobs.DateRangeConfigurable;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The combiner used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * intermediate value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingCombiner extends ObjectReducer implements DateRangeConfigurable, Serializable
-{
- private Accumulator<GenericRecord,GenericRecord> _accumulator;
- private boolean _reusePreviousOutput;
- private PartitionCollapsingSchemas _schemas;
- private long _beginTime;
- private long _endTime;
-
- @SuppressWarnings("unchecked")
- public void reduce(Object keyObj,
- Iterable<Object> values,
- ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
-
- if (acc == null)
- {
- throw new RuntimeException("No combiner factory set");
- }
-
- long accumulatedCount = 0;
-
- acc.cleanup();
-
- for (Object valueObj : values)
- {
- GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
- if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
- {
- acc.accumulate(value);
- accumulatedCount++;
- }
- else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
- {
- if (!_reusePreviousOutput)
- {
- throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
- }
-
- Long time = (Long)value.get("time");
- GenericRecord data = (GenericData.Record)value.get("value");
-
- if (time == null)
- {
- throw new RuntimeException("time is null");
- }
-
- if (data == null)
- {
- throw new RuntimeException("value is null");
- }
-
- if (time >= _beginTime && time <= _endTime)
- {
- acc.accumulate(data);
- accumulatedCount++;
- }
- else if (time < _beginTime)
- {
- // pass through unchanged, reducer will handle it
- context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
- }
- else
- {
- throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
- }
- }
- else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
- {
- if (!_reusePreviousOutput)
- {
- throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
- }
-
- // pass through unchanged, reducer will handle it
- context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
- }
- else
- {
- throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
- }
- }
-
- if (accumulatedCount > 0)
- {
- GenericRecord intermediateValue = acc.getFinal();
- if (intermediateValue != null)
- {
- context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
- }
- }
- }
-
- /**
- * Sets the schemas.
- *
- * @param schemas the schemas
- */
- public void setSchemas(PartitionCollapsingSchemas schemas)
- {
- _schemas = schemas;
- }
-
- /**
- * Gets the schemas.
- *
- * @return schemas
- */
- public PartitionCollapsingSchemas getSchemas()
- {
- return _schemas;
- }
-
- /**
- * Gets whether previous output is being reused.
- *
- * @return true if previous output is reused
- */
- public boolean getReuseOutput()
- {
- return _reusePreviousOutput;
- }
-
- /**
- * Sets whether previous output is being reused.
- *
- * @param reuseOutput true if previous output is reused
- */
- public void setReuseOutput(boolean reuseOutput)
- {
- _reusePreviousOutput = reuseOutput;
- }
-
- /**
- * Gets the accumulator used to perform aggregation.
- *
- * @return The accumulator
- */
- public Accumulator<GenericRecord,GenericRecord> getAccumulator()
- {
- return _accumulator;
- }
-
- /**
- * Sets the accumulator used to perform aggregation.
- *
- * @param acc The accumulator
- */
- public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- _accumulator = cloneAccumulator(acc);
- }
-
- public void setOutputDateRange(DateRange dateRange)
- {
- _beginTime = dateRange.getBeginDate().getTime();
- _endTime = dateRange.getEndDate().getTime();
- }
-
- /**
- * Clones an {@link Accumulator} by serializing and deserializing it.
- *
- * @param acc The accumulator to clone
- * @return The cloned accumulator
- */
- private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- try
- {
- // clone by serializing
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ObjectOutputStream objStream;
- objStream = new ObjectOutputStream(outputStream);
- objStream.writeObject(acc);
- objStream.close();
- outputStream.close();
- ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
- @SuppressWarnings("unchecked")
- Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
- objInputStream.close();
- inputStream.close();
- return result;
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (ClassNotFoundException e)
- {
- throw new RuntimeException(e);
- }
- }
-}
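
The cloneAccumulator method above gives the combiner its own private copy of the accumulator, so per-task state is never shared with or mutated by other components holding the original. The same clone-by-serialization technique can be written more compactly with Java 7 try-with-resources, as in this hypothetical helper (not part of Hourglass itself):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    // A hypothetical helper illustrating clone-by-serialization with try-with-resources.
    public final class SerializationCloner
    {
      private SerializationCloner() {}

      @SuppressWarnings("unchecked")
      public static <T extends Serializable> T deepClone(T obj)
      {
        try
        {
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          try (ObjectOutputStream out = new ObjectOutputStream(bytes))
          {
            out.writeObject(obj); // serialize the full object graph
          }
          try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
          {
            return (T)in.readObject(); // deserialize into an independent copy
          }
        }
        catch (IOException | ClassNotFoundException e)
        {
          throw new RuntimeException(e);
        }
      }
    }
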
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
deleted file mode 100644
index 805ed1c..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingMapper.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import org.apache.avro.UnresolvedUnionException;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-
-
-import datafu.hourglass.fs.PathUtils;
-import datafu.hourglass.model.KeyValueCollector;
-import datafu.hourglass.model.Mapper;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The mapper used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Mapper} is used for the
- * map operation, which produces key and intermediate value pairs from the input.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingMapper extends ObjectMapper implements Serializable
-{
- private static Logger _log = Logger.getLogger(CollapsingMapper.class);
-
- private transient IdentityMapCollector _mapCollector;
- private transient TimeMapCollector _timeMapCollector;
-
- private boolean _reusePreviousOutput;
- private PartitionCollapsingSchemas _schemas;
- private Mapper<GenericRecord,GenericRecord,GenericRecord> _mapper;
-
- @Override
- public void map(Object inputObj, MapContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- @SuppressWarnings("unchecked")
- GenericRecord input = ((AvroKey<GenericRecord>)inputObj).datum();
- try
- {
- getMapCollector().setContext(context);
- getMapper().map(input, getMapCollector());
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- catch (UnresolvedUnionException e)
- {
- GenericRecord record = (GenericRecord)e.getUnresolvedDatum();
- _log.error("UnresolvedUnionException on schema: " + record.getSchema());
- throw e;
- }
- }
-
- /**
- * Gets whether previous output is being reused.
- *
- * @return true if previous output is reused
- */
- public boolean getReuseOutput()
- {
- return _reusePreviousOutput;
- }
-
- /**
- * Sets whether previous output is being reused.
- *
- * @param reuseOutput true if previous output is reused
- */
- public void setReuseOutput(boolean reuseOutput)
- {
- _reusePreviousOutput = reuseOutput;
- }
-
- @Override
- public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
- {
- super.setContext(context);
-
- if (_mapper instanceof Configurable)
- {
- ((Configurable)_mapper).setConf(context.getConfiguration());
- }
- }
-
- /**
- * Gets the mapper.
- *
- * @return mapper
- */
- public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
- {
- return _mapper;
- }
-
- /**
- * Sets the mapper.
- *
- * @param mapper the mapper
- */
- public void setMapper(Mapper<GenericRecord,GenericRecord,GenericRecord> mapper)
- {
- _mapper = mapper;
- }
-
- /**
- * Sets the Avro schemas.
- *
- * @param schemas the Avro schemas
- */
- public void setSchemas(PartitionCollapsingSchemas schemas)
- {
- _schemas = schemas;
- }
-
- /**
- * Gets the Avro schemas.
- *
- * @return schemas
- */
- public PartitionCollapsingSchemas getSchemas()
- {
- return _schemas;
- }
-
- /**
- * Gets the collector used to collect key-value pairs.
- *
- * @return The collector
- */
- private MapCollector getMapCollector()
- {
- if (getReuseOutput())
- {
- return getTimeMapCollector();
- }
- else
- {
- return getIdentityMapCollector();
- }
- }
-
- /**
- * Gets a collector that maps key-value pairs, where each value
- * is tagged with the partition from which it was derived.
- *
- * @return collector
- */
- private TimeMapCollector getTimeMapCollector()
- {
- if (_timeMapCollector == null)
- {
- _timeMapCollector = new TimeMapCollector(getSchemas());
- }
-
- return _timeMapCollector;
- }
-
- /**
- * Gets a collector that maps key-value pairs as-is.
- *
- * @return collector
- */
- private IdentityMapCollector getIdentityMapCollector()
- {
- if (_mapCollector == null)
- {
- _mapCollector = new IdentityMapCollector(getSchemas());
- }
-
- return _mapCollector;
- }
-
- private abstract class MapCollector implements KeyValueCollector<GenericRecord,GenericRecord>
- {
- private MapContext<Object,Object,Object,Object> context;
-
- public void setContext(MapContext<Object,Object,Object,Object> context)
- {
- this.context = context;
- }
-
- public MapContext<Object,Object,Object,Object> getContext()
- {
- return context;
- }
- }
-
- /**
- * A {@link KeyValueCollector} that outputs key-value pairs to the {@link MapContext}
- * and tags each mapped value with the time of the partition it was derived from.
- *
- * @author "Matthew Hayes"
- *
- */
- private class TimeMapCollector extends MapCollector
- {
- private GenericRecord wrappedValue;
- private InputSplit lastSplit;
- private long lastTime;
-
- public TimeMapCollector(PartitionCollapsingSchemas schemas)
- {
- this.wrappedValue = new GenericData.Record(schemas.getDatedIntermediateValueSchema());
- }
-
- public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
- {
- if (key == null)
- {
- throw new RuntimeException("key is null");
- }
- if (value == null)
- {
- throw new RuntimeException("value is null");
- }
-
- // wrap the value with the time so we know what to merge and what to unmerge
- long time;
- if (lastSplit == getContext().getInputSplit())
- {
- time = lastTime;
- }
- else
- {
- FileSplit currentSplit;
- lastSplit = getContext().getInputSplit();
- try
- {
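-          // the split may be a wrapper (e.g. Hadoop's package-private TaggedInputSplit used with multiple inputs), so unwrap the underlying FileSplit reflectively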
- Method m = getContext().getInputSplit().getClass().getMethod("getInputSplit");
- m.setAccessible(true);
- currentSplit = (FileSplit)m.invoke(getContext().getInputSplit());
- }
- catch (SecurityException e)
- {
- throw new RuntimeException(e);
- }
- catch (NoSuchMethodException e)
- {
- throw new RuntimeException(e);
- }
- catch (IllegalArgumentException e)
- {
- throw new RuntimeException(e);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException(e);
- }
- catch (InvocationTargetException e)
- {
- throw new RuntimeException(e);
- }
- time = PathUtils.getDateForNestedDatedPath((currentSplit).getPath().getParent()).getTime();
- lastTime = time;
- }
-
- wrappedValue.put("time", time);
- wrappedValue.put("value", value);
-
- getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
- }
- }
-
- /**
- * A {@link KeyValueCollector} that outputs key-value pairs to the {@link MapContext} as-is.
- *
- * @author "Matthew Hayes"
- *
- */
- private class IdentityMapCollector extends MapCollector
- {
- public IdentityMapCollector(PartitionCollapsingSchemas schemas)
- {
- }
-
- public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
- {
- if (key == null)
- {
- throw new RuntimeException("key is null");
- }
- if (value == null)
- {
- throw new RuntimeException("value is null");
- }
- getContext().write(new AvroKey<GenericRecord>(key), new AvroValue<GenericRecord>(value));
- }
- }
-}
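
A user-level Mapper that this class would drive might look like the following sketch. The map signature mirrors how the mapper is invoked above; the "id" and "count" fields and the key/value schemas are illustrative assumptions.

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import datafu.hourglass.model.KeyValueCollector;
    import datafu.hourglass.model.Mapper;

    // A hedged sketch of a user-level mapper emitting (key, count) pairs.
    public class CountMapper implements Mapper<GenericRecord,GenericRecord,GenericRecord>, Serializable
    {
      private final String keySchemaJson;   // schemas kept as JSON; Avro's Schema is not Serializable
      private final String valueSchemaJson;
      private transient Schema keySchema;
      private transient Schema valueSchema;

      public CountMapper(Schema keySchema, Schema valueSchema)
      {
        this.keySchemaJson = keySchema.toString();
        this.valueSchemaJson = valueSchema.toString();
      }

      @Override
      public void map(GenericRecord input, KeyValueCollector<GenericRecord,GenericRecord> collector)
          throws IOException, InterruptedException
      {
        if (keySchema == null)
        {
          keySchema = new Schema.Parser().parse(keySchemaJson);
          valueSchema = new Schema.Parser().parse(valueSchemaJson);
        }

        GenericRecord key = new GenericData.Record(keySchema);
        key.put("id", input.get("id"));  // group by the input's "id" field (assumed)

        GenericRecord value = new GenericData.Record(valueSchema);
        value.put("count", 1L);          // each input record contributes a count of one

        collector.collect(key, value);
      }
    }
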
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e6715a42/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
----------------------------------------------------------------------
diff --git a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java b/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
deleted file mode 100644
index 5424754..0000000
--- a/contrib/hourglass/src/java/datafu/hourglass/mapreduce/CollapsingReducer.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
-* Copyright 2013 LinkedIn, Inc
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
-
-package datafu.hourglass.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.mapreduce.ReduceContext;
-
-
-import datafu.hourglass.fs.DateRange;
-import datafu.hourglass.jobs.DateRangeConfigurable;
-import datafu.hourglass.model.Accumulator;
-import datafu.hourglass.model.Merger;
-import datafu.hourglass.schemas.PartitionCollapsingSchemas;
-
-/**
- * The reducer used by {@link datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob} and its derived classes.
- *
- * <p>
- * An implementation of {@link datafu.hourglass.model.Accumulator} is used to perform aggregation and produce the
- * output value.
- * </p>
- *
- * @author "Matthew Hayes"
- *
- */
-public class CollapsingReducer extends ObjectReducer implements DateRangeConfigurable, Serializable
-{
- protected long _beginTime;
- protected long _endTime;
- private Accumulator<GenericRecord,GenericRecord> _newAccumulator;
- private Accumulator<GenericRecord,GenericRecord> _oldAccumulator;
- private Merger<GenericRecord> _merger;
- private Merger<GenericRecord> _oldMerger;
- private boolean _reusePreviousOutput;
- private PartitionCollapsingSchemas _schemas;
-
- @SuppressWarnings("unchecked")
- public void reduce(Object keyObj,
- Iterable<Object> values,
- ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
- {
- if (_newAccumulator == null)
- {
- throw new RuntimeException("No reducer set");
- }
-
- GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
-
- // used when processing all data (i.e. no window size)
- Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
- acc.cleanup();
- long accumulatedCount = 0;
-
- Accumulator<GenericRecord,GenericRecord> accOld = null;
- long oldAccumulatedCount = 0;
- if (getReuseOutput())
- {
- accOld = getOldAccumulator();
- accOld.cleanup();
- }
-
- GenericRecord previous = null;
-
- for (Object valueObj : values)
- {
- GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
- if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
- {
- acc.accumulate(value);
- accumulatedCount++;
- }
- else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
- {
- if (!_reusePreviousOutput)
- {
- throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
- }
-
- Long time = (Long)value.get("time");
- GenericRecord data = (GenericData.Record)value.get("value");
-
- if (time == null)
- {
- throw new RuntimeException("time is null");
- }
-
- if (data == null)
- {
- throw new RuntimeException("value is null");
- }
-
- if (time >= _beginTime && time <= _endTime)
- {
- acc.accumulate(data);
- accumulatedCount++;
- }
- else if (time < _beginTime)
- {
- accOld.accumulate(data);
- oldAccumulatedCount++;
- }
- else
- {
- throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
- }
- }
- else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
- {
- if (!_reusePreviousOutput)
- {
- throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
- }
-
- // deep clone the previous output fed back in
- previous = new GenericData.Record((Record)value,true);
- }
- else
- {
- throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
- }
- }
-
- GenericRecord newOutputValue = null;
- GenericRecord oldOutputValue = null;
-
- if (accumulatedCount > 0)
- {
- newOutputValue = acc.getFinal();
- }
-
- if (oldAccumulatedCount > 0)
- {
- oldOutputValue = accOld.getFinal();
- }
-
- GenericRecord outputValue = null;
-
- if (previous == null)
- {
- outputValue = newOutputValue;
-
- if (oldOutputValue != null)
- {
- if (_oldMerger == null)
- {
- throw new RuntimeException("No old record merger set");
- }
-
- outputValue = _oldMerger.merge(outputValue, oldOutputValue);
- }
- }
- else
- {
- outputValue = previous;
-
- if (oldOutputValue != null)
- {
- if (_oldMerger == null)
- {
- throw new RuntimeException("No old record merger set");
- }
-
- outputValue = _oldMerger.merge(outputValue, oldOutputValue);
- }
-
- if (newOutputValue != null)
- {
- if (_merger == null)
- {
- throw new RuntimeException("No new record merger set");
- }
-
- outputValue = _merger.merge(outputValue, newOutputValue);
- }
- }
-
- if (outputValue != null)
- {
- GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
- output.put("key", key);
- output.put("value", outputValue);
- context.write(new AvroKey<GenericRecord>(output),null);
- }
- }
-
- /**
- * Sets the Avro schemas.
- *
- * @param schemas the Avro schemas
- */
- public void setSchemas(PartitionCollapsingSchemas schemas)
- {
- _schemas = schemas;
- }
-
- /**
- * Gets the Avro schemas.
- *
- * @return the schemas
- */
- private PartitionCollapsingSchemas getSchemas()
- {
- return _schemas;
- }
-
- /**
- * Gets whether previous output is being reused.
- *
- * @return true if previous output is reused
- */
- public boolean getReuseOutput()
- {
- return _reusePreviousOutput;
- }
-
- /**
- * Sets whether previous output is being reused.
- *
- * @param reuseOutput true if previous output is reused
- */
- public void setReuseOutput(boolean reuseOutput)
- {
- _reusePreviousOutput = reuseOutput;
- }
-
- public void setAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- _newAccumulator = cloneAccumulator(acc);
- _oldAccumulator = cloneAccumulator(acc);
- }
-
- public Accumulator<GenericRecord,GenericRecord> getNewAccumulator()
- {
- return _newAccumulator;
- }
-
- public Accumulator<GenericRecord,GenericRecord> getOldAccumulator()
- {
- return _oldAccumulator;
- }
-
- public void setRecordMerger(Merger<GenericRecord> merger)
- {
- _merger = merger;
- }
-
- public void setOldRecordMerger(Merger<GenericRecord> merger)
- {
- _oldMerger = merger;
- }
-
- public void setOutputDateRange(DateRange dateRange)
- {
- _beginTime = dateRange.getBeginDate().getTime();
- _endTime = dateRange.getEndDate().getTime();
- }
-
- /**
- * Clones an {@link Accumulator} by serializing and deserializing it.
- *
- * @param acc The accumulator to clone
- * @return The cloned accumulator
- */
- private Accumulator<GenericRecord,GenericRecord> cloneAccumulator(Accumulator<GenericRecord,GenericRecord> acc)
- {
- try
- {
- // clone by serializing
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ObjectOutputStream objStream;
- objStream = new ObjectOutputStream(outputStream);
- objStream.writeObject(acc);
- objStream.close();
- outputStream.close();
- ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- ObjectInputStream objInputStream = new ObjectInputStream(inputStream);
- @SuppressWarnings("unchecked")
- Accumulator<GenericRecord,GenericRecord> result = (Accumulator<GenericRecord,GenericRecord>)objInputStream.readObject();
- objInputStream.close();
- inputStream.close();
- return result;
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (ClassNotFoundException e)
- {
- throw new RuntimeException(e);
- }
- }
-}
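
To round out the reducer's merge logic, sketches of the two Merger roles for a count aggregate are shown below: the new-record merger adds counts produced from the current window, while the old-record merger subtracts counts for partitions that have slid out of the window. The "count" field is an illustrative assumption, and the null guards reflect that either side of a merge may be absent.

    import java.io.Serializable;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import datafu.hourglass.model.Merger;

    // Adds counts from newly processed data into the previous output.
    class CountMerger implements Merger<GenericRecord>, Serializable
    {
      @Override
      public GenericRecord merge(GenericRecord previous, GenericRecord newValue)
      {
        if (previous == null)
        {
          return newValue;
        }
        if (newValue == null)
        {
          return previous;
        }
        GenericRecord merged = new GenericData.Record(previous.getSchema());
        merged.put("count", (Long)previous.get("count") + (Long)newValue.get("count"));
        return merged;
      }
    }

    // "Unmerges" counts for data that has fallen out of the sliding window.
    class CountUnmerger implements Merger<GenericRecord>, Serializable
    {
      @Override
      public GenericRecord merge(GenericRecord previous, GenericRecord oldValue)
      {
        if (previous == null || oldValue == null)
        {
          return previous;
        }
        GenericRecord merged = new GenericData.Record(previous.getSchema());
        merged.put("count", (Long)previous.get("count") - (Long)oldValue.get("count"));
        return merged;
      }
    }
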