You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/01/16 07:46:07 UTC

svn commit: r899891 [2/31] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Sat Jan 16 06:44:01 2010
@@ -19,280 +19,381 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 import java.io.File;
-import java.io.RandomAccessFile;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Simple persistent container for rows.
- *
+ * 
  * This container interface only accepts adding or appending new rows and
  * iterating through the rows in the order of their insertions.
- *
- * The iterator interface is a lightweight first()/next() API rather than
- * the Java Iterator interface. This way we do not need to create 
- * an Iterator object every time we want to start a new iteration. Below is 
- * simple example of how to convert a typical Java's Iterator code to the LW
- * iterator iterface.
  * 
- * Itereator itr = rowContainer.iterator();
- * while (itr.hasNext()) {
- *   v = itr.next();
- *   // do anything with v
- * }
+ * The iterator interface is a lightweight first()/next() API rather than the
+ * Java Iterator interface. This way we do not need to create an Iterator object
+ * every time we want to start a new iteration. Below is simple example of how
+ * to convert a typical Java's Iterator code to the LW iterator iterface.
+ * 
+ * Itereator itr = rowContainer.iterator(); while (itr.hasNext()) { v =
+ * itr.next(); // do anything with v }
  * 
  * can be rewritten to:
  * 
- * for ( v =  rowContainer.first(); 
- *       v != null; 
- *       v =  rowContainer.next()) {
- *   // do anything with v
- * }
- *
- * The adding and iterating operations can be interleaving. 
- *
+ * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) { // do
+ * anything with v }
+ * 
+ * Once the first is called, it will not be able to write again. So there can
+ * not be any writes after read. It can be read multiple times, but it does not
+ * support multiple reader interleaving reading.
+ * 
  */
-public class RowContainer<Row extends List> {
+public class RowContainer<Row extends List<Object>> {
   
   protected Log LOG = LogFactory.getLog(this.getClass().getName());
   
   // max # of rows can be put into one block
   private static final int BLOCKSIZE   = 25000;
-  private static final int BLKMETA_LEN = 100; // default # of block metadata: (offset,length) pair
   
-  private Row[] lastBlock;   // the last block that add() should append to 
-  private Row[] currBlock;   // the current block where the cursor is in
+  private Row[] currentWriteBlock;   // the last block that add() should append to 
+  private Row[] currentReadBlock;   // the current block where the cursor is in
+  // since currentReadBlock may assigned to currentWriteBlock, we need to store
+  // orginal read block
+  private Row[] firstReadBlockPointer;  
   private int blockSize;     // number of objects in the block before it is spilled to disk
-  private int numBlocks;     // total # of blocks
+  private int numFlushedBlocks;     // total # of blocks
   private int size;          // total # of elements in the RowContainer
   private File tmpFile;            // temporary file holding the spilled blocks
-  private RandomAccessFile rFile;  // random access file holding the data
-  private long[] off_len;          // offset length pair: i-th position is offset, (i+1)-th position is length
+  Path tempOutPath = null;
+  private File parentFile;
   private int itrCursor;     // iterator cursor in the currBlock
+  private int readBlockSize; //size of current read block
   private int addCursor;     // append cursor in the lastBlock
-  private int pBlock;        // pointer to the iterator block
   private SerDe serde;       // serialization/deserialization for the row
   private ObjectInspector standardOI;  // object inspector for the row
-  private ArrayList dummyRow; // representing empty row (no columns since value art is null)
   
-  public RowContainer() {
-    this(BLOCKSIZE);
+  private List<Object> keyObject;
+
+  private tableDesc tblDesc;
+  
+  boolean firstCalled = false; //once called first, it will never be able to write again.
+  int acutalSplitNum = 0;
+  int currentSplitPointer = 0;
+  org.apache.hadoop.mapred.RecordReader rr = null; //record reader
+  RecordWriter rw = null;
+  InputFormat<WritableComparable, Writable> inputFormat = null;
+  InputSplit[] inputSplits = null;
+  private Row dummyRow = null;
+  
+  Writable val = null; //cached to use serialize data
+  
+  JobConf jobCloneUsingLocalFs = null;
+  private LocalFileSystem localFs;
+
+  public RowContainer(Configuration jc) throws HiveException {
+    this(BLOCKSIZE, jc);
   }
   
-  public RowContainer(int blockSize) {
+  public RowContainer(int blockSize, Configuration jc) throws HiveException {
     // no 0-sized block
     this.blockSize = blockSize == 0 ? BLOCKSIZE : blockSize;
     this.size      = 0;
     this.itrCursor = 0;
     this.addCursor = 0;
-    this.numBlocks = 0;
-    this.pBlock    = 0;
+    this.numFlushedBlocks = 0;
     this.tmpFile   = null;
-    this.lastBlock = (Row[]) new ArrayList[blockSize];
-    this.currBlock = this.lastBlock;
-    this.off_len   = new long[BLKMETA_LEN * 2];
+    this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+    this.currentReadBlock = this.currentWriteBlock;
+    this.firstReadBlockPointer = currentReadBlock;
     this.serde     = null;
     this.standardOI= null;
-    this.dummyRow  = new ArrayList(0);
+    try {
+      this.localFs = FileSystem.getLocal(jc);
+    } catch (IOException e) {
+     throw new HiveException(e);
+    }
+    this.jobCloneUsingLocalFs = new JobConf(jc);
+    HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS, Utilities.HADOOP_LOCAL_FS);
   }
   
-  public RowContainer(int blockSize, SerDe sd, ObjectInspector oi) {
-    this(blockSize);
+  public RowContainer(int blockSize, SerDe sd, ObjectInspector oi, Configuration jc) throws HiveException {
+    this(blockSize, jc);
     setSerDe(sd, oi);
   }
   
   public void setSerDe(SerDe sd, ObjectInspector oi) {
-    assert serde != null : "serde is null";
-    assert oi != null : "oi is null";
     this.serde = sd;
     this.standardOI = oi;
   }
   
   public void add(Row t) throws HiveException {
-    if ( addCursor >= blockSize ) { // spill the current block to tmp file
-      spillBlock(lastBlock);
-      addCursor = 0;
-      if ( numBlocks == 1 )
-        lastBlock = (Row[]) new ArrayList[blockSize];
-    } 
-    lastBlock[addCursor++] = t;
+    if(this.tblDesc != null) {
+      if ( addCursor >= blockSize ) { // spill the current block to tmp file
+        spillBlock(currentWriteBlock, addCursor);
+        addCursor = 0;
+        if ( numFlushedBlocks == 1 )
+          currentWriteBlock = (Row[]) new ArrayList[blockSize];
+      } 
+      currentWriteBlock[addCursor++] = t;
+    } else if(t != null) {
+      // the tableDesc will be null in the case that all columns in that table
+      // is not used. we use a dummy row to denote all rows in that table, and
+      // the dummy row is added by caller. 
+      this.dummyRow = t;
+    }
     ++size;
   }
   
-  public Row first() {
+  public Row first() throws HiveException {
     if ( size == 0 )
       return null;
     
-    if ( pBlock > 0 ) {
-      pBlock = 0;
-      currBlock = getBlock(0);
-      assert currBlock != null: "currBlock == null";
-    }  
-    if ( currBlock == null && lastBlock != null ) {
-      currBlock = lastBlock;
-    }
-    assert pBlock == 0: "pBlock != 0 ";
-    itrCursor = 1;
-    return currBlock[0];
-  }
-  
-  public Row next() {
-    assert pBlock<= numBlocks: "pBlock " + pBlock + " > numBlocks" + numBlocks; // pBlock should not be greater than numBlocks;
-    if ( pBlock < numBlocks ) {
-      if ( itrCursor < blockSize ) {
-     	  return currBlock[itrCursor++];
-	    } else if (  ++pBlock < numBlocks ) {
- 	      currBlock = getBlock(pBlock);
- 	      assert currBlock != null: "currBlock == null";
- 	      itrCursor = 1;
-	    	return  currBlock[0];
-    	} else {
-    	  itrCursor = 0;
-    	  currBlock = lastBlock;
-    	}
-    } 
-    // last block (pBlock == numBlocks)
-    if ( itrCursor < addCursor )
-      return currBlock[itrCursor++];
-    else
-      return null;
-  }
-  
-  private void spillBlock(Row[] block) throws HiveException {
     try {
-      if ( tmpFile == null ) {
-        tmpFile = File.createTempFile("RowContainer", ".tmp", new File("/tmp"));
-	      LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
- 	      // Delete the temp file if the JVM terminate normally through Hadoop job kill command.
-        // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
-        tmpFile.deleteOnExit(); 
-        rFile = new RandomAccessFile(tmpFile, "rw");
+      firstCalled = true;
+      // when we reach here, we must have some data already (because size >0).
+      // We need to see if there are any data flushed into file system. If not, we can
+      // directly read from the current write block. Otherwise, we need to read
+      // from the beginning of the underlying file.
+      this.itrCursor = 0;
+      closeWriter();
+      closeReader();
+      
+      if(tblDesc == null) {
+        this.itrCursor ++;
+        return dummyRow;
       }
-      byte[] buf = serialize(block);
-      long offset = rFile.length();
-      long len = buf.length;
-      // append the block at the end
-      rFile.seek(offset);
-      rFile.write(buf);
       
-      // maintain block metadata
-      addBlockMetadata(offset, len);
+      this.currentReadBlock = this.firstReadBlockPointer;
+      if (this.numFlushedBlocks == 0) {
+        this.readBlockSize = this.addCursor;
+        this.currentReadBlock = this.currentWriteBlock;
+      } else {
+        if (inputSplits == null) {
+          if (this.inputFormat == null)
+            inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(tblDesc.getInputFileFormatClass(),
+                    jobCloneUsingLocalFs);
+
+          HiveConf.setVar(jobCloneUsingLocalFs,
+              HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
+              org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
+          inputSplits = inputFormat.getSplits(jobCloneUsingLocalFs, 1);
+          acutalSplitNum = inputSplits.length;
+        }
+        currentSplitPointer = 0;
+        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
+            jobCloneUsingLocalFs, Reporter.NULL);
+        currentSplitPointer++;
+
+        nextBlock();
+      }
+      // we are guaranteed that we can get data here (since 'size' is not zero)
+      Row ret = currentReadBlock[itrCursor++];
+      removeKeys(ret);
+      return ret;
     } catch (Exception e) {
-      LOG.debug(e.toString());
       throw new HiveException(e);
     }
+    
   }
-  
-  /**
-   * Maintain the blocks meta data: number of blocks, and the block (offset, length)
-   * pair. 
-   * @param offset offset of the tmp file where the block was serialized.
-   * @param len the length of the serialized block in the temp file.
-   */
-  private void addBlockMetadata(long offset, long len) {
-    if ( (numBlocks+1) * 2 >= off_len.length ) { // expand (offset, len) array
-      off_len = Arrays.copyOf(off_len, off_len.length*2);
-    }
-    off_len[numBlocks*2] = offset;
-    off_len[numBlocks*2+1] = len;
-    ++numBlocks;
-  }
-  
-  /**
-   * Serialize the object into a byte array.
-   * @param obj object needed to be serialized
-   * @return the byte array that contains the serialized array.
-   * @throws IOException
-   */
-  private byte[] serialize(Row[] obj) throws HiveException {
-    assert(serde != null && standardOI != null);
+
+  public Row next() throws HiveException {
+    
+    if(!firstCalled)
+      throw new RuntimeException("Call first() then call next().");
+    
+    if ( size == 0 )
+      return null;
     
-    ByteArrayOutputStream  baos;
-    DataOutputStream     oos;
+    if(tblDesc == null) {
+      if(this.itrCursor < size) {
+        this.itrCursor++;
+        return dummyRow;
+      }
+      return null;  
+    }
     
+    Row ret;
+    if(itrCursor < this.readBlockSize) {
+      ret = this.currentReadBlock[itrCursor++];
+      removeKeys(ret);
+      return ret;
+    }
+    else {
+      nextBlock();
+      if ( this.readBlockSize == 0) {
+        if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
+          this.itrCursor = 0;
+          this.readBlockSize = this.addCursor;
+          this.firstReadBlockPointer = this.currentReadBlock;
+          currentReadBlock = currentWriteBlock;
+        } else {
+          return null;
+        }
+      }
+      return next();
+    }
+  }
+
+  private void removeKeys(Row ret) {
+    if (this.keyObject != null
+        && this.currentReadBlock != this.currentWriteBlock) {
+      int len = this.keyObject.size();
+      int rowSize = ((ArrayList)ret).size();
+      for(int i=0;i<len;i++) {
+        ((ArrayList) ret).remove(rowSize - i - 1);
+      }
+    }
+  }
+  
+  ArrayList<Object> row = new ArrayList<Object>(2);
+  private void spillBlock(Row[] block, int length) throws HiveException {
     try {
-      baos = new ByteArrayOutputStream();
-      oos = new DataOutputStream(baos);
+      if ( tmpFile == null ) {
+
+        String suffix = ".tmp";
+        if(this.keyObject != null)
+          suffix = "." + this.keyObject.toString() + suffix;
+        
+        while(true) {
+          String parentId = "hive-rowcontainer" + Utilities.randGen.nextInt();
+          parentFile = new File("/tmp/"+ parentId);
+          boolean success = parentFile.mkdir();
+          if(success)
+            break;
+          LOG.debug("retry creating tmp row-container directory...");
+        }
+        
+        tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+        LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+        // Delete the temp file if the JVM terminate normally through Hadoop job kill command.
+        // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
+        parentFile.deleteOnExit();
+        tmpFile.deleteOnExit(); 
+        
+        // rFile = new RandomAccessFile(tmpFile, "rw");
+        HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+        tempOutPath = new Path(tmpFile.toString());
+        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, 
+            serde.getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
+      } else if (rw == null) {
+        throw new HiveException("RowContainer has already been closed for writing.");
+      }
       
-      // # of rows
-      oos.writeInt(obj.length); 
+      row.clear();
+      row.add(null);
+      row.add(null);
       
-      // if serde or OI is null, meaning the join value is null, we don't need
-      // to serialize anything to disk, just need to keep the length.
-      if ( serde != null && standardOI != null ) {
-        for ( int i = 0; i < obj.length; ++i ) {
-          Writable outVal = serde.serialize(obj[i], standardOI);
-          outVal.write(oos);
+      if (this.keyObject != null) {
+        row.set(1, this.keyObject);
+        for (int i = 0; i < length; ++i) {
+          Row currentValRow = block[i];
+          row.set(0, currentValRow);
+          Writable outVal = serde.serialize(row, standardOI);
+          rw.write(outVal);
+        }
+      }else {
+        for ( int i = 0; i < length; ++i ) {
+          Row currentValRow = block[i];
+          Writable outVal = serde.serialize(currentValRow, standardOI);
+          rw.write(outVal);
         }
       }
-      oos.close();
+      
+      if(block == this.currentWriteBlock)
+        this.addCursor = 0;
+      
+      this.numFlushedBlocks ++;
     } catch (Exception e) {
-      e.printStackTrace();
+      clear();
+      LOG.error(e.toString(), e);
       throw new HiveException(e);
     }
-    return baos.toByteArray();
   }
 
   /**
-   * Deserialize an object from a byte array
-   * @param buf the byte array containing the serialized object.
-   * @return the serialized object.
+   * Get the number of elements in the RowContainer.
+   * @return number of elements in the RowContainer
    */
-  private Row[] deserialize(byte[] buf) throws HiveException {
-    ByteArrayInputStream  bais;
-    DataInputStream     ois;
+  public int size() {
+    return size;
+  }
+
+  private boolean nextBlock() throws HiveException {
+    itrCursor = 0;
+    this.readBlockSize = 0;
+    if (this.numFlushedBlocks == 0) return false;
     
     try {
-      bais = new ByteArrayInputStream(buf);
-      ois = new DataInputStream(bais);
-      int sz = ois.readInt();
-      assert sz == blockSize: 
-             "deserialized size " + sz + " is not the same as block size " + blockSize;
-      Row[] ret = (Row[]) new ArrayList[sz];
+      if(val == null)
+        val = serde.getSerializedClass().newInstance();
+      boolean nextSplit = true;
+      int i = 0;
       
-      // if serde or OI is null, meaning the join value is null, we don't need
-      // to serialize anything to disk, just need to keep the length.
-      for ( int i = 0; i < sz; ++i ) {
-        if ( serde != null && standardOI != null ) {
-          Writable val = serde.getSerializedClass().newInstance();
-          val.readFields(ois);
-        
-          ret[i] = (Row) ObjectInspectorUtils.copyToStandardObject(
-                           serde.deserialize(val),
-                           serde.getObjectInspector(),
-                           ObjectInspectorCopyOption.WRITABLE);
-        } else {
-          ret[i] = (Row) dummyRow;
+      if(rr != null) {
+        Object key = rr.createKey();
+        while (i < this.currentReadBlock.length && rr.next(key, val)) {
+          nextSplit = false;
+          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(
+              serde.deserialize(val),
+              serde.getObjectInspector(),
+              ObjectInspectorCopyOption.WRITABLE);
         }
       }
-      return ret;
+      
+      if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+        //open record reader to read next split
+        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+            Reporter.NULL);
+        currentSplitPointer++;
+        return nextBlock();
+      }
+      
+      this.readBlockSize = i;
+      return this.readBlockSize > 0;
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.error(e.getMessage(),e);
+      try {
+        this.clear();
+      } catch (HiveException e1) {
+        LOG.error(e.getMessage(),e);
+      }
       throw new HiveException(e);
     }
   }
 
-  /**
-   * Get the number of elements in the RowContainer.
-   * @return number of elements in the RowContainer
-   */
-  public int size() {
-    return size;
+  public void copyToDFSDirecory(FileSystem destFs, Path destPath) throws IOException, HiveException {
+    if (addCursor > 0)
+      this.spillBlock(this.currentWriteBlock, addCursor);
+    if(tempOutPath == null || tempOutPath.toString().trim().equals(""))
+      return;
+    this.closeWriter();
+    LOG.info("RowContainer copied temp file " + tmpFile.getAbsolutePath()+ " to dfs directory " + destPath.toString());
+    destFs.copyFromLocalFile(true,tempOutPath, new Path(destPath, new Path(tempOutPath.getName())));
+    clear();
   }
   
   /**
@@ -301,33 +402,72 @@
   public void clear() throws HiveException {
     itrCursor = 0;
     addCursor = 0;
-    numBlocks = 0;
-    pBlock    = 0;
-    size      = 0;
+    numFlushedBlocks = 0;
+    this.readBlockSize = 0;
+    this.acutalSplitNum = 0;
+    this.currentSplitPointer = -1;
+    this.firstCalled = false;
+    this.inputSplits = null;
+    tempOutPath = null;
+    addCursor = 0;
+    
+    size = 0;
     try {
-      if ( rFile != null )
-        rFile.close();
-      if ( tmpFile != null )
-        tmpFile.delete();
+      if (rw != null)
+        rw.close(false);
+      if (rr != null)
+        rr.close();
     } catch (Exception e) {
       LOG.error(e.toString());
       throw new HiveException(e);
+    } finally {
+      rw = null;
+      rr = null;
+      tmpFile = null;
+      deleteLocalFile(parentFile, true);
+      parentFile = null;
     }
-    tmpFile = null;
   }
-  
-  private Row[] getBlock(int block) {
-    long offset = off_len[block*2];
-    long len = off_len[block*2+1];
-    byte[] buf = new byte[(int)len];
-    try {
-      rFile.seek(offset);
-      rFile.readFully(buf);
-      currBlock = deserialize(buf);
+
+  private void deleteLocalFile(File file, boolean recursive) {
+    try{
+      if (file != null) {
+        if(!file.exists())
+          return;
+        if(file.isDirectory() && recursive) {
+          File[] files = file.listFiles();
+          for (int i = 0; i < files.length; i++)
+            deleteLocalFile(files[i], true);
+        }
+        boolean deleteSuccess = file.delete();
+        if(!deleteSuccess)
+          LOG.error("Error deleting tmp file:" + file.getAbsolutePath());
+      }
     } catch (Exception e) {
-      LOG.error(e.toString());
-      return null;
+      LOG.error("Error deleting tmp file:" + file.getAbsolutePath(), e);
     }
-    return currBlock;
   }
+  
+  private void closeWriter() throws IOException {
+    if (this.rw != null) {
+      this.rw.close(false);
+      this.rw = null;
+    }
+  }
+  
+  private void closeReader() throws IOException {
+    if (this.rr != null) {
+      this.rr.close();
+      this.rr = null;
+    }
+  }
+  
+  public void setKeyObject(List<Object> dummyKey) {
+    this.keyObject = dummyKey;
+  }
+
+  public void setTableDesc(tableDesc tblDesc) {
+    this.tblDesc = tblDesc;
+  }
+  
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +31,14 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -187,4 +195,44 @@
     }
     return true;
   }
+  
+  
+  public static RecordWriter getHiveRecordWriter(JobConf jc,
+      tableDesc tableInfo, Class<? extends Writable> outputClass,
+      fileSinkDesc conf, Path outPath) throws HiveException {
+    try {
+      HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+      boolean isCompressed = conf.getCompressed();
+      JobConf jc_output = jc;
+      if (isCompressed) {
+        jc_output = new JobConf(jc);
+        String codecStr = conf.getCompressCodec();
+        if (codecStr != null && !codecStr.trim().equals("")) {
+          Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
+          FileOutputFormat.setOutputCompressorClass(jc_output, codec);
+        }
+        String type = conf.getCompressType();
+        if (type != null && !type.trim().equals("")) {
+          CompressionType style = CompressionType.valueOf(type);
+          SequenceFileOutputFormat.setOutputCompressionType(jc, style);
+        }
+      }
+      return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
+          isCompressed, tableInfo.getProperties(), outPath);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public static RecordWriter getRecordWriter(JobConf jc,
+      HiveOutputFormat<?, ?> hiveOutputFormat,
+      final Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProp, Path outPath) throws IOException, HiveException {
+    if (hiveOutputFormat != null) {
+      return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
+          isCompressed, tableProp, null);
+    }
+    return null;
+  }
+  
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Sat Jan 16 06:44:01 2010
@@ -205,8 +205,10 @@
     List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
     Task<? extends Serializable> mvTask = findMoveTask(mvTasks, newOutput);
     
-    if (mvTask != null)
-      cndTsk.addDependentTask(mvTask);
+    if (mvTask != null) {
+      for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+        tsk.addDependentTask(mvTask);
+    }
   }
  
   private Task<? extends Serializable> findMoveTask(List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class GenMRSkewJoinProcessor {
+
+  public GenMRSkewJoinProcessor() {
+  }
+
+  /**
+   * Create tasks for processing skew joins. The idea is (HIVE-964) to use
+   * separated jobs and map-joins to handle skew joins.
+   * <p>
+   * <ul>
+   * <li>
+   * Number of mr jobs to handle skew keys is the number of table minus 1 (we
+   * can stream the last table, so big keys in the last table will not be a
+   * problem).
+   * <li>
+   * At runtime in Join, we output big keys in one table into one corresponding
+   * directories, and all same keys in other tables into different dirs(one for
+   * each table). The directories will look like:
+   * <ul>
+   * <li>
+   * dir-T1-bigkeys(containing big keys in T1), dir-T2-keys(containing keys
+   * which is big in T1),dir-T3-keys(containing keys which is big in T1), ...
+   * <li>
+   * dir-T1-keys(containing keys which is big in T2), dir-T2-bigkeys(containing
+   * big keys in T2),dir-T3-keys(containing keys which is big in T2), ...
+   * <li>
+   * dir-T1-keys(containing keys which is big in T3), dir-T2-keys(containing big
+   * keys in T3),dir-T3-bigkeys(containing keys which is big in T3), ... .....
+   * </ul>
+   * </ul>
+   * For each table, we launch one mapjoin job, taking the directory containing
+   * big keys in this table and corresponding dirs in other tables as input.
+   * (Actally one job for one row in the above.)
+   * 
+   * <p>
+   * For more discussions, please check
+   * https://issues.apache.org/jira/browse/HIVE-964.
+   * 
+   */
+  public static void processSkewJoin(JoinOperator joinOp,
+      Task<? extends Serializable> currTask, ParseContext parseCtx)
+      throws SemanticException {
+    
+    // We are trying to adding map joins to handle skew keys, and map join right
+    // now does not work with outer joins
+    if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp))
+      return;
+    
+    String baseTmpDir = parseCtx.getContext().getMRTmpFileURI();
+    
+    joinDesc joinDescriptor = joinOp.getConf();
+    Map<Byte, List<exprNodeDesc>> joinValues = joinDescriptor.getExprs();
+    int numAliases = joinValues.size();
+    
+    Map<Byte, String> bigKeysDirMap = new HashMap<Byte, String>();
+    Map<Byte, Map<Byte, String>> smallKeysDirMap = new HashMap<Byte, Map<Byte, String>>();
+    Map<Byte, String> skewJoinJobResultsDir = new HashMap<Byte, String>();
+    Byte[] tags = joinDescriptor.getTagOrder();
+    for (int i = 0; i < numAliases; i++) {
+      Byte alias = tags[i];
+      String bigKeysDir = getBigKeysDir(baseTmpDir, alias);
+      bigKeysDirMap.put(alias, bigKeysDir);
+      Map<Byte, String> smallKeysMap = new HashMap<Byte, String>();
+      smallKeysDirMap.put(alias, smallKeysMap);
+      for(Byte src2 : tags) {
+        if(!src2.equals(alias))
+          smallKeysMap.put(src2, getSmallKeysDir(baseTmpDir, alias, src2));
+      }
+      skewJoinJobResultsDir.put(alias, getBigKeysSkewJoinResultDir(baseTmpDir, alias));
+    }
+    
+    joinDescriptor.setHandleSkewJoin(true);
+    joinDescriptor.setBigKeysDirMap(bigKeysDirMap);
+    joinDescriptor.setSmallKeysDirMap(smallKeysDirMap);
+    joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVESKEWJOINKEY));
+    
+    Map<String, Task<? extends Serializable>> bigKeysDirToTaskMap = new HashMap<String, Task<? extends Serializable>>();
+    List<Serializable> listWorks = new ArrayList<Serializable>();
+    List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+    mapredWork currPlan = (mapredWork) currTask.getWork();
+
+    tableDesc keyTblDesc =  (tableDesc) currPlan.getKeyDesc().clone();
+    List<String> joinKeys = Utilities.getColumnNames(keyTblDesc.getProperties());
+    List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc.getProperties());
+    
+    Map<Byte, tableDesc> tableDescList = new HashMap<Byte, tableDesc>();
+    Map<Byte, List<exprNodeDesc>> newJoinValues = new HashMap<Byte, List<exprNodeDesc>>();
+    Map<Byte, List<exprNodeDesc>> newJoinKeys = new HashMap<Byte, List<exprNodeDesc>>();
+    List<tableDesc> newJoinValueTblDesc = new ArrayList<tableDesc>();// used for create mapJoinDesc, should be in order
+    
+    for (int i = 0; i < tags.length; i++) // fill with null, otherwise we will expect NPE
+      newJoinValueTblDesc.add(null);
+    
+    for (int i = 0; i < numAliases; i++) {
+      Byte alias = tags[i];
+      List<exprNodeDesc> valueCols = joinValues.get(alias);
+      String colNames = "";
+      String colTypes = "";
+      int columnSize = valueCols.size();
+      List<exprNodeDesc> newValueExpr = new ArrayList<exprNodeDesc>();
+      List<exprNodeDesc> newKeyExpr = new ArrayList<exprNodeDesc>();
+      
+      boolean first = true;
+      for (int k = 0; k < columnSize; k++) {
+        TypeInfo type = valueCols.get(k).getTypeInfo();
+        String newColName = i + "_VALUE_" + k; // any name, it does not matter.
+        newValueExpr.add(new exprNodeColumnDesc(type, newColName, ""+i, false));
+        if(!first) {
+          colNames = colNames + ",";
+          colTypes = colTypes +",";
+        }
+        first = false;
+        colNames = colNames + newColName;
+        colTypes = colTypes + valueCols.get(k).getTypeString();
+      }
+      
+      //we are putting join keys at last part of the spilled table 
+      for (int k = 0; k < joinKeys.size(); k++) {
+        if(!first) {
+          colNames = colNames + ",";
+          colTypes = colTypes +",";
+        }
+        first = false;
+        colNames = colNames + joinKeys.get(k); 
+        colTypes = colTypes + joinKeyTypes.get(k);
+        newKeyExpr.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(joinKeyTypes.get(k)), joinKeys.get(k), ""+i, false));
+      }
+      
+      newJoinValues.put(alias, newValueExpr);
+      newJoinKeys.put(alias, newKeyExpr);
+      tableDescList.put(alias, Utilities.getTableDesc(colNames, colTypes));
+      
+      //construct value table Desc
+      String valueColNames ="";
+      String valueColTypes ="";
+      first = true;
+      for (int k = 0; k < columnSize; k++) {
+        String newColName = i + "_VALUE_" + k; // any name, it does not matter.
+        if(!first) {
+          valueColNames = valueColNames + ",";
+          valueColTypes = valueColTypes + ",";
+        }
+        valueColNames = valueColNames + newColName;
+        valueColTypes = valueColTypes + valueCols.get(k).getTypeString();
+        first = false;
+      }
+      newJoinValueTblDesc.set(Byte.valueOf((byte)i), Utilities.getTableDesc(valueColNames, valueColTypes));
+    }
+    
+    joinDescriptor.setSkewKeysValuesTables(tableDescList);
+    joinDescriptor.setKeyTableDesc(keyTblDesc);
+    
+    for (int i = 0; i < numAliases -1; i++) {
+      Byte src = tags[i];
+      mapredWork newPlan = PlanUtils.getMapRedWork();
+      mapredWork clonePlan = null;
+      try {
+        String xmlPlan = currPlan.toXML();
+        StringBuffer sb = new StringBuffer(xmlPlan);
+        ByteArrayInputStream bis;
+        bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
+        clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+      } catch (UnsupportedEncodingException e) {
+        throw new SemanticException(e);
+      }
+
+      Operator<? extends Serializable>[] parentOps = new TableScanOperator[tags.length];
+      for (int k = 0; k < tags.length; k++) {
+        Operator<? extends Serializable> ts = OperatorFactory.get(tableScanDesc.class, (RowSchema) null);
+        parentOps[k] = ts;
+      }
+      Operator<? extends Serializable> tblScan_op = parentOps[i];
+
+      ArrayList<String> aliases = new ArrayList<String>();
+      String alias = src.toString();
+      aliases.add(alias);
+      String bigKeyDirPath = bigKeysDirMap.get(src);
+      newPlan.getPathToAliases().put(bigKeyDirPath, aliases);
+      newPlan.getAliasToWork().put(alias, tblScan_op);
+      partitionDesc part = new partitionDesc(tableDescList.get(src), null);
+      newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
+      newPlan.getAliasToPartnInfo().put(alias, part);
+      
+      Operator<? extends Serializable> reducer = clonePlan.getReducer();
+      assert reducer instanceof JoinOperator;
+      JoinOperator cloneJoinOp = (JoinOperator) reducer;
+      
+      mapJoinDesc mapJoinDescriptor = new mapJoinDesc(newJoinKeys,
+          keyTblDesc, newJoinValues, newJoinValueTblDesc, joinDescriptor.getOutputColumnNames(),
+          i, joinDescriptor.getConds());
+      mapJoinDescriptor.setNoOuterJoin(joinDescriptor.isNoOuterJoin());
+      mapJoinDescriptor.setTagOrder(tags);
+      mapJoinDescriptor.setHandleSkewJoin(false);
+      
+      mapredLocalWork localPlan = new mapredLocalWork(
+          new LinkedHashMap<String, Operator<? extends Serializable>>(),
+          new LinkedHashMap<String, fetchWork>());
+      Map<Byte, String> smallTblDirs = smallKeysDirMap.get(src);
+      
+      for (int j = 0; j < numAliases; j++) {
+        if (j == i)
+          continue;
+        Byte small_alias = tags[j];
+        Operator<? extends Serializable> tblScan_op2 = parentOps[j];
+        localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2);
+        Path tblDir = new Path(smallTblDirs.get(small_alias));
+        localPlan.getAliasToFetchWork().put(small_alias.toString(),
+            new fetchWork(tblDir.toString(), tableDescList.get(small_alias)));
+      }
+      
+      newPlan.setMapLocalWork(localPlan);
+      
+      // construct a map join and set it as the child operator of tblScan_op
+      MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(mapJoinDescriptor, (RowSchema) null, parentOps);
+      // change the children of the original join operator to point to the map
+      // join operator
+      List<Operator<? extends Serializable>> childOps = cloneJoinOp.getChildOperators();
+      for (Operator<? extends Serializable> childOp : childOps)
+        childOp.replaceParent(cloneJoinOp, mapJoinOp);
+      mapJoinOp.setChildOperators(childOps);
+      
+      HiveConf jc = new HiveConf(parseCtx.getConf(), GenMRSkewJoinProcessor.class);
+      HiveConf.setVar(jc, HiveConf.ConfVars.HIVEINPUTFORMAT,
+          org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.class.getCanonicalName());
+      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(newPlan, jc);
+      bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
+      listWorks.add(skewJoinMapJoinTask.getWork());
+      listTasks.add(skewJoinMapJoinTask);
+    }
+    
+    ConditionalWork cndWork = new ConditionalWork(listWorks);
+    ConditionalTask cndTsk = (ConditionalTask)TaskFactory.get(cndWork, parseCtx.getConf());
+    cndTsk.setListTasks(listTasks);
+    cndTsk.setResolver(new ConditionalResolverSkewJoin());
+    cndTsk.setResolverCtx(new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap));
+    List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
+    currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+    currTask.addDependentTask(cndTsk);
+    
+    if (oldChildTasks != null) {
+      for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+        for (Task<? extends Serializable> oldChild : oldChildTasks)
+          tsk.addDependentTask(oldChild);
+    }
+    return;
+  }
+  
+  public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) {
+
+    if (conf != null && !conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN))
+      return false;
+
+    if(!joinOp.getConf().isNoOuterJoin())
+      return false;
+    
+    byte pos = 0;
+    for(Byte tag: joinOp.getConf().getTagOrder()) {
+      if(tag != pos)
+        return false;
+      pos++;
+    }
+    
+    return true;
+  }
+  
+  private static String skewJoinPrefix = "hive_skew_join";
+  private static String UNDERLINE = "_";
+  private static String BIGKEYS = "bigkeys";
+  private static String SMALLKEYS = "smallkeys";
+  private static String RESULTS = "results";
+  static String getBigKeysDir(String baseDir, Byte srcTbl) {
+    return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS
+        + UNDERLINE + srcTbl;
+  }
+  
+  static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) {
+    return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS
+        + UNDERLINE + RESULTS + UNDERLINE+ srcTbl;
+  }
+  
+  static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl,
+      Byte srcTblSmallTbl) {
+    return baseDir + File.separator + skewJoinPrefix + UNDERLINE + SMALLKEYS
+        + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
+  }
+  
+}
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * physical context used by physical resolvers.
+ */
+public class PhysicalContext {
+  
+  protected HiveConf conf;
+  private ParseContext parseContext;
+  private Context context;
+  protected List<Task<? extends Serializable>> rootTasks;
+  protected Task<? extends Serializable> fetchTask;
+  
+  public PhysicalContext(HiveConf conf, ParseContext parseContext,
+      Context context, List<Task<? extends Serializable>> rootTasks,
+      Task<? extends Serializable> fetchTask) {
+    super();
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.context = context;
+    this.rootTasks = rootTasks;
+    this.fetchTask = fetchTask;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public void setParseContext(ParseContext parseContext) {
+    this.parseContext = parseContext;
+  }
+
+  public Context getContext() {
+    return context;
+  }
+
+  public void setContext(Context context) {
+    this.context = context;
+  }
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * A hierarchy physical optimizer, which contains a list of
+ * PhysicalPlanResolver. Each resolver has its own set of optimization rule.
+ */
+public class PhysicalOptimizer {
+  private PhysicalContext pctx;
+  private List<PhysicalPlanResolver> resolvers;
+
+  public PhysicalOptimizer(PhysicalContext pctx, HiveConf hiveConf) {
+    super();
+    this.pctx = pctx;
+    initialize(hiveConf);
+  }
+
+  /**
+   * create the list of physical plan resolvers
+   * 
+   * @param hiveConf
+   */
+  private void initialize(HiveConf hiveConf) {
+    resolvers = new ArrayList<PhysicalPlanResolver>();
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN))
+      resolvers.add(new SkewJoinResolver());
+  }
+
+  /**
+   * invoke all the resolvers one-by-one, and alter the physical plan
+   * 
+   * @return PhysicalContext
+   * @throws HiveException
+   */
+  public PhysicalContext optimize() throws SemanticException {
+    for (PhysicalPlanResolver r : resolvers)
+      pctx = r.resolve(pctx);
+    return pctx;
+  }
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Physical plan optimization interface. Each resolver has its own set of
+ * optimization rules and transformations.
+ */
+public interface PhysicalPlanResolver {
+
+  /**
+   * All physical plan resolvers have to implement this entry method.
+   * @param pctx
+   * @return
+   */
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException;
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinResolver.SkewJoinProcCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Node processor factory for skew join resolver.
+ */
+public class SkewJoinProcFactory {
+
+  public static NodeProcessor getDefaultProc() {
+    return new SkewJoinDefaultProcessor();
+  }
+
+  public static NodeProcessor getJoinProc() {
+    return new SkewJoinJoinProcessor();
+  }
+  
+  public static class SkewJoinJoinProcessor implements NodeProcessor {  
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+      SkewJoinProcCtx context = (SkewJoinProcCtx)ctx;
+      JoinOperator op = (JoinOperator) nd;
+      ParseContext parseContext = context.getParseCtx();
+      Task<? extends Serializable> currentTsk = context.getCurrentTask();
+      GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext);
+      return null;
+    }
+  }
+  
+  public static class SkewJoinDefaultProcessor implements NodeProcessor{
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      return null;
+    }
+  }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+
+/**
+ * An implementation of PhysicalPlanResolver. It iterator each task with a rule
+ * dispatcher for its reducer operator tree, for task with join op in reducer, it
+ * will try to add a conditional task associated a list of skew join tasks.
+ */
+public class SkewJoinResolver implements PhysicalPlanResolver{
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.rootTasks);
+    ogw.startWalking(topNodes, null);
+    return null;
+  }
+  
+  /**
+   * Iterator a task with a rule dispatcher for its reducer operator tree,
+   */
+  class SkewJoinTaskDispatcher implements Dispatcher{
+    
+    private PhysicalContext physicalContext;
+
+    public SkewJoinTaskDispatcher(PhysicalContext context) {
+      super();
+      this.physicalContext = context;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> task = (Task<? extends Serializable>)nd;
+      
+      if (!task.isMapRedTask()
+          || task instanceof ConditionalTask ||((mapredWork) task.getWork()).getReducer() == null)
+        return null;
+      
+      SkewJoinProcCtx skewJoinProcContext = new SkewJoinProcCtx(task, this.physicalContext.getParseContext());
+      
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", "JOIN%"), SkewJoinProcFactory.getJoinProc());
+
+      // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(SkewJoinProcFactory.getDefaultProc(), opRules, skewJoinProcContext);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+     
+      // iterator the reducer operator tree
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.add(((mapredWork) task.getWork()).getReducer());
+      ogw.startWalking(topNodes, null);
+      return null;
+    }
+
+    public PhysicalContext getPhysicalContext() {
+      return physicalContext;
+    }
+
+    public void setPhysicalContext(PhysicalContext physicalContext) {
+      this.physicalContext = physicalContext;
+    }
+  }
+  
+  /**
+   * A container of current task and parse context. 
+   */
+  public static class SkewJoinProcCtx implements NodeProcessorCtx {
+    private Task<? extends Serializable> currentTask;
+    private ParseContext parseCtx;
+    
+    public SkewJoinProcCtx(Task<? extends Serializable> task, ParseContext parseCtx) {
+      this.currentTask = task;
+      this.parseCtx = parseCtx;
+    }
+
+    public Task<? extends Serializable> getCurrentTask() {
+      return currentTask;
+    }
+
+    public void setCurrentTask(Task<? extends Serializable> currentTask) {
+      this.currentTask = currentTask;
+    }
+
+    public ParseContext getParseCtx() {
+      return parseCtx;
+    }
+
+    public void setParseCtx(ParseContext parseCtx) {
+      this.parseCtx = parseCtx;
+    }
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Jan 16 06:44:01 2010
@@ -78,6 +78,7 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
@@ -91,6 +92,9 @@
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.GenMRSkewJoinProcessor;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -4993,7 +4997,7 @@
     opRules.put(new RuleRegExp(new String("R9"), "UNION%.*MAPJOIN%"), MapJoinFactory.getUnionMapJoin());
     opRules.put(new RuleRegExp(new String("R10"), "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
     opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin());
-
+    
     // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
 
@@ -5007,10 +5011,14 @@
     // For each task, go over all operators recursively
     for (Task<? extends Serializable> rootTask: rootTasks)
       breakTaskTree(rootTask);
-
+    
     // For each task, set the key descriptor for the reducer
     for (Task<? extends Serializable> rootTask: rootTasks)
       setKeyDescTaskTree(rootTask);
+    
+    PhysicalContext physicalContext = new PhysicalContext(conf, getParseContext(), ctx, rootTasks, fetchTask);
+    PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(physicalContext, conf);
+    physicalOptimizer.optimize();
 
     // For each operator, generate the counters if needed
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS))
@@ -5039,7 +5047,7 @@
       }
     }
   }
-
+  
   /**
    * Find all leaf tasks of the list of root tasks.
    */
@@ -5104,7 +5112,7 @@
     for (Operator<? extends Serializable> child: op.getChildOperators())
       generateCountersOperator(child);
   }
-
+  
   // loop over all the tasks recursviely
   private void breakTaskTree(Task<? extends Serializable> task) {
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java Sat Jan 16 06:44:01 2010
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.Serializable;
+import java.util.List;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
 
 /**
  * Conditional task resolution interface. This is invoked at run time to get the task to invoke. 
@@ -31,5 +35,5 @@
 	 * @param ctx  opaque context
 	 * @return position of the task
 	 */
-	public int getTaskId(HiveConf conf, Object ctx);
+	public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object ctx);
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Sat Jan 16 06:44:01 2010
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -82,10 +83,11 @@
     }
   }
   
-	public int getTaskId(HiveConf conf, Object objCtx) {
+	public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object objCtx) {
     ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx)objCtx;
     String dirName = ctx.getDir();
     
+    List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
     // check if a map-reduce job is needed to merge the files
     // If the current size is smaller than the target, merge
     long trgtSize = conf.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESSIZE);
@@ -114,13 +116,14 @@
           reducers = Math.max(1, reducers);
           reducers = Math.min(maxReducers, reducers);
           work.setNumReduceTasks(reducers);
-          
-          return 1;
+          resTsks.add(tsk);
+          return resTsks;
         }
       }
     } catch (IOException e) {
       e.printStackTrace();
     }
-    return 0;    
+    resTsks.add(ctx.getListTasks().get(0));
+    return resTsks;
   }
 }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+
+public class ConditionalResolverSkewJoin implements ConditionalResolver, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  public static class ConditionalResolverSkewJoinCtx implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // we store big keys in one table into one dir, and same keys in other
+    // tables into corresponding different dirs (one dir per table).
+    // this map stores mapping from "big key dir" to its corresponding mapjoin
+    // task.
+    Map<String, Task<? extends Serializable>> dirToTaskMap;
+    
+    public ConditionalResolverSkewJoinCtx(
+        Map<String, Task<? extends Serializable>> dirToTaskMap) {
+      super();
+      this.dirToTaskMap = dirToTaskMap;
+    }
+
+    public Map<String, Task<? extends Serializable>> getDirToTaskMap() {
+      return dirToTaskMap;
+    }
+
+    public void setDirToTaskMap(
+        Map<String, Task<? extends Serializable>> dirToTaskMap) {
+      this.dirToTaskMap = dirToTaskMap;
+    }
+  }
+  
+  public ConditionalResolverSkewJoin(){
+  }
+  
+  @Override
+  public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object objCtx) {
+    ConditionalResolverSkewJoinCtx ctx = (ConditionalResolverSkewJoinCtx)objCtx;
+    List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
+
+    Map<String, Task<? extends Serializable>> dirToTaskMap = ctx.getDirToTaskMap();
+    Iterator<Entry<String, Task<? extends Serializable>>> bigKeysPathsIter =  dirToTaskMap.entrySet().iterator();
+    try {
+      while(bigKeysPathsIter.hasNext()) {
+        Entry<String, Task<? extends Serializable>> entry = bigKeysPathsIter.next();
+        String path = entry.getKey();
+        Path dirPath = new Path(path);
+        FileSystem inpFs = dirPath.getFileSystem(conf);
+        FileStatus[] fstatus = inpFs.listStatus(dirPath);
+        if (fstatus.length > 0)
+          resTsks.add(entry.getValue());
+      }
+    }catch (IOException e) {
+      e.printStackTrace();
+    }
+    return resTsks;
+  }
+  
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java Sat Jan 16 06:44:01 2010
@@ -36,7 +36,6 @@
   /**
    * @return the listWorks
    */
-  @explain(displayName="list of dependent Tasks")
   public List<? extends Serializable> getListWorks() {
     return listWorks;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java Sat Jan 16 06:44:01 2010
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,13 @@
   public static final int UNIQUE_JOIN      = 4;
   public static final int LEFT_SEMI_JOIN   = 5;
 
+  //used to handle skew join
+  private boolean handleSkewJoin = false;
+  private int skewKeyDefinition = -1;
+  private Map<Byte, String> bigKeysDirMap;
+  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, tableDesc> skewKeysValuesTables;
+  
   // alias to key mapping
   private Map<Byte, List<exprNodeDesc>> exprs;
   
@@ -61,6 +69,7 @@
   protected joinCond[] conds;
   
   protected Byte[] tagOrder;
+  private tableDesc keyTableDesc;
   
   public joinDesc() { }
   
@@ -187,4 +196,95 @@
   public void setTagOrder(Byte[] tagOrder) {
     this.tagOrder = tagOrder;
   }
+
+  @explain(displayName="handleSkewJoin")
+  public boolean getHandleSkewJoin() {
+    return handleSkewJoin;
+  }
+
+  /**
+   * set to handle skew join in this join op
+   * @param handleSkewJoin
+   */
+  public void setHandleSkewJoin(boolean handleSkewJoin) {
+    this.handleSkewJoin = handleSkewJoin;
+  }
+
+  /**
+   * @return mapping from tbl to dir for big keys
+   */
+  public Map<Byte, String> getBigKeysDirMap() {
+    return bigKeysDirMap;
+  }
+
+  /**
+   * set the mapping from tbl to dir for big keys
+   * @param bigKeysDirMap
+   */
+  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+    this.bigKeysDirMap = bigKeysDirMap;
+  }
+
+  /**
+   * @return mapping from tbl to dir for small keys
+   */
+  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+    return smallKeysDirMap;
+  }
+
+  /**
+   * set the mapping from tbl to dir for small keys
+   * @param bigKeysDirMap
+   */
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+    this.smallKeysDirMap = smallKeysDirMap;
+  }
+
+  /**
+   * @return skew key definition. If we see a key's associated entries' number
+   *         is bigger than this, we will define this key as a skew key.
+   */
+  public int getSkewKeyDefinition() {
+    return skewKeyDefinition;
+  }
+
+  /**
+   * set skew key definition
+   * @param skewKeyDefinition
+   */
+  public void setSkewKeyDefinition(int skewKeyDefinition) {
+    this.skewKeyDefinition = skewKeyDefinition;
+  }
+
+  /**
+   * @return the table desc for storing skew keys and their corresponding value;
+   */
+  public Map<Byte, tableDesc> getSkewKeysValuesTables() {
+    return skewKeysValuesTables;
+  }
+
+  /**
+   * @param skewKeysValuesTable set the table desc for storing skew keys and their corresponding value;
+   */
+  public void setSkewKeysValuesTables(Map<Byte, tableDesc> skewKeysValuesTables) {
+    this.skewKeysValuesTables = skewKeysValuesTables;
+  }
+  
+  public boolean isNoOuterJoin() {
+    for (org.apache.hadoop.hive.ql.plan.joinCond cond : conds) {
+      if (cond.getType() == joinDesc.FULL_OUTER_JOIN
+          || (cond.getType() == joinDesc.LEFT_OUTER_JOIN)
+          || cond.getType() == joinDesc.RIGHT_OUTER_JOIN)
+        return false;
+    }
+    return true;
+  }
+
+  public void setKeyTableDesc(tableDesc keyTblDesc) {
+    this.keyTableDesc = keyTblDesc;    
+  }
+
+  public tableDesc getKeyTableDesc() {
+    return keyTableDesc;
+  }
 }

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q Sat Jan 16 06:44:01 2010
@@ -0,0 +1,138 @@
+set hive.optimize.skewjoin = true;
+set hive.skewjoin.key = 2;
+
+DROP TABLE T1;
+DROP TABLE T2;
+DROP TABLE T3;
+DROP TABLE T4;
+DROP TABLE dest_j1;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T4(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE dest_j1(key INT, value STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T4;
+
+
+EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value;
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value;
+
+SELECT sum(hash(key)), sum(hash(value)) FROM dest_j1;
+
+
+EXPLAIN
+SELECT /*+ STREAMTABLE(a) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+          JOIN T3 c ON b.key = c.key
+          JOIN T4 d ON c.key = d.key;
+
+SELECT /*+ STREAMTABLE(a) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+          JOIN T3 c ON b.key = c.key
+          JOIN T4 d ON c.key = d.key;
+
+EXPLAIN
+SELECT /*+ STREAMTABLE(a,c) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+          JOIN T3 c ON b.key = c.key
+          JOIN T4 d ON c.key = d.key;
+
+SELECT /*+ STREAMTABLE(a,c) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+          JOIN T3 c ON b.key = c.key
+          JOIN T4 d ON c.key = d.key;
+
+
+EXPLAIN FROM T1 a JOIN src c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+FROM T1 a JOIN src c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+EXPLAIN FROM 
+(SELECT src.* FROM src) x
+JOIN 
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+FROM 
+(SELECT src.* FROM src) x
+JOIN 
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+
+EXPLAIN FROM 
+(SELECT src.* FROM src) x
+JOIN 
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key and substring(x.value, 5)=substring(y.value, 5)+1)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+FROM 
+(SELECT src.* FROM src) x
+JOIN 
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key and substring(x.value, 5)=substring(y.value, 5)+1)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+
+EXPLAIN
+SELECT sum(hash(src1.c1)), sum(hash(src2.c4)) 
+FROM
+(SELECT src.key as c1, src.value as c2 from src) src1
+JOIN
+(SELECT src.key as c3, src.value as c4 from src) src2
+ON src1.c1 = src2.c3 AND src1.c1 < 100
+JOIN
+(SELECT src.key as c5, src.value as c6 from src) src3
+ON src1.c1 = src3.c5 AND src3.c5 < 80;
+
+SELECT sum(hash(src1.c1)), sum(hash(src2.c4))
+FROM
+(SELECT src.key as c1, src.value as c2 from src) src1
+JOIN
+(SELECT src.key as c3, src.value as c4 from src) src2
+ON src1.c1 = src2.c3 AND src1.c1 < 100
+JOIN
+(SELECT src.key as c5, src.value as c6 from src) src3
+ON src1.c1 = src3.c5 AND src3.c5 < 80;
+
+EXPLAIN
+SELECT /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) FROM T1 k LEFT OUTER JOIN T1 v ON k.key+1=v.key;
+SELECT /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) FROM T1 k LEFT OUTER JOIN T1 v ON k.key+1=v.key;
+
+select /*+ mapjoin(k)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.val;
+
+select /*+ mapjoin(k)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.key;
+
+select sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.key;
+
+select count(1) from  T1 a join T1 b on a.key = b.key;
+
+FROM T1 a LEFT OUTER JOIN T2 c ON c.key+1=a.key SELECT sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+FROM T1 a RIGHT OUTER JOIN T2 c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+FROM T1 a FULL OUTER JOIN T2 c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+SELECT sum(hash(src1.key)), sum(hash(src1.val)), sum(hash(src2.key)) FROM T1 src1 LEFT OUTER JOIN T2 src2 ON src1.key+1 = src2.key RIGHT OUTER JOIN T2 src3 ON src2.key = src3.key;
+
+SELECT sum(hash(src1.key)), sum(hash(src1.val)), sum(hash(src2.key)) FROM T1 src1 JOIN T2 src2 ON src1.key+1 = src2.key JOIN T2 src3 ON src2.key = src3.key;
+
+select /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k left outer join T1 v on k.key+1=v.key;
+
+
+DROP TABLE dest_j1;
+DROP TABLE T1;
+DROP TABLE T2;
+DROP TABLE T3;
+DROP TABLE T4;
\ No newline at end of file

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out Sat Jan 16 06:44:01 2010
@@ -16,8 +16,10 @@
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-4 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-4
+  Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+  Stage-3
+  Stage-0 depends on stages: Stage-3, Stage-2
+  Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
@@ -52,35 +54,12 @@
 
   Stage: Stage-4
     Conditional Operator
-      list of dependent Tasks:
-          Move Operator
-            files:
-                hdfs directory: true
-                destination: file:/data/users/njain/hive5/hive5/build/ql/tmp/596729572/10000
-          Map Reduce
-            Alias -> Map Operator Tree:
-              file:/data/users/njain/hive5/hive5/build/ql/tmp/790017029/10002 
-                  Reduce Output Operator
-                    sort order: 
-                    Map-reduce partition columns:
-                          expr: rand()
-                          type: double
-                    tag: -1
-                    value expressions:
-                          expr: key
-                          type: int
-                          expr: value
-                          type: string
-            Reduce Operator Tree:
-              Extract
-                File Output Operator
-                  compressed: false
-                  GlobalTableId: 0
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                      name: dest1
+
+  Stage: Stage-3
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/1283457982/10000
 
   Stage: Stage-0
     Move Operator
@@ -92,6 +71,32 @@
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: dest1
 
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/34390708/10002 
+            Reduce Output Operator
+              sort order: 
+              Map-reduce partition columns:
+                    expr: rand()
+                    type: double
+              tag: -1
+              value expressions:
+                    expr: key
+                    type: int
+                    expr: value
+                    type: string
+      Reduce Operator Tree:
+        Extract
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: dest1
+
 
 PREHOOK: query: FROM SRC_THRIFT
 INSERT OVERWRITE TABLE dest1 SELECT src_Thrift.LINT[1], src_thrift.lintstring[0].MYSTRING where src_thrift.liNT[0] > 0
@@ -106,11 +111,11 @@
 PREHOOK: query: SELECT DEST1.* FROM Dest1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/423054543/10000
+PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/2134218659/10000
 POSTHOOK: query: SELECT DEST1.* FROM Dest1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/423054543/10000
+POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/2134218659/10000
 2	1
 4	8
 6	27

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out Sat Jan 16 06:44:01 2010
@@ -14,8 +14,10 @@
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-4 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-4
+  Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+  Stage-3
+  Stage-0 depends on stages: Stage-3, Stage-2
+  Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
@@ -60,45 +62,12 @@
 
   Stage: Stage-4
     Conditional Operator
-      list of dependent Tasks:
-          Move Operator
-            files:
-                hdfs directory: true
-                destination: file:/data/users/njain/hive5/hive5/build/ql/tmp/1071125214/10000
-          Map Reduce
-            Alias -> Map Operator Tree:
-              file:/data/users/njain/hive5/hive5/build/ql/tmp/517302853/10002 
-                  Reduce Output Operator
-                    sort order: 
-                    Map-reduce partition columns:
-                          expr: rand()
-                          type: double
-                    tag: -1
-                    value expressions:
-                          expr: c1
-                          type: int
-                          expr: c2
-                          type: double
-                          expr: c3
-                          type: double
-                          expr: c4
-                          type: double
-                          expr: c5
-                          type: int
-                          expr: c6
-                          type: string
-                          expr: c7
-                          type: int
-            Reduce Operator Tree:
-              Extract
-                File Output Operator
-                  compressed: false
-                  GlobalTableId: 0
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                      name: dest1
+
+  Stage: Stage-3
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1062243767/10000
 
   Stage: Stage-0
     Move Operator
@@ -110,6 +79,42 @@
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: dest1
 
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/493733948/10002 
+            Reduce Output Operator
+              sort order: 
+              Map-reduce partition columns:
+                    expr: rand()
+                    type: double
+              tag: -1
+              value expressions:
+                    expr: c1
+                    type: int
+                    expr: c2
+                    type: double
+                    expr: c3
+                    type: double
+                    expr: c4
+                    type: double
+                    expr: c5
+                    type: int
+                    expr: c6
+                    type: string
+                    expr: c7
+                    type: int
+      Reduce Operator Tree:
+        Extract
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: dest1
+
 
 PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT 3 + 2, 3.0 + 2, 3 + 2.0, 3.0 + 2.0, 3 + CAST(2.0 AS INT) + CAST(CAST(0 AS SMALLINT) AS INT), CAST(1 AS BOOLEAN), CAST(TRUE AS INT) WHERE src.key = 86
 PREHOOK: type: QUERY
@@ -122,9 +127,9 @@
 PREHOOK: query: select dest1.* FROM dest1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/822287214/10000
+PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1057577384/10000
 POSTHOOK: query: select dest1.* FROM dest1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/822287214/10000
+POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1057577384/10000
 5	5.0	5.0	5.0	5	true	1