Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/05/16 04:05:37 UTC

svn commit: r1338996 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/

Author: toffer
Date: Wed May 16 04:05:37 2012
New Revision: 1338996

URL: http://svn.apache.org/viewvc?rev=1338996&view=rev
Log:
HCATALOG-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer)
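
For context on the fix itself: the old mapred API hands progress and
counter updates to a Reporter, while the new mapreduce API expects a
StatusReporter, and a single class can satisfy both. That is the shape
this patch gives ProgressReporter. A minimal no-op sketch of the bridge,
assuming Hadoop 1.x-era APIs (DualApiReporter is an illustrative name,
not part of the patch):

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.StatusReporter;

    // Type-checks against both APIs. The covariant returns compile because
    // mapred's Counters.Counter extends mapreduce's Counter in Hadoop 1.x,
    // which is the same trick the patched ProgressReporter relies on.
    class DualApiReporter extends StatusReporter implements Reporter {
      @Override public void setStatus(String status) {}
      @Override public Counters.Counter getCounter(Enum<?> name) { return null; }
      @Override public Counters.Counter getCounter(String group, String name) { return null; }
      @Override public void incrCounter(Enum<?> key, long amount) {}
      @Override public void incrCounter(String group, String counter, long amount) {}
      @Override public InputSplit getInputSplit() { throw new UnsupportedOperationException(); }
      @Override public void progress() {}
    }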

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed May 16 04:05:37 2012
@@ -98,6 +98,8 @@ Release 0.4.0 - Unreleased
   HCAT-2 Support nested schema conversion between Hive and Pig (julienledem via hashutosh)
 
   IMPROVEMENTS
+  HCAT-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer) 
+
   HCAT-68 Logging from HCat (avandana via toffer)
 
   HCAT-383 Add clover to build.xml (gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Wed May 16 04:05:37 2012
@@ -25,15 +25,12 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 
-import org.apache.hadoop.hive.serde2.SerDe;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -83,7 +80,7 @@ public abstract class HCatBaseInputForma
                                HCatUtil.serialize(hcatSchema));
   }
 
-  private static 
+  protected static
     org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
     getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
       return (
@@ -178,7 +175,7 @@ public abstract class HCatBaseInputForma
   createRecordReader(InputSplit split,
       TaskAttemptContext taskContext) throws IOException, InterruptedException {
 
-    HCatSplit hcatSplit = (HCatSplit) split;
+    HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
     JobContext jobContext = taskContext;
 
@@ -186,46 +183,17 @@ public abstract class HCatBaseInputForma
         jobContext.getConfiguration(), partitionInfo);
     
     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-
-    Class inputFormatClass = storageHandler.getInputFormatClass();
-    org.apache.hadoop.mapred.InputFormat inputFormat = 
-                              getMapRedInputFormat(jobConf, inputFormatClass);
-
     Map<String, String> jobProperties = partitionInfo.getJobProperties();
     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-    Reporter reporter = InternalUtil.createReporter(taskContext);
-    org.apache.hadoop.mapred.RecordReader recordReader =
-      inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter);
-
-    SerDe serde;
-    try {
-      serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), 
-                                          jobContext.getConfiguration());
-
-//    HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet());
-
-      Configuration conf = storageHandler.getConf();
-      InternalUtil.initializeInputSerDe(serde, conf, 
-                                  partitionInfo.getTableInfo(),partitionInfo.getPartitionSchema());
-                                  
-    } catch (Exception e) {
-      throw new IOException("Unable to create objectInspector "
-          + "for serde class " + storageHandler.getSerDeClass().getName()
-          + e);
-    }
 
     Map<String,String> valuesNotInDataCols = getColValsNotInDataColumns(
         getOutputSchema(jobContext),partitionInfo
         );
 
-    HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler, 
-                                                             recordReader, 
-                                                             serde,
-                                                             valuesNotInDataCols);
-    return hcatRecordReader;
+    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
   }
 
-  
+
   /**
    * gets values for fields requested by output schema which will not be in the data
    */
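
Why the reader construction above could shrink to one line: in the new MR
API the framework builds the RecordReader and then drives it through a
fixed lifecycle, handing the split and task context to initialize(), so
deferring the base-reader and SerDe setup to initialize() loses nothing.
A sketch of that lifecycle (the driver class is illustrative, not part of
the patch):

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class RecordReaderLifecycle {
      // The framework performs these calls once per task attempt, which is
      // why initialize() is a safe home for the expensive wiring.
      static <K, V> void drive(RecordReader<K, V> reader, InputSplit split,
          TaskAttemptContext context) throws Exception {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
          K key = reader.getCurrentKey();
          V value = reader.getCurrentValue();
          // the map() call would consume key/value here
        }
        reader.close();
      }
    }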

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Wed May 16 04:05:37 2012
@@ -19,13 +19,15 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
@@ -47,9 +49,7 @@ class HCatRecordReader extends RecordRea
     Writable currentValue;
 
     /** The underlying record reader to delegate to. */
-    //org.apache.hadoop.mapred.
-    private final org.apache.hadoop.mapred.RecordReader
-      <WritableComparable, Writable> baseRecordReader;
+    private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
 
     /** The storage handler used */
     private final HCatStorageHandler storageHandler;
@@ -63,16 +63,10 @@ class HCatRecordReader extends RecordRea
 
     /**
      * Instantiates a new hcat record reader.
-     * @param baseRecordReader the base record reader
      */
     public HCatRecordReader(HCatStorageHandler storageHandler,
-        org.apache.hadoop.mapred.RecordReader<WritableComparable,
-                     Writable> baseRecordReader,
-                     SerDe serde,
                      Map<String,String> valuesNotInDataCols) {
-      this.baseRecordReader = baseRecordReader;
       this.storageHandler = storageHandler;
-      this.serde = serde;
       this.valuesNotInDataCols = valuesNotInDataCols;
     }
 
@@ -83,37 +77,56 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
-                           TaskAttemptContext taskContext)
-    throws IOException, InterruptedException {
-        org.apache.hadoop.mapred.InputSplit baseSplit;
+        TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+      HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+
+      baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
+      serde = createSerDe(hcatSplit, storageHandler, taskContext);
 
-        // Pull the output schema out of the TaskAttemptContext
-        outputSchema = (HCatSchema)HCatUtil.deserialize(
+      // Pull the output schema out of the TaskAttemptContext
+      outputSchema = (HCatSchema) HCatUtil.deserialize(
           taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
 
-        if( split instanceof HCatSplit ) {
-            baseSplit = ((HCatSplit) split).getBaseSplit();
-        } else {
-          throw new IOException("Not a HCatSplit");
-        }
+      if (outputSchema == null) {
+        outputSchema = hcatSplit.getTableSchema();
+      }
 
-        if (outputSchema == null){
-          outputSchema = ((HCatSplit) split).getTableSchema();
-        }
+      // Pull the table schema out of the Split info
+      // TODO This should be passed in the TaskAttemptContext instead
+      dataSchema = hcatSplit.getDataSchema();
+    }
 
-        // Pull the table schema out of the Split info
-        // TODO This should be passed in the TaskAttemptContext instead
-        dataSchema = ((HCatSplit)split).getDataSchema();
-
-        Properties properties = new Properties();
-        for (Map.Entry<String, String>param :
-            ((HCatSplit)split).getPartitionInfo()
-                              .getJobProperties().entrySet()) {
-          properties.setProperty(param.getKey(), param.getValue());
-        }
+    private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
+        HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
+
+      JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
+      HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf);
+      org.apache.hadoop.mapred.InputFormat inputFormat =
+          HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass());
+      return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf,
+          InternalUtil.createReporter(taskContext));
     }
 
-    /* (non-Javadoc)
+    private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+        TaskAttemptContext taskContext) throws IOException {
+
+      SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+          taskContext.getConfiguration());
+
+      try {
+        InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(),
+            hcatSplit.getPartitionInfo().getTableInfo(),
+            hcatSplit.getPartitionInfo().getPartitionSchema());
+      } catch (SerDeException e) {
+        throw new IOException("Failed initializing SerDe "
+            + storageHandler.getSerDeClass().getName(), e);
+      }
+
+      return serde;
+    }
+
+  /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
      */
     @Override

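The practical effect of createBaseRecordReader() above: the underlying
old-API reader is opened with a reporter wrapped around the live task
context rather than a throwaway one. A sketch of that wiring, assuming a
class placed in org.apache.hcatalog.mapreduce, since InternalUtil and its
createReporter helper are package-scoped (Opener is an illustrative name):

    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class Opener {
      // Mirrors createBaseRecordReader(): the ProgressReporter returned by
      // InternalUtil.createReporter forwards progress and counter calls
      // from the old-API reader to the running task.
      static RecordReader<WritableComparable, Writable> open(
          InputFormat<WritableComparable, Writable> oldApiFormat,
          InputSplit baseSplit, JobConf jobConf,
          TaskAttemptContext taskContext) throws IOException {
        return oldApiFormat.getRecordReader(baseSplit, jobConf,
            InternalUtil.createReporter(taskContext));
      }
    }
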
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Wed May 16 04:05:37 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -168,4 +169,18 @@ static Reporter createReporter(TaskAttem
       return new ProgressReporter(context);
   }
 
+  /**
+   * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+   * @param split the InputSplit
+   * @return the HCatSplit
+   * @throws IOException
+   */
+  public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+    if (split instanceof HCatSplit) {
+      return (HCatSplit) split;
+    } else {
+      throw new IOException("Split must be " + HCatSplit.class.getName()
+          + " but found " + split.getClass().getName());
+    }
+  }
 }
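
A hypothetical caller of the new helper (the patch's real call sites are
HCatBaseInputFormat.createRecordReader and HCatRecordReader.initialize,
and the sketch assumes it sits in org.apache.hcatalog.mapreduce); the
gain is a descriptive IOException naming both classes instead of a bare
ClassCastException:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hcatalog.data.schema.HCatSchema;

    class SplitInspector {
      // Throws "Split must be ...HCatSplit but found <actual class>" when
      // a foreign split type slips through.
      static HCatSchema tableSchemaOf(InputSplit split) throws IOException {
        return InternalUtil.castToHCatSplit(split).getTableSchema();
      }
    }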

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Wed May 16 04:05:37 2012
@@ -21,53 +21,65 @@ package org.apache.hcatalog.mapreduce;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.Progressable;
 
-class ProgressReporter implements  Reporter {
+class ProgressReporter extends StatusReporter implements Reporter {
 
-    private Progressable progressable;
+  private TaskInputOutputContext context = null;
+  private TaskAttemptContext taskAttemptContext = null;
 
-    public ProgressReporter(TaskAttemptContext context) {
-            this(context instanceof TaskInputOutputContext?
-                    (TaskInputOutputContext)context:
-                    Reporter.NULL);
-    }
-
-    public ProgressReporter(Progressable progressable) {
-        this.progressable = progressable;
-    }
-
-    @Override
-    public void setStatus(String status) {
-    }
-
-    @Override
-    public Counters.Counter getCounter(Enum<?> name) {
-        return Reporter.NULL.getCounter(name);
-    }
-
-    @Override
-    public Counters.Counter getCounter(String group, String name) {
-        return Reporter.NULL.getCounter(group,name);
-    }
-
-    @Override
-    public void incrCounter(Enum<?> key, long amount) {
-    }
-
-    @Override
-    public void incrCounter(String group, String counter, long amount) {
-    }
-
-    @Override
-    public InputSplit getInputSplit() throws UnsupportedOperationException {
-        return Reporter.NULL.getInputSplit();
-    }
-
-    @Override
-    public void progress() {
-        progressable.progress();
+  public ProgressReporter(TaskAttemptContext context) {
+    if (context instanceof TaskInputOutputContext) {
+      this.context = (TaskInputOutputContext) context;
+    } else {
+      taskAttemptContext = context;
+    }
+  }
+
+  @Override
+  public void setStatus(String status) {
+    if (context != null) {
+      context.setStatus(status);
+    }
+  }
+
+  @Override
+  public Counters.Counter getCounter(Enum<?> name) {
+    return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+  }
+
+  @Override
+  public Counters.Counter getCounter(String group, String name) {
+    return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    if (context != null) {
+      context.getCounter(key).increment(amount);
+    }
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    if (context != null) {
+      context.getCounter(group, counter).increment(amount);
+    }
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    return null;
+  }
+
+  @Override
+  public void progress() {
+    if (context != null) {
+      context.progress();
+    } else {
+      taskAttemptContext.progress();
     }
+  }
 }
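
What the rewrite buys at runtime: counter and progress calls made through
the old-API Reporter surface are now forwarded to the task context when
one is available, instead of vanishing into Reporter.NULL, which is what
the test change below asserts via FILE_BYTES_READ. An illustrative
fragment (the enum and method are invented for the example):

    import org.apache.hadoop.mapred.Reporter;

    class CounterDemo {
      enum MyCounters { RECORDS_SEEN }

      // With the patched ProgressReporter passed in (built from a
      // TaskInputOutputContext), the increment lands in the job's counters
      // and the progress ping keeps the task from timing out; previously
      // both were no-ops.
      static void readOne(Reporter reporter) {
        reporter.incrCounter(MyCounters.RECORDS_SEEN, 1);
        reporter.progress();
      }
    }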

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1338996&r1=1338995&r2=1338996&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Wed May 16 04:05:37 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -267,6 +268,13 @@ public abstract class HCatMapReduceTest 
     HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
 
     boolean success = job.waitForCompletion(true);
+
+    // Ensure counters are set when data has actually been read.
+    if (partitionValues != null) {
+      assertTrue(job.getCounters().getGroup("FileSystemCounters")
+          .findCounter("FILE_BYTES_READ").getValue() > 0);
+    }
+
     if (success) {
       new FileOutputCommitterContainer(job,null).commitJob(job);
     } else {