You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/08/17 07:32:34 UTC
svn commit: r986193 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Author: namit
Date: Tue Aug 17 05:32:34 2010
New Revision: 986193
URL: http://svn.apache.org/viewvc?rev=986193&view=rev
Log:
HIVE-1543 Abort in ExecMapper when record reader's next gets an exception
(Ning Zhang via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Aug 17 05:32:34 2010
@@ -144,6 +144,9 @@ Trunk - Unreleased
HIVE-1532 Replace globStatus with listStatus inside Hive.java's replaceFiles
(He Yongqiang via namit)
+ HIVE-1543 Abort in ExecMapper when record reader's next gets an exception
+ (Ning Zhang via namit)
+
TESTS
HIVE-1464. improve test query performance
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Tue Aug 17 05:32:34 2010
@@ -102,8 +102,8 @@ public class ExecMapper extends MapReduc
}
fetchOperators = new HashMap<String, FetchOperator>();
-
- Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
+
+ Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
// create map local operators
for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
.entrySet()) {
@@ -118,7 +118,7 @@ public class ExecMapper extends MapReduc
setColumnsNeeded = true;
}
}
-
+
if (!setColumnsNeeded) {
ColumnProjectionUtils.setFullyReadColumns(jobClone);
}
@@ -219,6 +219,11 @@ public class ExecMapper extends MapReduc
l4j.trace("Close called. no row processed by map.");
}
+ // check if there are IOExceptions
+ if (!abort) {
+ abort = execContext.getIoCxt().getIOExceptions();
+ }
+
// detecting failed executions by exceptions thrown by the operator tree
// ideally hadoop should let us know whether map execution failed or not
try {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Tue Aug 17 05:32:34 2010
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
-
+
private boolean initDone = false;
-
- /**
+
+ /**
* Reads the next key/value pair from the input for processing.
*
* @param key the key to read data into
@@ -40,32 +40,37 @@ public abstract class HiveContextAwareRe
* @return true if a key/value was read, false if at EOF
*/
public abstract boolean doNext(K key, V value) throws IOException;
-
- /**
+
+ /**
* Close this {@link InputSplit} to future operations.
- *
+ *
* @throws IOException
- */
+ */
public abstract void doClose() throws IOException;
-
+
private IOContext ioCxtRef = null;
-
+
@Override
public void close() throws IOException {
doClose();
initDone = false;
ioCxtRef = null;
}
-
+
@Override
public boolean next(K key, V value) throws IOException {
if(!initDone) {
throw new IOException("Hive IOContext is not inited.");
}
updateIOContext();
- return doNext(key, value);
+ try {
+ return doNext(key, value);
+ } catch (IOException e) {
+ ioCxtRef.setIOExceptions(true);
+ throw e;
+ }
}
-
+
protected void updateIOContext()
throws IOException {
long pointerPos = this.getPos();
@@ -85,11 +90,11 @@ public abstract class HiveContextAwareRe
ioCxtRef.nextBlockStart = pointerPos;
}
}
-
+
public IOContext getIOContext() {
return IOContext.get();
}
-
+
public void initIOContext(long startPos, boolean isBlockPointer, String inputFile) {
ioCxtRef = this.getIOContext();
ioCxtRef.currentBlockStart = startPos;
@@ -101,7 +106,7 @@ public abstract class HiveContextAwareRe
public void initIOContext(FileSplit split, JobConf job,
Class inputFormatClass) throws IOException {
boolean blockPointer = false;
- long blockStart = -1;
+ long blockStart = -1;
FileSplit fileSplit = (FileSplit) split;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(job);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Aug 17 05:32:34 2010
@@ -28,14 +28,14 @@ package org.apache.hadoop.hive.ql.io;
* nextBlockStart refers the end of current row and beginning of next row.
*/
public class IOContext {
-
+
private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>();
static {
if (threadLocal.get() == null) {
- threadLocal.set(new IOContext());
+ threadLocal.set(new IOContext());
}
}
-
+
public static IOContext get() {
return IOContext.threadLocal.get();
}
@@ -43,13 +43,15 @@ public class IOContext {
long currentBlockStart;
long nextBlockStart;
boolean isBlockPointer;
-
+ boolean ioExceptions;
+
String inputFile;
-
+
public IOContext() {
this.currentBlockStart = 0;
this.nextBlockStart = -1;
this.isBlockPointer = true;
+ this.ioExceptions = false;
}
public long getCurrentBlockStart() {
@@ -75,7 +77,7 @@ public class IOContext {
public void setBlockPointer(boolean isBlockPointer) {
this.isBlockPointer = isBlockPointer;
}
-
+
public String getInputFile() {
return inputFile;
}
@@ -83,4 +85,12 @@ public class IOContext {
public void setInputFile(String inputFile) {
this.inputFile = inputFile;
}
+
+ public void setIOExceptions(boolean ioe) {
+ this.ioExceptions = ioe;
+ }
+
+ public boolean getIOExceptions() {
+ return ioExceptions;
+ }
}