You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/05/16 04:05:37 UTC
svn commit: r1338996 - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/mapreduce/
src/test/org/apache/hcatalog/mapreduce/
Author: toffer
Date: Wed May 16 04:05:37 2012
New Revision: 1338996
URL: http://svn.apache.org/viewvc?rev=1338996&view=rev
Log:
HCATALOG-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed May 16 04:05:37 2012
@@ -98,6 +98,8 @@ Release 0.4.0 - Unreleased
HCAT-2 Support nested schema conversion between Hive and Pig (julienledem via hashutosh)
IMPROVEMENTS
+ HCAT-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer)
+
HCAT-68 Logging from HCat (avandana via toffer)
HCAT-383 Add clover to build.xml (gates)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Wed May 16 04:05:37 2012
@@ -25,15 +25,12 @@ import java.util.Map;
import java.util.HashMap;
import java.util.List;
-import org.apache.hadoop.hive.serde2.SerDe;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -83,7 +80,7 @@ public abstract class HCatBaseInputForma
HCatUtil.serialize(hcatSchema));
}
- private static
+ protected static
org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
return (
@@ -178,7 +175,7 @@ public abstract class HCatBaseInputForma
createRecordReader(InputSplit split,
TaskAttemptContext taskContext) throws IOException, InterruptedException {
- HCatSplit hcatSplit = (HCatSplit) split;
+ HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
PartInfo partitionInfo = hcatSplit.getPartitionInfo();
JobContext jobContext = taskContext;
@@ -186,46 +183,17 @@ public abstract class HCatBaseInputForma
jobContext.getConfiguration(), partitionInfo);
JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-
- Class inputFormatClass = storageHandler.getInputFormatClass();
- org.apache.hadoop.mapred.InputFormat inputFormat =
- getMapRedInputFormat(jobConf, inputFormatClass);
-
Map<String, String> jobProperties = partitionInfo.getJobProperties();
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
- Reporter reporter = InternalUtil.createReporter(taskContext);
- org.apache.hadoop.mapred.RecordReader recordReader =
- inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter);
-
- SerDe serde;
- try {
- serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
- jobContext.getConfiguration());
-
-// HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet());
-
- Configuration conf = storageHandler.getConf();
- InternalUtil.initializeInputSerDe(serde, conf,
- partitionInfo.getTableInfo(),partitionInfo.getPartitionSchema());
-
- } catch (Exception e) {
- throw new IOException("Unable to create objectInspector "
- + "for serde class " + storageHandler.getSerDeClass().getName()
- + e);
- }
Map<String,String> valuesNotInDataCols = getColValsNotInDataColumns(
getOutputSchema(jobContext),partitionInfo
);
- HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler,
- recordReader,
- serde,
- valuesNotInDataCols);
- return hcatRecordReader;
+ return new HCatRecordReader(storageHandler, valuesNotInDataCols);
}
-
+
/**
* gets values for fields requested by output schema which will not be in the data
*/
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Wed May 16 04:05:37 2012
@@ -19,13 +19,15 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
import java.util.Map;
-import java.util.Properties;
import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
@@ -47,9 +49,7 @@ class HCatRecordReader extends RecordRea
Writable currentValue;
/** The underlying record reader to delegate to. */
- //org.apache.hadoop.mapred.
- private final org.apache.hadoop.mapred.RecordReader
- <WritableComparable, Writable> baseRecordReader;
+ private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
/** The storage handler used */
private final HCatStorageHandler storageHandler;
@@ -63,16 +63,10 @@ class HCatRecordReader extends RecordRea
/**
* Instantiates a new hcat record reader.
- * @param baseRecordReader the base record reader
*/
public HCatRecordReader(HCatStorageHandler storageHandler,
- org.apache.hadoop.mapred.RecordReader<WritableComparable,
- Writable> baseRecordReader,
- SerDe serde,
Map<String,String> valuesNotInDataCols) {
- this.baseRecordReader = baseRecordReader;
this.storageHandler = storageHandler;
- this.serde = serde;
this.valuesNotInDataCols = valuesNotInDataCols;
}
@@ -83,37 +77,56 @@ class HCatRecordReader extends RecordRea
*/
@Override
public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
- TaskAttemptContext taskContext)
- throws IOException, InterruptedException {
- org.apache.hadoop.mapred.InputSplit baseSplit;
+ TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+ HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+
+ baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
+ serde = createSerDe(hcatSplit, storageHandler, taskContext);
- // Pull the output schema out of the TaskAttemptContext
- outputSchema = (HCatSchema)HCatUtil.deserialize(
+ // Pull the output schema out of the TaskAttemptContext
+ outputSchema = (HCatSchema) HCatUtil.deserialize(
taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
- if( split instanceof HCatSplit ) {
- baseSplit = ((HCatSplit) split).getBaseSplit();
- } else {
- throw new IOException("Not a HCatSplit");
- }
+ if (outputSchema == null) {
+ outputSchema = hcatSplit.getTableSchema();
+ }
- if (outputSchema == null){
- outputSchema = ((HCatSplit) split).getTableSchema();
- }
+ // Pull the table schema out of the Split info
+ // TODO This should be passed in the TaskAttemptContext instead
+ dataSchema = hcatSplit.getDataSchema();
+ }
- // Pull the table schema out of the Split info
- // TODO This should be passed in the TaskAttemptContext instead
- dataSchema = ((HCatSplit)split).getDataSchema();
-
- Properties properties = new Properties();
- for (Map.Entry<String, String>param :
- ((HCatSplit)split).getPartitionInfo()
- .getJobProperties().entrySet()) {
- properties.setProperty(param.getKey(), param.getValue());
- }
+ private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
+     HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
+   // Copy the partition's job properties into the JobConf, then build the mapred input format from it.
+   JobConf conf = HCatUtil.getJobConfFromContext(taskContext);
+   HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), conf);
+   org.apache.hadoop.mapred.InputFormat baseFormat =
+       HCatInputFormat.getMapRedInputFormat(conf, storageHandler.getInputFormatClass());
+   org.apache.hadoop.mapred.InputSplit baseSplit = hcatSplit.getBaseSplit();
+   return baseFormat.getRecordReader(baseSplit, conf, InternalUtil.createReporter(taskContext));
}
- /* (non-Javadoc)
+ private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+     TaskAttemptContext taskContext) throws IOException {
+   // Instantiate the storage handler's SerDe class reflectively with the task configuration.
+   SerDe instance = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+       taskContext.getConfiguration());
+   try {
+     // Initialize it from the split's table info and partition schema.
+     InternalUtil.initializeInputSerDe(instance, storageHandler.getConf(),
+         hcatSplit.getPartitionInfo().getTableInfo(),
+         hcatSplit.getPartitionInfo().getPartitionSchema());
+     return instance;
+   } catch (SerDeException e) {
+     // Rethrow as IOException, keeping the SerDeException as the cause.
+     String serDeName = storageHandler.getSerDeClass().getName();
+     throw new IOException("Failed initializing SerDe " + serDeName, e);
+   }
+ }
+
+ /* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
*/
@Override
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Wed May 16 04:05:37 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -168,4 +169,18 @@ static Reporter createReporter(TaskAttem
return new ProgressReporter(context);
}
+ /**
+ * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+ * @param split the InputSplit
+ * @return the HCatSplit
+ * @throws IOException
+ */
+ public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+ if (split instanceof HCatSplit) {
+ return (HCatSplit) split;
+ } else {
+ throw new IOException("Split must be " + HCatSplit.class.getName()
+ + " but found " + split.getClass().getName());
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Wed May 16 04:05:37 2012
@@ -21,53 +21,65 @@ package org.apache.hcatalog.mapreduce;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.Progressable;
-class ProgressReporter implements Reporter {
+class ProgressReporter extends StatusReporter implements Reporter {
- private Progressable progressable;
+ private TaskInputOutputContext context = null;
+ private TaskAttemptContext taskAttemptContext = null;
- public ProgressReporter(TaskAttemptContext context) {
- this(context instanceof TaskInputOutputContext?
- (TaskInputOutputContext)context:
- Reporter.NULL);
- }
-
- public ProgressReporter(Progressable progressable) {
- this.progressable = progressable;
- }
-
- @Override
- public void setStatus(String status) {
- }
-
- @Override
- public Counters.Counter getCounter(Enum<?> name) {
- return Reporter.NULL.getCounter(name);
- }
-
- @Override
- public Counters.Counter getCounter(String group, String name) {
- return Reporter.NULL.getCounter(group,name);
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return Reporter.NULL.getInputSplit();
- }
-
- @Override
- public void progress() {
- progressable.progress();
+ public ProgressReporter(TaskAttemptContext context) {
+ if (context instanceof TaskInputOutputContext) {
+ this.context = (TaskInputOutputContext) context;
+ } else {
+ taskAttemptContext = context;
+ }
+ }
+
+ @Override
+ public void setStatus(String status) {
+ if (context != null) {
+ context.setStatus(status);
+ }
+ }
+
+ @Override
+ public Counters.Counter getCounter(Enum<?> name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+ }
+
+ @Override
+ public Counters.Counter getCounter(String group, String name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ if (context != null) {
+ context.getCounter(key).increment(amount);
+ }
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+ if (context != null) {
+ context.getCounter(group, counter).increment(amount);
+ }
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public void progress() {
+ if (context != null) {
+ context.progress();
+ } else {
+ taskAttemptContext.progress();
}
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Wed May 16 04:05:37 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
@@ -267,6 +268,13 @@ public abstract class HCatMapReduceTest
HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
boolean success = job.waitForCompletion(true);
+
+ // Ensure counters are set when data has actually been read.
+ if (partitionValues != null) {
+ assertTrue(job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("FILE_BYTES_READ").getValue() > 0);
+ }
+
if (success) {
new FileOutputCommitterContainer(job,null).commitJob(job);
} else {