Posted to commits@hive.apache.org by he...@apache.org on 2009/12/22 00:30:03 UTC
svn commit: r893029 - in /hadoop/hive/trunk: CHANGES.txt shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
Author: heyongqiang
Date: Mon Dec 21 23:30:02 2009
New Revision: 893029
URL: http://svn.apache.org/viewvc?rev=893029&view=rev
Log:
HIVE-1001 CombinedHiveInputFormat should parse the inputpath correctly
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=893029&r1=893028&r2=893029&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Dec 21 23:30:02 2009
@@ -339,6 +339,9 @@
HIVE-1000 bug in sampling if bucketed table is empty
(He Yongqiang via namit)
+ HIVE-1001 CombinedHiveInputFormat should parse the inputpath correctly
+ (namit via He Yongqiang)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=893029&r1=893028&r2=893029&view=diff
==============================================================================
--- hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Mon Dec 21 23:30:02 2009
@@ -101,7 +101,7 @@
public int compareText(Text a, Text b) {
return a.compareTo(b);
}
-
+
@Override
public long getAccessTime(FileStatus file) {
return file.getAccessTime();
@@ -112,7 +112,7 @@
return new CombineFileInputFormatShim() {
public RecordReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
- throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
+ throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
}
};
}
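The stubbed getRecordReader above is intentional: the exception message says this entry point is "not needed", and later in this diff the shim exposes a four-argument getRecordReader that builds the actual readers, so the inherited two-argument hook only has to fail fast if something calls it by mistake. A minimal sketch of that fail-fast pattern, with hypothetical names standing in for the Hadoop types:

    import java.io.IOException;

    // Hypothetical classes; not part of the commit.
    abstract class ReaderFactory {
        abstract Object createReader(Object split) throws IOException;
    }

    class ShimFactory extends ReaderFactory {
        @Override
        Object createReader(Object split) throws IOException {
            // Mirrors the shim above: surface misuse immediately rather than
            // returning a half-working reader.
            throw new IOException("createReader(split) not needed; use the overload.");
        }

        Object createReader(Object split, Object job, Object reporter) {
            return new Object(); // the variant that does the real work
        }
    }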
@@ -121,7 +121,7 @@
public InputSplitShim() {
super();
}
-
+
public InputSplitShim(CombineFileSplit old) throws IOException {
super(old);
}
@@ -133,25 +133,25 @@
*/
public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
- static final Class [] constructorSignature = new Class []
- {InputSplit.class,
- Configuration.class,
+ static final Class [] constructorSignature = new Class []
+ {InputSplit.class,
+ Configuration.class,
Reporter.class,
Integer.class};
-
+
protected CombineFileSplit split;
protected JobConf jc;
protected Reporter reporter;
protected Class<RecordReader<K, V>> rrClass;
protected Constructor<RecordReader<K, V>> rrConstructor;
protected FileSystem fs;
-
+
protected int idx;
protected long progress;
protected RecordReader<K, V> curReader;
-
+
public boolean next(K key, V value) throws IOException {
-
+
while ((curReader == null) || !curReader.next(key, value)) {
if (!initNextRecordReader()) {
return false;
@@ -159,41 +159,41 @@
}
return true;
}
-
+
public K createKey() {
return curReader.createKey();
}
-
+
public V createValue() {
return curReader.createValue();
}
-
+
/**
* return the amount of data processed
*/
public long getPos() throws IOException {
return progress;
}
-
+
public void close() throws IOException {
if (curReader != null) {
curReader.close();
curReader = null;
}
}
-
+
/**
* return progress based on the amount of data processed so far.
*/
public float getProgress() throws IOException {
return Math.min(1.0f, progress/(float)(split.getLength()));
}
-
+
/**
* A generic RecordReader that can hand out different recordReaders
* for each chunk in the CombineFileSplit.
*/
- public CombineFileRecordReader(JobConf job, CombineFileSplit split,
+ public CombineFileRecordReader(JobConf job, CombineFileSplit split,
Reporter reporter,
Class<RecordReader<K, V>> rrClass)
throws IOException {
@@ -204,22 +204,22 @@
this.idx = 0;
this.curReader = null;
this.progress = 0;
-
+
try {
rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
rrConstructor.setAccessible(true);
} catch (Exception e) {
- throw new RuntimeException(rrClass.getName() +
+ throw new RuntimeException(rrClass.getName() +
" does not have valid constructor", e);
}
initNextRecordReader();
}
-
+
/**
* Get the record reader for the next chunk in this CombineFileSplit.
*/
protected boolean initNextRecordReader() throws IOException {
-
+
if (curReader != null) {
curReader.close();
curReader = null;
@@ -227,17 +227,17 @@
progress += split.getLength(idx-1); // done processing so far
}
}
-
+
// if all chunks have been processed, nothing more to do.
if (idx == split.getNumPaths()) {
return false;
}
-
+
// get a record reader for the idx-th chunk
try {
- curReader = rrConstructor.newInstance(new Object []
+ curReader = rrConstructor.newInstance(new Object []
{(InputSplit)split, jc, reporter, Integer.valueOf(idx)});
-
+
// setup some helper config variables.
jc.set("map.input.file", split.getPath(idx).toString());
jc.setLong("map.input.start", split.getOffset(idx));
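Note the construction pattern in this hunk: the wrapper resolves a constructor with the fixed (InputSplit, Configuration, Reporter, Integer) signature once, then reflectively instantiates a fresh chunk-level reader each time initNextRecordReader() advances to the next path in the split. A self-contained sketch of the same reflective-factory idea, with an illustrative ChunkReader class standing in for the real RecordReader:

    import java.lang.reflect.Constructor;

    public class ReflectiveFactoryDemo {
        // Stand-in for the (InputSplit, Configuration, Reporter, Integer)
        // signature hard-coded in constructorSignature above.
        static final Class<?>[] SIGNATURE = { String.class, Integer.class };

        public static class ChunkReader {
            public ChunkReader(String path, Integer idx) {
                System.out.println("open reader for chunk " + idx + " of " + path);
            }
        }

        public static void main(String[] args) throws Exception {
            // Resolve the constructor once, as the shim does in its constructor...
            Constructor<ChunkReader> ctor =
                ChunkReader.class.getDeclaredConstructor(SIGNATURE);
            ctor.setAccessible(true);
            // ...then build one reader per chunk, as initNextRecordReader() does.
            for (int idx = 0; idx < 3; idx++) {
                ctor.newInstance("part-00000", Integer.valueOf(idx));
            }
        }
    }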
@@ -250,7 +250,7 @@
}
}
- public abstract static class CombineFileInputFormatShim<K, V> extends CombineFileInputFormat<K, V>
+ public abstract static class CombineFileInputFormatShim<K, V> extends CombineFileInputFormat<K, V>
implements HadoopShims.CombineFileInputFormatShim<K, V> {
public Path[] getInputPathsShim(JobConf conf) {
@@ -262,8 +262,8 @@
}
Path[] newPaths = new Path[paths.length];
// remove file:
- for (int pos = 0; pos < paths.length; pos++)
- newPaths[pos] = new Path(paths[pos].toString().substring(5));
+ for (int pos = 0; pos < paths.length; pos++)
+ newPaths[pos] = new Path(paths[pos].toUri().getPath());
return newPaths;
}
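The change just above is the substantive fix for HIVE-1001: the old code removed the URI scheme with substring(5), which silently assumes every input path begins with the five-character prefix "file:", so paths with any other scheme or with an authority component (such as hdfs://host:port/...) were mangled. Path.toUri().getPath() extracts the path component regardless of scheme. A standalone sketch of the difference using java.net.URI; the sample paths are illustrative, not from the commit:

    import java.net.URI;

    public class InputPathParsingDemo {
        public static void main(String[] args) throws Exception {
            String[] samples = {
                "file:/tmp/warehouse/t1",            // substring(5) happens to work here
                "hdfs://namenode:9000/user/hive/t1", // substring(5) keeps "//namenode:9000/..."
            };
            for (String s : samples) {
                String oldWay = s.substring(5);        // the removed behavior
                String newWay = new URI(s).getPath();  // what toUri().getPath() yields
                System.out.printf("%-36s old=%s new=%s%n", s, oldWay, newWay);
            }
        }
    }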
@@ -277,10 +277,10 @@
// For backward compatibility, let the above parameter be used
if (job.getLong("mapred.min.split.size.per.node", 0) == 0)
super.setMinSplitSizeNode(minSize);
-
+
if (job.getLong("mapred.min.split.size.per.rack", 0) == 0)
super.setMinSplitSizeRack(minSize);
-
+
if (job.getLong("mapred.max.split.size", 0) == 0)
super.setMaxSplitSize(minSize);
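For the split-size settings above: the per-node minimum, per-rack minimum, and maximum split sizes each fall back to the legacy mapred.min.split.size value whenever their own properties are unset (read back as 0), so jobs tuned with only the old parameter keep behaving sensibly. A small sketch of that defaulting logic, using java.util.Properties as a stand-in for JobConf:

    import java.util.Properties;

    public class SplitSizeDefaultsDemo {
        public static void main(String[] args) {
            Properties conf = new Properties();  // stand-in for JobConf
            conf.setProperty("mapred.min.split.size", "134217728"); // 128 MB legacy minimum

            long minSize = Long.parseLong(conf.getProperty("mapred.min.split.size", "0"));

            // Newer properties inherit the legacy minimum when left unset,
            // matching the backward-compatibility checks in the hunk above.
            long perNode = Long.parseLong(conf.getProperty("mapred.min.split.size.per.node", "0"));
            if (perNode == 0) perNode = minSize;
            long perRack = Long.parseLong(conf.getProperty("mapred.min.split.size.per.rack", "0"));
            if (perRack == 0) perRack = minSize;
            long maxSize = Long.parseLong(conf.getProperty("mapred.max.split.size", "0"));
            if (maxSize == 0) maxSize = minSize;  // note: max also defaults to minSize

            System.out.println(perNode + " " + perRack + " " + maxSize);
        }
    }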
@@ -297,7 +297,7 @@
return new InputSplitShim();
}
- public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split, Reporter reporter,
+ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split, Reporter reporter,
Class<RecordReader<K, V>> rrClass)
throws IOException {
CombineFileSplit cfSplit = (CombineFileSplit)split;
@@ -309,7 +309,7 @@
public String getInputFormatClassName() {
return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
}
-
+
String [] ret = new String[2];
@Override
public String [] getTaskJobIDs(TaskCompletionEvent t) {
@@ -318,5 +318,5 @@
ret[1] = tid.getJobID().toString();
return ret;
}
-
+
}