You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/08/17 07:32:34 UTC

svn commit: r986193 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Author: namit
Date: Tue Aug 17 05:32:34 2010
New Revision: 986193

URL: http://svn.apache.org/viewvc?rev=986193&view=rev
Log:
HIVE-1543 Abort in ExecMapper when record reader's next gets a exception
(Ning Zhang via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Aug 17 05:32:34 2010
@@ -144,6 +144,9 @@ Trunk -  Unreleased
     HIVE-1532 Replace globStatus with listStatus inside Hive.java's replaceFiles
     (He Yongqiang via namit)
 
+    HIVE-1543 Abort in ExecMapper when record reader's next gets a exception
+    (Ning Zhang via namit)
+
   TESTS
 
     HIVE-1464. improve  test query performance

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Tue Aug 17 05:32:34 2010
@@ -102,8 +102,8 @@ public class ExecMapper extends MapReduc
       }
 
       fetchOperators = new HashMap<String, FetchOperator>();
-      
-      Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>(); 
+
+      Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
       // create map local operators
       for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
           .entrySet()) {
@@ -118,7 +118,7 @@ public class ExecMapper extends MapReduc
             setColumnsNeeded = true;
           }
         }
-        
+
         if (!setColumnsNeeded) {
           ColumnProjectionUtils.setFullyReadColumns(jobClone);
         }
@@ -219,6 +219,11 @@ public class ExecMapper extends MapReduc
       l4j.trace("Close called. no row processed by map.");
     }
 
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
     // detecting failed executions by exceptions thrown by the operator tree
     // ideally hadoop should let us know whether map execution failed or not
     try {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Tue Aug 17 05:32:34 2010
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
-  
+
   private boolean initDone = false;
-  
-  /** 
+
+  /**
    * Reads the next key/value pair from the input for processing.
    *
    * @param key the key to read data into
@@ -40,32 +40,37 @@ public abstract class HiveContextAwareRe
    * @return true if a key/value was read, false if at EOF
    */
   public abstract boolean doNext(K key, V value) throws IOException;
-  
-  /** 
+
+  /**
    * Close this {@link InputSplit} to future operations.
-   * 
+   *
    * @throws IOException
-   */ 
+   */
   public abstract void doClose() throws IOException;
-  
+
   private IOContext ioCxtRef =  null;
-  
+
   @Override
   public void close() throws IOException {
     doClose();
     initDone = false;
     ioCxtRef = null;
   }
-  
+
   @Override
   public boolean next(K key, V value) throws IOException {
     if(!initDone) {
       throw new IOException("Hive IOContext is not inited.");
     }
     updateIOContext();
-    return doNext(key, value);
+    try {
+      return doNext(key, value);
+    } catch (IOException e) {
+      ioCxtRef.setIOExceptions(true);
+      throw e;
+    }
   }
-  
+
   protected void updateIOContext()
       throws IOException {
     long pointerPos = this.getPos();
@@ -85,11 +90,11 @@ public abstract class HiveContextAwareRe
       ioCxtRef.nextBlockStart = pointerPos;
     }
   }
-  
+
   public IOContext getIOContext() {
     return IOContext.get();
   }
-  
+
   public void initIOContext(long startPos, boolean isBlockPointer, String inputFile) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
@@ -101,7 +106,7 @@ public abstract class HiveContextAwareRe
   public void initIOContext(FileSplit split, JobConf job,
       Class inputFormatClass) throws IOException {
     boolean blockPointer = false;
-    long blockStart = -1;    
+    long blockStart = -1;
     FileSplit fileSplit = (FileSplit) split;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(job);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=986193&r1=986192&r2=986193&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Aug 17 05:32:34 2010
@@ -28,14 +28,14 @@ package org.apache.hadoop.hive.ql.io;
  * nextBlockStart refers the end of current row and beginning of next row.
  */
 public class IOContext {
-  
+
   private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>();
   static {
     if (threadLocal.get() == null) {
-      threadLocal.set(new IOContext());      
+      threadLocal.set(new IOContext());
     }
   }
-  
+
   public static IOContext get() {
     return IOContext.threadLocal.get();
   }
@@ -43,13 +43,15 @@ public class IOContext {
   long currentBlockStart;
   long nextBlockStart;
   boolean isBlockPointer;
-  
+  boolean ioExceptions;
+
   String inputFile;
-  
+
   public IOContext() {
     this.currentBlockStart = 0;
     this.nextBlockStart = -1;
     this.isBlockPointer = true;
+    this.ioExceptions = false;
   }
 
   public long getCurrentBlockStart() {
@@ -75,7 +77,7 @@ public class IOContext {
   public void setBlockPointer(boolean isBlockPointer) {
     this.isBlockPointer = isBlockPointer;
   }
-  
+
   public String getInputFile() {
     return inputFile;
   }
@@ -83,4 +85,12 @@ public class IOContext {
   public void setInputFile(String inputFile) {
     this.inputFile = inputFile;
   }
+
+  public void setIOExceptions(boolean ioe) {
+    this.ioExceptions = ioe;
+  }
+
+  public boolean getIOExceptions() {
+    return ioExceptions;
+  }
 }