Posted to commits@hive.apache.org by ha...@apache.org on 2012/11/27 18:12:19 UTC

svn commit: r1414269 - in /hive/branches/branch-0.10/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/persistence/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/io/rcfile/merge/ test/org/apache/hadoop...

Author: hashutosh
Date: Tue Nov 27 17:12:17 2012
New Revision: 1414269

URL: http://svn.apache.org/viewvc?rev=1414269&view=rev
Log:
HIVE-3234 getting the reporter in the recordwriter (Owen Omalley via Ashutosh Chauhan)
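
In short: before this patch, HiveFileFormatUtils.getRecordWriter handed a hard-coded null as the Progressable to HiveOutputFormat.getHiveRecordWriter, and RowContainer opened its spill files with Reporter.NULL, so custom record writers and readers had no way to report progress and a long-running close risked having the task killed by the JobTracker. The patch threads the operator's Reporter through JoinUtil, RowContainer, Utilities, and HiveFileFormatUtils down to the writers and readers. A minimal sketch of what a custom writer can now do with it (the SlowWriter name and the heartbeat placement are illustrative, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.Progressable;

    // Hypothetical writer: heartbeats on every write so the task is not
    // killed while writing or closing many files.
    class SlowWriter implements RecordWriter {
      private final Progressable progress;

      SlowWriter(Progressable progress) {
        this.progress = progress;
      }

      public void write(Writable w) throws IOException {
        // ... expensive write ...
        if (progress != null) {
          progress.progress();  // heartbeat to the JobTracker
        }
      }

      public void close(boolean abort) throws IOException {
        if (progress != null) {
          progress.progress();
        }
      }
    }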

Added:
    hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/
    hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java
    hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
    hive/branches/branch-0.10/ql/src/test/queries/clientpositive/custom_input_output_format.q
    hive/branches/branch-0.10/ql/src/test/results/clientpositive/custom_input_output_format.q.out
Modified:
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Tue Nov 27 17:12:17 2012
@@ -95,11 +95,12 @@ public abstract class AbstractMapJoinOpe
     // all other tables are small, and are cached in the hash table
     posBigTable = conf.getPosBigTable();
 
-    emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+    emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
 
     RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
         rowContainerStandardObjectInspectors.get((byte) posBigTable),
-        order[posBigTable], joinCacheSize,spillTableDesc, conf, !hasFilter(posBigTable));
+        order[posBigTable], joinCacheSize,spillTableDesc, conf,
+        !hasFilter(posBigTable), reporter);
     storage.put((byte) posBigTable, bigPosRC);
 
     mapJoinRowsKey = HiveConf.getIntVar(hconf,

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Tue Nov 27 17:12:17 2012
@@ -313,7 +313,7 @@ public abstract class CommonJoinOperator
       // there should be only 1 dummy object in the RowContainer
       RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors.get((byte)pos),
-          alias, 1, spillTableDesc, conf, !hasFilter(pos));
+          alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
 
       values.add((ArrayList<Object>) dummyObj[pos]);
       dummyObjVectors[pos] = values;
@@ -322,7 +322,7 @@ public abstract class CommonJoinOperator
       // e.g., the output columns does not contains the input table
       RowContainer rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors.get((byte)pos),
-          alias, joinCacheSize,spillTableDesc, conf, !hasFilter(pos));
+          alias, joinCacheSize,spillTableDesc, conf, !hasFilter(pos), reporter);
       storage.put(pos, rc);
 
       pos++;

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 27 17:12:17 2012
@@ -494,7 +494,8 @@ public class FileSinkOperator extends Te
         // only create bucket files only if no dynamic partitions,
         // buckets of dynamic partitions will be created for each newly created partition
         fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
-            jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+            jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
+            reporter);
         // increment the CREATED_FILES counter
         if (reporter != null) {
           reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -521,8 +522,6 @@ public class FileSinkOperator extends Te
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
    *
-   * @param lastUpdateTime
-   *          the time (msec) that progress update happened.
    * @return true if a new progress update is reported, false otherwise.
    */
   private boolean updateProgress() {
@@ -784,7 +783,8 @@ public class FileSinkOperator extends Te
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
           specPath = conf.getParentDir();
         }
-        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
+        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
+          reporter);
       }
     } catch (IOException e) {
       throw new HiveException(e);

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Tue Nov 27 17:12:17 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -375,7 +376,7 @@ public class JoinUtil {
   public static RowContainer getRowContainer(Configuration hconf,
       List<ObjectInspector> structFieldObjectInspectors,
       Byte alias,int containerSize, Map<Byte, TableDesc> spillTableDesc,
-      JoinDesc conf,boolean noFilter) throws HiveException {
+      JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
 
     TableDesc tblDesc = JoinUtil.getSpillTableDesc(alias,spillTableDesc,conf, noFilter);
     SerDe serde = JoinUtil.getSpillSerDe(alias, spillTableDesc, conf, noFilter);
@@ -384,7 +385,7 @@ public class JoinUtil {
       containerSize = -1;
     }
 
-    RowContainer rc = new RowContainer(containerSize, hconf);
+    RowContainer rc = new RowContainer(containerSize, hconf, reporter);
     StructObjectInspector rcOI = null;
     if (tblDesc != null) {
       // arbitrary column names used internally for serializing to spill table

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Nov 27 17:12:17 2012
@@ -116,11 +116,13 @@ public class SMBMapJoinOperator extends 
     for (Byte alias : order) {
       RowContainer rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors.get(storePos),
-          alias, bucketSize,spillTableDesc, conf, !hasFilter(storePos));
+          alias, bucketSize,spillTableDesc, conf, !hasFilter(storePos),
+          reporter);
       nextGroupStorage[storePos] = rc;
       RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors.get((byte)storePos),
-          alias,bucketSize,spillTableDesc, conf, !hasFilter(storePos));
+          alias,bucketSize,spillTableDesc, conf, !hasFilter(storePos),
+          reporter);
       candidateStorage[alias] = candidateRC;
       storePos++;
     }

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 27 17:12:17 2012
@@ -142,6 +142,7 @@ import org.apache.hadoop.io.compress.Def
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -1342,7 +1343,8 @@ public final class Utilities {
   }
 
   public static void mvFileToFinalPath(String specPath, Configuration hconf,
-      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,
+      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
+      Reporter reporter) throws IOException,
       HiveException {
 
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
@@ -1363,7 +1365,7 @@ public final class Utilities {
             Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
-          createEmptyBuckets(hconf, emptyBuckets, conf);
+          createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
 
         // Step3: move to the file destination
@@ -1380,17 +1382,15 @@ public final class Utilities {
    * Check the existence of buckets according to bucket specification. Create empty buckets if
    * needed.
    *
-   * @param specPath
-   *          The final path where the dynamic partitions should be in.
-   * @param conf
-   *          FileSinkDesc.
-   * @param dpCtx
-   *          dynamic partition context.
+   * @param hconf
+   * @param paths A list of empty buckets to create
+   * @param conf The definition of the FileSink.
+   * @param reporter The mapreduce reporter object
    * @throws HiveException
    * @throws IOException
    */
   private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
-      FileSinkDesc conf)
+      FileSinkDesc conf, Reporter reporter)
       throws HiveException, IOException {
 
     JobConf jc;
@@ -1420,7 +1420,8 @@ public final class Utilities {
     for (String p : paths) {
       Path path = new Path(p);
       RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
-          jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
+          jc, hiveOutputFormat, outputClass, isCompressed,
+          tableInfo.getProperties(), path, reporter);
       writer.close(false);
       LOG.info("created empty bucket for enforcing bucketing at " + path);
     }

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Nov 27 17:12:17 2012
@@ -109,6 +109,7 @@ public class RowContainer<Row extends Li
   InputFormat<WritableComparable, Writable> inputFormat = null;
   InputSplit[] inputSplits = null;
   private Row dummyRow = null;
+  private final Reporter reporter;
 
   Writable val = null; // cached to use serialize data
 
@@ -116,11 +117,12 @@ public class RowContainer<Row extends Li
   JobConf jobCloneUsingLocalFs = null;
   private LocalFileSystem localFs;
 
-  public RowContainer(Configuration jc) throws HiveException {
-    this(BLOCKSIZE, jc);
+  public RowContainer(Configuration jc, Reporter reporter) throws HiveException {
+    this(BLOCKSIZE, jc, reporter);
   }
 
-  public RowContainer(int bs, Configuration jc) throws HiveException {
+  public RowContainer(int bs, Configuration jc, Reporter reporter
+                     ) throws HiveException {
     // no 0-sized block
     this.blockSize = bs <= 0 ? BLOCKSIZE : bs;
     this.size = 0;
@@ -134,6 +136,11 @@ public class RowContainer<Row extends Li
     this.serde = null;
     this.standardOI = null;
     this.jc = jc;
+    if (reporter == null) {
+      this.reporter = Reporter.NULL;
+    } else {
+      this.reporter = reporter;
+    }
   }
   
   private JobConf getLocalFSJobConfClone(Configuration jc) {
@@ -210,7 +217,8 @@ public class RowContainer<Row extends Li
           acutalSplitNum = inputSplits.length;
         }
         currentSplitPointer = 0;
-        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
+        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
+          localJc, reporter);
         currentSplitPointer++;
 
         nextBlock();
@@ -307,8 +315,9 @@ public class RowContainer<Row extends Li
         HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
         tempOutPath = new Path(tmpFile.toString());
         JobConf localJc = getLocalFSJobConfClone(jc);
-        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
-            .getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
+        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+            hiveOutputFormat, serde.getSerializedClass(), false,
+            tblDesc.getProperties(), tempOutPath, reporter);
       } else if (rw == null) {
         throw new HiveException("RowContainer has already been closed for writing.");
       }
@@ -382,7 +391,7 @@ public class RowContainer<Row extends Li
         JobConf localJc = getLocalFSJobConfClone(jc);
         // open record reader to read next split
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
-            Reporter.NULL);
+            reporter);
         currentSplitPointer++;
         return nextBlock();
       }
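
Note that RowContainer now normalizes a null reporter to Reporter.NULL in its constructor, so the getRecordReader and getRecordWriter calls above never need a null check. A minimal construction sketch under that assumption (jobConf stands for whatever Configuration the operator holds; reporter may legitimately be null, as on the BlockMergeTask path below):

    // Safe even when reporter is null: the constructor substitutes
    // Reporter.NULL before the container ever spills to disk.
    RowContainer<ArrayList<Object>> rc =
        new RowContainer<ArrayList<Object>>(jobConf, reporter);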

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Tue Nov 27 17:12:17 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutp
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -213,7 +214,7 @@ public final class HiveFileFormatUtils {
 
   public static RecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
-      FileSinkDesc conf, Path outPath) throws HiveException {
+      FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
     try {
       HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
           .getOutputFileFormatClass().newInstance();
@@ -234,7 +235,7 @@ public final class HiveFileFormatUtils {
         }
       }
       return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
-          isCompressed, tableInfo.getProperties(), outPath);
+          isCompressed, tableInfo.getProperties(), outPath, reporter);
     } catch (Exception e) {
       throw new HiveException(e);
     }
@@ -243,10 +244,11 @@ public final class HiveFileFormatUtils {
   public static RecordWriter getRecordWriter(JobConf jc,
       HiveOutputFormat<?, ?> hiveOutputFormat,
       final Class<? extends Writable> valueClass, boolean isCompressed,
-      Properties tableProp, Path outPath) throws IOException, HiveException {
+      Properties tableProp, Path outPath, Reporter reporter
+      ) throws IOException, HiveException {
     if (hiveOutputFormat != null) {
       return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
-          isCompressed, tableProp, null);
+          isCompressed, tableProp, reporter);
     }
     return null;
   }

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Tue Nov 27 17:12:17 2012
@@ -234,7 +234,8 @@ public class BlockMergeTask extends Task
           HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
           jobID = rj.getID().toString();
         }
-        RCFileMergeMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx());
+        RCFileMergeMapper.jobClose(outputPath, success, job, console,
+          work.getDynPartCtx(), null);
       } catch (Exception e) {
       }
     }
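
The null passed here appears deliberate: jobClose runs on the client after the MapReduce job has finished, so there is no task-side Reporter to forward. mvFileToFinalPath and, through it, createEmptyBuckets accept the null, so an output format on this path still sees a null Progressable, exactly as it did before the patch.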

Modified: hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1414269&r1=1414268&r2=1414269&view=diff
==============================================================================
--- hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/branches/branch-0.10/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Tue Nov 27 17:12:17 2012
@@ -232,11 +232,13 @@ public class RCFileMergeMapper extends M
   }
 
   public static void jobClose(String outputPath, boolean success, JobConf job,
-      LogHelper console, DynamicPartitionCtx dynPartCtx) throws HiveException, IOException {
+      LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter
+      ) throws HiveException, IOException {
     Path outpath = new Path(outputPath);
     FileSystem fs = outpath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outpath, job);
-    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null);
+    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
+      reporter);
     fs.delete(backupPath, true);
   }
 

Added: hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java?rev=1414269&view=auto
==============================================================================
--- hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java (added)
+++ hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java Tue Nov 27 17:12:17 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.udf;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.io.IOException;
+
+/**
+A simple input format that does a rot13 on the inputs
+ */
+class Rot13InputFormat extends TextInputFormat {
+
+  public static void rot13(byte[] bytes, int offset, int length) {
+    for(int i=offset; i < offset+length; i++) {
+      if (bytes[i] >= 'A' && bytes[i] <= 'Z') {
+        bytes[i] = (byte) ('A' + (bytes[i] - 'A' + 13) % 26);
+      } else if (bytes[i] >= 'a' && bytes[i] <= 'z') {
+        bytes[i] = (byte) ('a' + (bytes[i] - 'a' + 13) % 26);
+      }
+    }
+  }
+
+  private static class Rot13LineRecordReader extends LineRecordReader {
+    Rot13LineRecordReader(JobConf job, FileSplit split) throws IOException {
+      super(job, split);
+    }
+
+    public synchronized boolean next(LongWritable key,
+                                     Text value) throws IOException {
+      boolean result = super.next(key, value);
+      if (result) {
+        System.out.println("Read " + value);
+        rot13(value.getBytes(), 0, value.getLength());
+        System.out.println("Returned " + value);
+      }
+      return result;
+    }
+  }
+
+  public RecordReader<LongWritable, Text>
+    getRecordReader(InputSplit genericSplit, JobConf job,
+                    Reporter reporter) throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    return new Rot13LineRecordReader(job, (FileSplit) genericSplit);
+  }
+}
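
Rot13 is its own inverse, which is what makes it useful for this test: Rot13OutputFormat scrambles rows on the way to disk and Rot13InputFormat unscrambles them on the way back, so a round trip returns the original data. A standalone check of that property (this class is illustrative only, not part of the patch; it sits in the same package because Rot13InputFormat is package-private):

    package org.apache.hadoop.hive.ql.io.udf;

    // Hypothetical sanity check: rot13 applied twice is the identity,
    // so write-then-read through the two formats round-trips.
    public class Rot13Check {
      public static void main(String[] args) {
        byte[] data = "Hello, Hive!".getBytes();
        Rot13InputFormat.rot13(data, 0, data.length);
        System.out.println(new String(data));  // prints: Uryyb, Uvir!
        Rot13InputFormat.rot13(data, 0, data.length);
        System.out.println(new String(data));  // prints: Hello, Hive!
      }
    }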

Added: hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java?rev=1414269&view=auto
==============================================================================
--- hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java (added)
+++ hive/branches/branch-0.10/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java Tue Nov 27 17:12:17 2012
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hive.ql.io.udf;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class Rot13OutputFormat
+  extends HiveIgnoreKeyTextOutputFormat<LongWritable,Text> {
+
+  @Override
+  public RecordWriter
+    getHiveRecordWriter(JobConf jc,
+                        Path outPath,
+                        Class<? extends Writable> valueClass,
+                        boolean isCompressed,
+                        Properties tableProperties,
+                        Progressable progress) throws IOException {
+    final RecordWriter result =
+      super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
+        tableProperties,progress);
+    final Reporter reporter = (Reporter) progress;
+    reporter.setStatus("got here");
+    System.out.println("Got a reporter " + reporter);
+    return new RecordWriter() {
+      @Override
+      public void write(Writable w) throws IOException {
+        if (w instanceof Text) {
+          Text value = (Text) w;
+          Rot13InputFormat.rot13(value.getBytes(), 0, value.getLength());
+          result.write(w);
+        } else if (w instanceof BytesWritable) {
+          BytesWritable value = (BytesWritable) w;
+          Rot13InputFormat.rot13(value.getBytes(), 0, value.getLength());
+          result.write(w);
+        } else {
+          throw new IllegalArgumentException("need text or bytes writable " +
+            " instead of " + w.getClass().getName());
+        }
+      }
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        result.close(abort);
+      }
+    };
+  }
+}
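
The unconditional cast of Progressable to Reporter above is the point of the test: it only succeeds because, after this patch, FileSinkOperator forwards the task's actual Reporter through HiveFileFormatUtils instead of the old hard-coded null. An output format that must also survive paths that still pass null (such as the jobClose path above) could hedge the same way RowContainer does; a sketch, with Reporters.asReporter a hypothetical helper not in the patch:

    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.Progressable;

    // Hypothetical helper: mirrors RowContainer's fallback to
    // Reporter.NULL instead of casting unconditionally.
    final class Reporters {
      static Reporter asReporter(Progressable progress) {
        return (progress instanceof Reporter) ? (Reporter) progress
                                              : Reporter.NULL;
      }
    }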

Added: hive/branches/branch-0.10/ql/src/test/queries/clientpositive/custom_input_output_format.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/test/queries/clientpositive/custom_input_output_format.q?rev=1414269&view=auto
==============================================================================
--- hive/branches/branch-0.10/ql/src/test/queries/clientpositive/custom_input_output_format.q (added)
+++ hive/branches/branch-0.10/ql/src/test/queries/clientpositive/custom_input_output_format.q Tue Nov 27 17:12:17 2012
@@ -0,0 +1,6 @@
+ADD JAR ../build/ql/test/test-udfs.jar;
+CREATE TABLE src1_rot13_iof(key STRING, value STRING) 
+  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+            OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat';
+INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1;
+SELECT * FROM src1_rot13_iof;
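
Because the INSERT writes through Rot13OutputFormat and the SELECT reads back through Rot13InputFormat, the two rot13 passes cancel out and the query returns the original src1 rows, as the .q.out below shows. On disk, however, the table's files hold the scrambled form: the first row's value val_238 is stored as iny_238 (digits are untouched by rot13).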

Added: hive/branches/branch-0.10/ql/src/test/results/clientpositive/custom_input_output_format.q.out
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.10/ql/src/test/results/clientpositive/custom_input_output_format.q.out?rev=1414269&view=auto
==============================================================================
--- hive/branches/branch-0.10/ql/src/test/results/clientpositive/custom_input_output_format.q.out (added)
+++ hive/branches/branch-0.10/ql/src/test/results/clientpositive/custom_input_output_format.q.out Tue Nov 27 17:12:17 2012
@@ -0,0 +1,54 @@
+PREHOOK: query: CREATE TABLE src1_rot13_iof(key STRING, value STRING) 
+  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+            OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE src1_rot13_iof(key STRING, value STRING) 
+  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+            OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src1_rot13_iof
+PREHOOK: query: INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+PREHOOK: Output: default@src1_rot13_iof
+POSTHOOK: query: INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: default@src1_rot13_iof
+POSTHOOK: Lineage: src1_rot13_iof.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src1_rot13_iof.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src1_rot13_iof
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1_rot13_iof
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM src1_rot13_iof
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1_rot13_iof
+#### A masked pattern was here ####
+POSTHOOK: Lineage: src1_rot13_iof.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src1_rot13_iof.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+238	val_238
+	
+311	val_311
+	val_27
+	val_165
+	val_409
+255	val_255
+278	val_278
+98	val_98
+	val_484
+	val_265
+	val_193
+401	val_401
+150	val_150
+273	val_273
+224	
+369	
+66	val_66
+128	
+213	val_213
+146	val_146
+406	val_406
+	
+	
+