You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/11/27 18:08:35 UTC
svn commit: r1414266 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/persistence/
java/org/apache/hadoop/hive/ql/io/
java/org/apache/hadoop/hive/ql/io/rcfile/merge/
test/org/apache/hadoop/hive/ql/io/udf...
Author: hashutosh
Date: Tue Nov 27 17:08:32 2012
New Revision: 1414266
URL: http://svn.apache.org/viewvc?rev=1414266&view=rev
Log:
HIVE-3234 : getting the reporter in the recordwriter (Owen Omalley via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
hive/trunk/ql/src/test/queries/clientpositive/custom_input_output_format.q
hive/trunk/ql/src/test/results/clientpositive/custom_input_output_format.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Tue Nov 27 17:08:32 2012
@@ -95,11 +95,12 @@ public abstract class AbstractMapJoinOpe
// all other tables are small, and are cached in the hash table
posBigTable = conf.getPosBigTable();
- emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+ emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors.get((byte) posBigTable),
- order[posBigTable], joinCacheSize,spillTableDesc, conf, !hasFilter(posBigTable));
+ order[posBigTable], joinCacheSize,spillTableDesc, conf,
+ !hasFilter(posBigTable), reporter);
storage.put((byte) posBigTable, bigPosRC);
mapJoinRowsKey = HiveConf.getIntVar(hconf,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Tue Nov 27 17:08:32 2012
@@ -313,7 +313,7 @@ public abstract class CommonJoinOperator
// there should be only 1 dummy object in the RowContainer
RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors.get((byte)pos),
- alias, 1, spillTableDesc, conf, !hasFilter(pos));
+ alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
values.add((ArrayList<Object>) dummyObj[pos]);
dummyObjVectors[pos] = values;
@@ -322,7 +322,7 @@ public abstract class CommonJoinOperator
// e.g., the output columns does not contains the input table
RowContainer rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors.get((byte)pos),
- alias, joinCacheSize,spillTableDesc, conf, !hasFilter(pos));
+ alias, joinCacheSize,spillTableDesc, conf, !hasFilter(pos), reporter);
storage.put(pos, rc);
pos++;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 27 17:08:32 2012
@@ -494,7 +494,8 @@ public class FileSinkOperator extends Te
// only create bucket files only if no dynamic partitions,
// buckets of dynamic partitions will be created for each newly created partition
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
- jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+ jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
+ reporter);
// increment the CREATED_FILES counter
if (reporter != null) {
reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -521,8 +522,6 @@ public class FileSinkOperator extends Te
* Report status to JT so that JT won't kill this task if closing takes too long
* due to too many files to close and the NN is overloaded.
*
- * @param lastUpdateTime
- * the time (msec) that progress update happened.
* @return true if a new progress update is reported, false otherwise.
*/
private boolean updateProgress() {
@@ -784,7 +783,8 @@ public class FileSinkOperator extends Te
if (conf.isLinkedFileSink() && (dpCtx != null)) {
specPath = conf.getParentDir();
}
- Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
+ Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
+ reporter);
}
} catch (IOException e) {
throw new HiveException(e);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Tue Nov 27 17:08:32 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
@@ -375,7 +376,7 @@ public class JoinUtil {
public static RowContainer getRowContainer(Configuration hconf,
List<ObjectInspector> structFieldObjectInspectors,
Byte alias,int containerSize, Map<Byte, TableDesc> spillTableDesc,
- JoinDesc conf,boolean noFilter) throws HiveException {
+ JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
TableDesc tblDesc = JoinUtil.getSpillTableDesc(alias,spillTableDesc,conf, noFilter);
SerDe serde = JoinUtil.getSpillSerDe(alias, spillTableDesc, conf, noFilter);
@@ -384,7 +385,7 @@ public class JoinUtil {
containerSize = -1;
}
- RowContainer rc = new RowContainer(containerSize, hconf);
+ RowContainer rc = new RowContainer(containerSize, hconf, reporter);
StructObjectInspector rcOI = null;
if (tblDesc != null) {
// arbitrary column names used internally for serializing to spill table
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Nov 27 17:08:32 2012
@@ -116,11 +116,13 @@ public class SMBMapJoinOperator extends
for (Byte alias : order) {
RowContainer rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors.get(storePos),
- alias, bucketSize,spillTableDesc, conf, !hasFilter(storePos));
+ alias, bucketSize,spillTableDesc, conf, !hasFilter(storePos),
+ reporter);
nextGroupStorage[storePos] = rc;
RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors.get((byte)storePos),
- alias,bucketSize,spillTableDesc, conf, !hasFilter(storePos));
+ alias,bucketSize,spillTableDesc, conf, !hasFilter(storePos),
+ reporter);
candidateStorage[alias] = candidateRC;
storePos++;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 27 17:08:32 2012
@@ -142,6 +142,7 @@ import org.apache.hadoop.io.compress.Def
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
@@ -1342,7 +1343,8 @@ public final class Utilities {
}
public static void mvFileToFinalPath(String specPath, Configuration hconf,
- boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,
+ boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
+ Reporter reporter) throws IOException,
HiveException {
FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
@@ -1363,7 +1365,7 @@ public final class Utilities {
Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
// create empty buckets if necessary
if (emptyBuckets.size() > 0) {
- createEmptyBuckets(hconf, emptyBuckets, conf);
+ createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
}
// Step3: move to the file destination
@@ -1380,17 +1382,15 @@ public final class Utilities {
* Check the existence of buckets according to bucket specification. Create empty buckets if
* needed.
*
- * @param specPath
- * The final path where the dynamic partitions should be in.
- * @param conf
- * FileSinkDesc.
- * @param dpCtx
- * dynamic partition context.
+ * @param hconf
+ * @param paths A list of empty buckets to create
+ * @param conf The definition of the FileSink.
+ * @param reporter The mapreduce reporter object
* @throws HiveException
* @throws IOException
*/
private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
- FileSinkDesc conf)
+ FileSinkDesc conf, Reporter reporter)
throws HiveException, IOException {
JobConf jc;
@@ -1420,7 +1420,8 @@ public final class Utilities {
for (String p : paths) {
Path path = new Path(p);
RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
- jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
+ jc, hiveOutputFormat, outputClass, isCompressed,
+ tableInfo.getProperties(), path, reporter);
writer.close(false);
LOG.info("created empty bucket for enforcing bucketing at " + path);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Nov 27 17:08:32 2012
@@ -109,6 +109,7 @@ public class RowContainer<Row extends Li
InputFormat<WritableComparable, Writable> inputFormat = null;
InputSplit[] inputSplits = null;
private Row dummyRow = null;
+ private final Reporter reporter;
Writable val = null; // cached to use serialize data
@@ -116,11 +117,12 @@ public class RowContainer<Row extends Li
JobConf jobCloneUsingLocalFs = null;
private LocalFileSystem localFs;
- public RowContainer(Configuration jc) throws HiveException {
- this(BLOCKSIZE, jc);
+ public RowContainer(Configuration jc, Reporter reporter) throws HiveException {
+ this(BLOCKSIZE, jc, reporter);
}
- public RowContainer(int bs, Configuration jc) throws HiveException {
+ public RowContainer(int bs, Configuration jc, Reporter reporter
+ ) throws HiveException {
// no 0-sized block
this.blockSize = bs <= 0 ? BLOCKSIZE : bs;
this.size = 0;
@@ -134,6 +136,11 @@ public class RowContainer<Row extends Li
this.serde = null;
this.standardOI = null;
this.jc = jc;
+ if (reporter == null) {
+ this.reporter = Reporter.NULL;
+ } else {
+ this.reporter = reporter;
+ }
}
private JobConf getLocalFSJobConfClone(Configuration jc) {
@@ -210,7 +217,8 @@ public class RowContainer<Row extends Li
acutalSplitNum = inputSplits.length;
}
currentSplitPointer = 0;
- rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
+ rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
+ localJc, reporter);
currentSplitPointer++;
nextBlock();
@@ -307,8 +315,9 @@ public class RowContainer<Row extends Li
HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
tempOutPath = new Path(tmpFile.toString());
JobConf localJc = getLocalFSJobConfClone(jc);
- rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
- .getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
+ rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+ hiveOutputFormat, serde.getSerializedClass(), false,
+ tblDesc.getProperties(), tempOutPath, reporter);
} else if (rw == null) {
throw new HiveException("RowContainer has already been closed for writing.");
}
@@ -382,7 +391,7 @@ public class RowContainer<Row extends Li
JobConf localJc = getLocalFSJobConfClone(jc);
// open record reader to read next split
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
- Reporter.NULL);
+ reporter);
currentSplitPointer++;
return nextBlock();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Tue Nov 27 17:08:32 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutp
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
@@ -213,7 +214,7 @@ public final class HiveFileFormatUtils {
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
- FileSinkDesc conf, Path outPath) throws HiveException {
+ FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
try {
HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
.getOutputFileFormatClass().newInstance();
@@ -234,7 +235,7 @@ public final class HiveFileFormatUtils {
}
}
return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
- isCompressed, tableInfo.getProperties(), outPath);
+ isCompressed, tableInfo.getProperties(), outPath, reporter);
} catch (Exception e) {
throw new HiveException(e);
}
@@ -243,10 +244,11 @@ public final class HiveFileFormatUtils {
public static RecordWriter getRecordWriter(JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat,
final Class<? extends Writable> valueClass, boolean isCompressed,
- Properties tableProp, Path outPath) throws IOException, HiveException {
+ Properties tableProp, Path outPath, Reporter reporter
+ ) throws IOException, HiveException {
if (hiveOutputFormat != null) {
return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
- isCompressed, tableProp, null);
+ isCompressed, tableProp, reporter);
}
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Tue Nov 27 17:08:32 2012
@@ -234,7 +234,8 @@ public class BlockMergeTask extends Task
HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
jobID = rj.getID().toString();
}
- RCFileMergeMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx());
+ RCFileMergeMapper.jobClose(outputPath, success, job, console,
+ work.getDynPartCtx(), null);
} catch (Exception e) {
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1414266&r1=1414265&r2=1414266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Tue Nov 27 17:08:32 2012
@@ -232,11 +232,13 @@ public class RCFileMergeMapper extends M
}
public static void jobClose(String outputPath, boolean success, JobConf job,
- LogHelper console, DynamicPartitionCtx dynPartCtx) throws HiveException, IOException {
+ LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter
+ ) throws HiveException, IOException {
Path outpath = new Path(outputPath);
FileSystem fs = outpath.getFileSystem(job);
Path backupPath = backupOutputPath(fs, outpath, job);
- Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null);
+ Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
+ reporter);
fs.delete(backupPath, true);
}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java?rev=1414266&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13InputFormat.java Tue Nov 27 17:08:32 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.udf;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.io.IOException;
+
+/**
+ * A test input format that reads text lines the way its parent
+ * {@link TextInputFormat} does and applies the ROT13 letter substitution to
+ * every line before it is handed to the caller.
+ */
+class Rot13InputFormat extends TextInputFormat {
+
+ /**
+ * Applies ROT13 in place to the bytes at indexes {@code offset} (inclusive)
+ * through {@code offset + length} (exclusive).
+ *
+ * Bytes in the range 'A'-'Z' or 'a'-'z' are rotated 13 positions within
+ * their own case; every other byte is left unchanged. Because 13 + 13 = 26,
+ * applying the method twice to the same range restores the original bytes.
+ *
+ * @param bytes the buffer to transform; it is modified in place
+ * @param offset index of the first byte to transform
+ * @param length number of bytes to transform
+ */
+ public static void rot13(byte[] bytes, int offset, int length) {
+ for(int i=offset; i < offset+length; i++) {
+ // Upper-case letters wrap around inside 'A'..'Z'.
+ if (bytes[i] >= 'A' && bytes[i] <= 'Z') {
+ bytes[i] = (byte) ('A' + (bytes[i] - 'A' + 13) % 26);
+ // Lower-case letters wrap around inside 'a'..'z'.
+ } else if (bytes[i] >= 'a' && bytes[i] <= 'z') {
+ bytes[i] = (byte) ('a' + (bytes[i] - 'a' + 13) % 26);
+ }
+ }
+ }
+
+ /**
+ * Line reader that passes every line it reads through
+ * {@link #rot13(byte[], int, int)}.
+ */
+ private static class Rot13LineRecordReader extends LineRecordReader {
+ Rot13LineRecordReader(JobConf job, FileSplit split) throws IOException {
+ super(job, split);
+ }
+
+ /**
+ * Reads the next record through the parent reader and, when one was read,
+ * transforms the value and prints it to standard output both before and
+ * after the transformation.
+ *
+ * @return whatever the parent reader's {@code next} returned
+ */
+ public synchronized boolean next(LongWritable key,
+ Text value) throws IOException {
+ boolean result = super.next(key, value);
+ if (result) {
+ System.out.println("Read " + value);
+ // Only the first getLength() bytes are transformed.
+ // NOTE(review): this relies on Text.getBytes() exposing the value's
+ // own buffer so that the in-place change is visible through value -
+ // confirm against the Hadoop Text documentation.
+ rot13(value.getBytes(), 0, value.getLength());
+ System.out.println("Returned " + value);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Sets the task status to the string form of the split through the given
+ * reporter and returns a reader that applies ROT13 to each line of the split.
+ *
+ * The reporter is dereferenced unconditionally, so a null reporter fails
+ * with a NullPointerException, and the split is cast to FileSplit without a
+ * type check.
+ */
+ public RecordReader<LongWritable, Text>
+ getRecordReader(InputSplit genericSplit, JobConf job,
+ Reporter reporter) throws IOException {
+ reporter.setStatus(genericSplit.toString());
+ return new Rot13LineRecordReader(job, (FileSplit) genericSplit);
+ }
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java?rev=1414266&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java Tue Nov 27 17:08:32 2012
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hive.ql.io.udf;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A test output format that applies ROT13 to every value before delegating
+ * the write to the record writer obtained from its parent
+ * HiveIgnoreKeyTextOutputFormat.
+ */
+public class Rot13OutputFormat
+ extends HiveIgnoreKeyTextOutputFormat<LongWritable,Text> {
+
+ /**
+ * Obtains the parent's record writer for the same arguments and wraps it in
+ * a writer that transforms each value with
+ * {@link Rot13InputFormat#rot13(byte[], int, int)} first.
+ *
+ * The progress argument is cast to Reporter and used unconditionally: a
+ * null value fails with a NullPointerException and a non-Reporter value
+ * with a ClassCastException.
+ * NOTE(review): this looks deliberate, so that the fixture fails when no
+ * reporter reaches the record writer - confirm before adding a guard.
+ *
+ * @return a writer that applies ROT13 and then delegates to the parent's
+ * writer
+ * @throws IOException if the parent fails to create its writer
+ */
+ @Override
+ public RecordWriter
+ getHiveRecordWriter(JobConf jc,
+ Path outPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress) throws IOException {
+ final RecordWriter result =
+ super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
+ tableProperties,progress);
+ final Reporter reporter = (Reporter) progress;
+ reporter.setStatus("got here");
+ System.out.println("Got a reporter " + reporter);
+ return new RecordWriter() {
+ /**
+ * Applies ROT13 to the first getLength() bytes of a Text or
+ * BytesWritable value and forwards the same object to the wrapped
+ * writer; any other Writable type is rejected with an
+ * IllegalArgumentException.
+ */
+ @Override
+ public void write(Writable w) throws IOException {
+ if (w instanceof Text) {
+ Text value = (Text) w;
+ // NOTE(review): assumes getBytes() exposes the value's own buffer, so
+ // the object passed in by the caller is itself changed - confirm.
+ Rot13InputFormat.rot13(value.getBytes(), 0, value.getLength());
+ result.write(w);
+ } else if (w instanceof BytesWritable) {
+ BytesWritable value = (BytesWritable) w;
+ Rot13InputFormat.rot13(value.getBytes(), 0, value.getLength());
+ result.write(w);
+ } else {
+ throw new IllegalArgumentException("need text or bytes writable " +
+ " instead of " + w.getClass().getName());
+ }
+ }
+
+ /** Closes the wrapped writer, passing the abort flag through unchanged. */
+ @Override
+ public void close(boolean abort) throws IOException {
+ result.close(abort);
+ }
+ };
+ }
+}
Added: hive/trunk/ql/src/test/queries/clientpositive/custom_input_output_format.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/custom_input_output_format.q?rev=1414266&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/custom_input_output_format.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/custom_input_output_format.q Tue Nov 27 17:08:32 2012
@@ -0,0 +1,6 @@
+ADD JAR ../build/ql/test/test-udfs.jar;
+CREATE TABLE src1_rot13_iof(key STRING, value STRING)
+ STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat';
+INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1;
+SELECT * FROM src1_rot13_iof;
Added: hive/trunk/ql/src/test/results/clientpositive/custom_input_output_format.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/custom_input_output_format.q.out?rev=1414266&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/custom_input_output_format.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/custom_input_output_format.q.out Tue Nov 27 17:08:32 2012
@@ -0,0 +1,54 @@
+PREHOOK: query: CREATE TABLE src1_rot13_iof(key STRING, value STRING)
+ STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE src1_rot13_iof(key STRING, value STRING)
+ STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13InputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.udf.Rot13OutputFormat'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src1_rot13_iof
+PREHOOK: query: INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+PREHOOK: Output: default@src1_rot13_iof
+POSTHOOK: query: INSERT OVERWRITE TABLE src1_rot13_iof SELECT * FROM src1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: default@src1_rot13_iof
+POSTHOOK: Lineage: src1_rot13_iof.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src1_rot13_iof.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src1_rot13_iof
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1_rot13_iof
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM src1_rot13_iof
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1_rot13_iof
+#### A masked pattern was here ####
+POSTHOOK: Lineage: src1_rot13_iof.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src1_rot13_iof.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+238 val_238
+
+311 val_311
+ val_27
+ val_165
+ val_409
+255 val_255
+278 val_278
+98 val_98
+ val_484
+ val_265
+ val_193
+401 val_401
+150 val_150
+273 val_273
+224
+369
+66 val_66
+128
+213 val_213
+146 val_146
+406 val_406
+
+
+