Posted to commits@hive.apache.org by am...@apache.org on 2013/04/17 09:29:46 UTC

svn commit: r1468783 [5/16] - in /hive/branches/HIVE-4115: ./ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/ beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/ beeline/src/test/org/apache/hive/beeline/ beeline/src/test/org/...

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Apr 17 07:29:38 2013
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -556,7 +557,7 @@ class RecordReaderImpl implements Record
       data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA)), true);
       nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
-          OrcProto.Stream.Kind.NANO_DATA)), false);
+          OrcProto.Stream.Kind.SECONDARY)), false);
     }
 
     @Override
@@ -610,6 +611,52 @@ class RecordReaderImpl implements Record
     }
   }
 
+  private static class DecimalTreeReader extends TreeReader {
+    private InStream valueStream;
+    private RunLengthIntegerReader scaleStream;
+
+    DecimalTreeReader(int columnId) {
+      super(columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+    ) throws IOException {
+      super.startStripe(streams, encodings);
+      valueStream = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA));
+      scaleStream = new RunLengthIntegerReader(streams.get(
+          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      valueStream.seek(index[columnId]);
+      scaleStream.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      if (valuePresent) {
+        return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+            (int) scaleStream.next());
+      }
+      return null;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      for(int i=0; i < items; i++) {
+        SerializationUtils.readBigInteger(valueStream);
+      }
+      scaleStream.skip(items);
+    }
+  }
+
   private static class StringTreeReader extends TreeReader {
     private DynamicByteArray dictionaryBuffer = null;
     private int dictionarySize;
@@ -1024,6 +1071,8 @@ class RecordReaderImpl implements Record
         return new BinaryTreeReader(columnId);
       case TIMESTAMP:
         return new TimestampTreeReader(columnId);
+      case DECIMAL:
+        return new DecimalTreeReader(columnId);
       case STRUCT:
         return new StructTreeReader(columnId, types, included);
       case LIST:
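
For reference, a minimal standalone sketch (not part of this commit) of how the new
DecimalTreeReader reassembles a value: the unscaled integer comes from the DATA stream
and the scale from the SECONDARY stream. java.math.BigDecimal stands in for HiveDecimal
here, and the literal values are hypothetical.

  import java.math.BigDecimal;
  import java.math.BigInteger;

  public class DecimalStreamsSketch {
    public static void main(String[] args) {
      BigInteger unscaled = BigInteger.valueOf(123456);   // as read from DATA (vint-encoded BigInteger)
      int scale = 2;                                      // as read from SECONDARY (run-length encoded)
      BigDecimal value = new BigDecimal(unscaled, scale); // value = unscaled * 10^(-scale)
      System.out.println(value);                          // prints 1234.56
    }
  }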

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Wed Apr 17 07:29:38 2013
@@ -22,6 +22,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.math.BigInteger;
 
 final class SerializationUtils {
 
@@ -103,4 +104,85 @@ final class SerializationUtils {
     output.write(((int) (ser >> 48)) & 0xff);
     output.write(((int) (ser >> 56)) & 0xff);
   }
+
+  /**
+   * Write the arbitrarily sized signed BigInteger in vint format.
+   *
+   * Signed integers are encoded using zigzag encoding, with the low bit
+   * serving as the sign bit.
+   *
+   * Each byte uses the low 7 bits for data and the high bit for stop/continue.
+   *
+   * Bytes are stored LSB first.
+   * @param output the stream to write to
+   * @param value the value to output
+   * @throws IOException
+   */
+  static void writeBigInteger(OutputStream output,
+                              BigInteger value) throws IOException {
+    // encode the signed number as a positive integer
+    value = value.shiftLeft(1);
+    int sign = value.signum();
+    if (sign < 0) {
+      value = value.negate();
+      value = value.subtract(BigInteger.ONE);
+    }
+    int length = value.bitLength();
+    while (true) {
+      long lowBits = value.longValue() & 0x7fffffffffffffffL;
+      length -= 63;
+      // write out the next 63 bits worth of data
+      for(int i=0; i < 9; ++i) {
+        // if this is the last byte, leave the high bit off
+        if (length <= 0 && (lowBits & ~0x7f) == 0) {
+          output.write((byte) lowBits);
+          return;
+        } else {
+          output.write((byte) (0x80 | (lowBits & 0x7f)));
+          lowBits >>>= 7;
+        }
+      }
+      value = value.shiftRight(63);
+    }
+  }
+
+  /**
+   * Read a signed, arbitrarily sized BigInteger in vint format.
+   * @param input the stream to read from
+   * @return the read BigInteger
+   * @throws IOException
+   */
+  static BigInteger readBigInteger(InputStream input) throws IOException {
+    BigInteger result = BigInteger.ZERO;
+    long work = 0;
+    int offset = 0;
+    long b;
+    do {
+      b = input.read();
+      if (b == -1) {
+        throw new EOFException("Reading BigInteger past EOF from " + input);
+      }
+      work |= (0x7f & b) << (offset % 63);
+      offset += 7;
+      // if we've read 63 bits, roll them into the result
+      if (offset == 63) {
+        result = BigInteger.valueOf(work);
+        work = 0;
+      } else if (offset % 63 == 0) {
+        result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+        work = 0;
+      }
+    } while (b >= 0x80);
+    if (work != 0) {
+      result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+    }
+    // convert back to a signed number
+    boolean isNegative = result.testBit(0);
+    if (isNegative) {
+      result = result.add(BigInteger.ONE);
+      result = result.negate();
+    }
+    result = result.shiftRight(1);
+    return result;
+  }
 }
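
For reference, a minimal standalone sketch (not part of this commit) of the scheme the
writeBigInteger javadoc describes, applied to a plain long for clarity: zigzag the sign
into the low bit, then emit 7 bits per byte, least significant first, with the high bit
marking continuation.

  import java.io.ByteArrayOutputStream;

  public class ZigzagVarintSketch {
    static void writeZigzagVarint(ByteArrayOutputStream out, long value) {
      long zigzag = (value << 1) ^ (value >> 63);    // move the sign into the low bit
      while ((zigzag & ~0x7fL) != 0) {
        out.write((int) (0x80 | (zigzag & 0x7f)));   // high bit set: more bytes follow
        zigzag >>>= 7;
      }
      out.write((int) zigzag);                       // final byte: high bit clear
    }

    public static void main(String[] args) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      writeZigzagVarint(out, -2);                    // -2 zigzags to 3 and fits in one byte (0x03)
      System.out.println(out.size());                // prints 1
    }
  }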

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed Apr 17 07:29:38 2013
@@ -23,6 +23,7 @@ import com.google.protobuf.CodedOutputSt
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -306,7 +308,7 @@ class WriterImpl implements Writer {
     private final PositionedOutputStream rowIndexStream;
 
     /**
-     * Create a tree writer
+     * Create a tree writer.
      * @param columnId the column id of the column to write
      * @param inspector the object inspector to use
      * @param streamFactory limited access to the Writer's data.
@@ -867,7 +869,7 @@ class WriterImpl implements Writer {
       this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA), true);
       this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.NANO_DATA), false);
+          OrcProto.Stream.Kind.SECONDARY), false);
       recordPosition(rowIndexPosition);
     }
 
@@ -916,6 +918,51 @@ class WriterImpl implements Writer {
     }
   }
 
+  private static class DecimalTreeWriter extends TreeWriter {
+    private final PositionedOutputStream valueStream;
+    private final RunLengthIntegerWriter scaleStream;
+
+    DecimalTreeWriter(int columnId,
+                        ObjectInspector inspector,
+                        StreamFactory writer,
+                        boolean nullable) throws IOException {
+      super(columnId, inspector, writer, nullable);
+      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+      scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), true);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void write(Object obj) throws IOException {
+      super.write(obj);
+      if (obj != null) {
+        HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
+            getPrimitiveJavaObject(obj);
+        SerializationUtils.writeBigInteger(valueStream,
+            decimal.unscaledValue());
+        scaleStream.write(decimal.scale());
+        indexStatistics.updateDecimal(decimal);
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      valueStream.flush();
+      scaleStream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      valueStream.getPosition(recorder);
+      scaleStream.getPosition(recorder);
+    }
+  }
+
   private static class StructTreeWriter extends TreeWriter {
     private final List<? extends StructField> fields;
     StructTreeWriter(int columnId,
@@ -1145,6 +1192,9 @@ class WriterImpl implements Writer {
           case TIMESTAMP:
             return new TimestampTreeWriter(streamFactory.getNextColumnId(),
                 inspector, streamFactory, nullable);
+          case DECIMAL:
+            return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+                inspector, streamFactory, nullable);
           default:
             throw new IllegalArgumentException("Bad primitive category " +
               ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
@@ -1204,6 +1254,9 @@ class WriterImpl implements Writer {
           case TIMESTAMP:
             type.setKind(OrcProto.Type.Kind.TIMESTAMP);
             break;
+          case DECIMAL:
+            type.setKind(OrcProto.Type.Kind.DECIMAL);
+            break;
           default:
             throw new IllegalArgumentException("Unknown primitive category: " +
               ((PrimitiveObjectInspector) treeWriter.inspector).
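
For reference, the write path in the new DecimalTreeWriter is the mirror image of the
reader: the unscaled value goes to DATA and the scale to SECONDARY. A minimal sketch
(not part of this commit), again with java.math.BigDecimal standing in for HiveDecimal:

  import java.math.BigDecimal;

  public class DecimalDecomposeSketch {
    public static void main(String[] args) {
      BigDecimal decimal = new BigDecimal("1234.56");
      System.out.println(decimal.unscaledValue()); // 123456 -> written to DATA as a vint BigInteger
      System.out.println(decimal.scale());         // 2      -> written to SECONDARY (run-length encoded)
    }
  }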

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Apr 17 07:29:38 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -1220,8 +1221,8 @@ public class Hive {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
           /* Construct list bucketing location mappings from sub-directory name. */
-          Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
-              newPartPath, skewedInfo);
+          Map<SkewedValueList, String> skewedColValueLocationMaps =
+            constructListBucketingLocationMap(newPartPath, skewedInfo);
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
@@ -1254,7 +1255,8 @@ public class Hive {
  * @throws IOException
  */
 private void walkDirTree(FileStatus fSta, FileSystem fSys,
-    Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+    Map<SkewedValueList, String> skewedColValueLocationMaps,
+    Path newPartPath, SkewedInfo skewedInfo)
     throws IOException {
   /* Base Case. It's leaf. */
   if (!fSta.isDir()) {
@@ -1280,7 +1282,7 @@ private void walkDirTree(FileStatus fSta
  * @param skewedInfo
  */
 private void constructOneLBLocationMap(FileStatus fSta,
-    Map<List<String>, String> skewedColValueLocationMaps,
+    Map<SkewedValueList, String> skewedColValueLocationMaps,
     Path newPartPath, SkewedInfo skewedInfo) {
   Path lbdPath = fSta.getPath().getParent();
   List<String> skewedValue = new ArrayList<String>();
@@ -1303,7 +1305,7 @@ private void constructOneLBLocationMap(F
   }
   if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
       && !skewedColValueLocationMaps.containsKey(skewedValue)) {
-    skewedColValueLocationMaps.put(skewedValue, lbdPath.toString());
+    skewedColValueLocationMaps.put(new SkewedValueList(skewedValue), lbdPath.toString());
   }
 }
 
@@ -1316,9 +1318,10 @@ private void constructOneLBLocationMap(F
    * @throws IOException
    * @throws FileNotFoundException
    */
-  private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+  private Map<SkewedValueList, String> constructListBucketingLocationMap(Path newPartPath,
       SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
-    Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+    Map<SkewedValueList, String> skewedColValueLocationMaps =
+      new HashMap<SkewedValueList, String>();
     FileSystem fSys = newPartPath.getFileSystem(conf);
     walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
         skewedInfo);
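
For reference, a minimal sketch (not part of this commit) of the map-key change made in
this file and the ones below: skewed-value lists are wrapped in the thrift-generated
SkewedValueList before being used as map keys, and unwrapped with getSkewedValueList()
when read back. The constructor and getter are the ones used elsewhere in this diff; the
values and the path string are hypothetical.

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.hadoop.hive.metastore.api.SkewedValueList;

  public class SkewedValueListKeySketch {
    public static void main(String[] args) {
      List<String> skewedValue = Arrays.asList("484", "val_484");     // hypothetical skewed values
      Map<SkewedValueList, String> locationMap = new HashMap<SkewedValueList, String>();
      // Key on the wrapper type instead of the raw List<String>.
      locationMap.put(new SkewedValueList(skewedValue), "/warehouse/t1/key=484"); // hypothetical path
      for (Map.Entry<SkewedValueList, String> entry : locationMap.entrySet()) {
        System.out.println(entry.getKey().getSkewedValueList() + " -> " + entry.getValue());
      }
    }
  }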

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Apr 17 07:29:38 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -647,18 +648,18 @@ public class Partition implements Serial
 
   public void setSkewedValueLocationMap(List<String> valList, String dirName)
       throws HiveException {
-    Map<List<String>, String> mappings = tPartition.getSd().getSkewedInfo()
+    Map<SkewedValueList, String> mappings = tPartition.getSd().getSkewedInfo()
         .getSkewedColValueLocationMaps();
     if (null == mappings) {
-      mappings = new HashMap<List<String>, String>();
+      mappings = new HashMap<SkewedValueList, String>();
       tPartition.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
     }
 
     // Add or update new mapping
-    mappings.put(valList, dirName);
+    mappings.put(new SkewedValueList(valList), dirName);
   }
 
-  public Map<List<String>, String> getSkewedColValueLocationMaps() {
+  public Map<SkewedValueList, String> getSkewedColValueLocationMaps() {
     return tPartition.getSd().getSkewedInfo().getSkewedColValueLocationMaps();
   }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Apr 17 07:29:38 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -144,7 +145,7 @@ public class Table implements Serializab
       SkewedInfo skewInfo = new SkewedInfo();
       skewInfo.setSkewedColNames(new ArrayList<String>());
       skewInfo.setSkewedColValues(new ArrayList<List<String>>());
-      skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+      skewInfo.setSkewedColValueLocationMaps(new HashMap<SkewedValueList, String>());
       sd.setSkewedInfo(skewInfo);
     }
 
@@ -518,20 +519,20 @@ public class Table implements Serializab
 
   public void setSkewedValueLocationMap(List<String> valList, String dirName)
       throws HiveException {
-    Map<List<String>, String> mappings = tTable.getSd().getSkewedInfo()
+    Map<SkewedValueList, String> mappings = tTable.getSd().getSkewedInfo()
         .getSkewedColValueLocationMaps();
     if (null == mappings) {
-      mappings = new HashMap<List<String>, String>();
+      mappings = new HashMap<SkewedValueList, String>();
       tTable.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
     }
 
     // Add or update new mapping
-    mappings.put(valList, dirName);
+    mappings.put(new SkewedValueList(valList), dirName);
   }
 
-  public Map<List<String>,String> getSkewedColValueLocationMaps() {
+  public Map<SkewedValueList,String> getSkewedColValueLocationMaps() {
     return (tTable.getSd().getSkewedInfo() != null) ? tTable.getSd().getSkewedInfo()
-        .getSkewedColValueLocationMaps() : new HashMap<List<String>, String>();
+        .getSkewedColValueLocationMaps() : new HashMap<SkewedValueList, String>();
   }
 
   public void setSkewedColValues(List<List<String>> skewedValues) throws HiveException {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Wed Apr 17 07:29:38 2013
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringEsc
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.index.HiveIndex;
 import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
@@ -62,16 +63,22 @@ public final class MetaDataFormatUtils {
     columnInformation.append(LINE_DELIM);
   }
 
-  public static String getAllColumnsInformation(List<FieldSchema> cols) {
+  public static String getAllColumnsInformation(List<FieldSchema> cols,
+      boolean printHeader) {
     StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
-    formatColumnsHeader(columnInformation);
+    if (printHeader) {
+      formatColumnsHeader(columnInformation);
+    }
     formatAllFields(columnInformation, cols);
     return columnInformation.toString();
   }
 
-  public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols) {
+  public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols,
+      boolean printHeader) {
     StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
-    formatColumnsHeader(columnInformation);
+    if (printHeader) {
+      formatColumnsHeader(columnInformation);
+    }
     formatAllFields(columnInformation, cols);
 
     if ((partCols != null) && (!partCols.isEmpty())) {
@@ -193,7 +200,7 @@ public final class MetaDataFormatUtils {
         formatOutput("Skewed Values:", skewedColValues.toString(), tableInfo);
       }
 
-      Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
+      Map<SkewedValueList, String> skewedColMap = storageDesc.getSkewedInfo()
           .getSkewedColValueLocationMaps();
       if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
         formatOutput("Skewed Value to Path:", skewedColMap.toString(),
@@ -201,9 +208,8 @@ public final class MetaDataFormatUtils {
         Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
         // walk through existing map to truncate path so that test won't mask it
         // then we can verify location is right
-        Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
-        for (Entry<List<String>, String> entry : entries) {
-          truncatedSkewedColMap.put(entry.getKey(),
+        for (Entry<SkewedValueList, String> entry : skewedColMap.entrySet()) {
+          truncatedSkewedColMap.put(entry.getKey().getSkewedValueList(),
               PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
         }
         formatOutput("Skewed Value to Truncated Path:",

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Wed Apr 17 07:29:38 2013
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hive.ql.metadata.formatting;
 
 import java.io.DataOutputStream;
-import java.io.OutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +144,11 @@ public class TextMetaDataFormatter imple
                   MetaDataPrettyFormatUtils.getAllColumnsInformation(
                       cols, partCols, prettyOutputNumCols)
                 :
-                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols)
+                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
               );
           } else {
-            outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(cols));
+            outStream.writeBytes(
+                MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
           }
 
           if (tableName.equals(colPath)) {
@@ -455,11 +457,13 @@ public class TextMetaDataFormatter imple
         try {
             outStream.writeBytes(database);
             outStream.write(separator);
-            if (comment != null)
-                outStream.writeBytes(comment);
+            if (comment != null) {
+              outStream.writeBytes(comment);
+            }
             outStream.write(separator);
-            if (location != null)
-                outStream.writeBytes(location);
+            if (location != null) {
+              outStream.writeBytes(location);
+            }
             outStream.write(separator);
             if (params != null && !params.isEmpty()) {
                 outStream.writeBytes(params.toString());

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Apr 17 07:29:38 2013
@@ -390,11 +390,10 @@ abstract public class AbstractSMBJoinPro
   // Can the join operator be converted to a sort-merge join operator ?
   // It is already verified that the join can be converted to a bucket map join
   protected boolean checkConvertJoinToSMBJoin(
-    JoinOperator joinOperator,
-    SortBucketJoinProcCtx smbJoinContext,
-    ParseContext pGraphContext) throws SemanticException {
+      JoinOperator joinOperator,
+      SortBucketJoinProcCtx smbJoinContext,
+      ParseContext pGraphContext) throws SemanticException {
 
-    boolean tableEligibleForBucketedSortMergeJoin = true;
     QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
 
     if (joinCtx == null) {
@@ -409,14 +408,15 @@ abstract public class AbstractSMBJoinPro
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();
 
     for (int pos = 0; pos < srcs.length; pos++) {
-      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
-        isEligibleForBucketSortMergeJoin(smbJoinContext,
-                      pGraphContext,
-                      smbJoinContext.getKeyExprMap().get((byte)pos),
-                      joinCtx,
-                      srcs,
-                      pos,
-                      sortColumnsFirstTable);
+      if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
+          pGraphContext,
+          smbJoinContext.getKeyExprMap().get((byte) pos),
+          joinCtx,
+          srcs,
+          pos,
+          sortColumnsFirstTable)) {
+        return false;
+      }
     }
 
     smbJoinContext.setSrcs(srcs);
@@ -489,9 +489,12 @@ abstract public class AbstractSMBJoinPro
     }
 
     context.setKeyExprMap(keyExprMap);
-    String[] srcs = joinCtx.getBaseSrc();
-    for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
-      srcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), srcs[srcPos]);
+    // Make a deep copy of the aliases so that they are not changed in the context
+    String[] joinSrcs = joinCtx.getBaseSrc();
+    String[] srcs = new String[joinSrcs.length];
+    for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
+      joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+      srcs[srcPos] = new String(joinSrcs[srcPos]);
     }
 
     // Given a candidate map-join, can this join be converted.
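
For reference, a minimal sketch (not part of this commit) of the control-flow change in
checkConvertJoinToSMBJoin above: instead of and-ing an eligibility flag across all join
sources, the loop now returns as soon as one source fails the check. The isEligible
helper is hypothetical.

  public class ShortCircuitSketch {
    // Hypothetical stand-in for isEligibleForBucketSortMergeJoin.
    static boolean isEligible(int pos) {
      return pos != 2;
    }

    static boolean allSourcesEligible(int numSources) {
      for (int pos = 0; pos < numSources; pos++) {
        if (!isEligible(pos)) {
          return false;              // bail out early; remaining sources are not examined
        }
      }
      return true;
    }

    public static void main(String[] args) {
      System.out.println(allSourcesEligible(2)); // true
      System.out.println(allSourcesEligible(4)); // false (source 2 is ineligible)
    }
  }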

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Wed Apr 17 07:29:38 2013
@@ -25,11 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
@@ -37,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 /**
@@ -64,12 +63,15 @@ import org.apache.hadoop.hive.ql.plan.Se
  * insert overwrite table T1 select * from T2;
  * where T1 and T2 are bucketized/sorted on the same keys, we don't need a reducer to
  * enforce bucketing and sorting.
+ *
+ * It also optimizes queries of the form:
+ * insert overwrite table T3
+ * select * from T1 join T2 on T1.key = T2.key
+ * where T1, T2 and T3 are bucketized/sorted on the same key 'key', we don't need a reducer
+ * to enforce bucketing and sorting.
  */
 public class BucketingSortingReduceSinkOptimizer implements Transform {
 
-  private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
-      .getName());
-
   public BucketingSortingReduceSinkOptimizer() {
   }
 
@@ -77,7 +79,6 @@ public class BucketingSortingReduceSinkO
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    HiveConf conf = pctx.getConf();
 
     // process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
     opRules.put(new RuleRegExp("R1",
@@ -90,7 +91,7 @@ public class BucketingSortingReduceSinkO
     Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
-    // Create a list of topop nodes
+    // Create a list of top nodes
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
@@ -117,7 +118,6 @@ public class BucketingSortingReduceSinkO
    *
    */
   public class BucketSortReduceSinkProcessor implements NodeProcessor {
-
     protected ParseContext pGraphContext;
 
     public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
@@ -142,28 +142,33 @@ public class BucketingSortingReduceSinkO
     }
 
     // Get the sort positions and sort order for the table
-    private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+    // The sort order contains whether the sorting is happening ascending or descending
+    private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(
+        List<Order> tabSortCols,
         List<FieldSchema> tabCols) {
-      List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+      List<Integer> sortPositions = new ArrayList<Integer>();
+      List<Integer> sortOrders = new ArrayList<Integer>();
       for (Order sortCol : tabSortCols) {
         int pos = 0;
         for (FieldSchema tabCol : tabCols) {
           if (sortCol.getCol().equals(tabCol.getName())) {
-            posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+            sortPositions.add(pos);
+            sortOrders.add(sortCol.getOrder());
             break;
           }
           pos++;
         }
       }
-      return posns;
+      return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders);
     }
 
-    // Return true if the parition is bucketed/sorted by the specified positions
+    // Return true if the partition is bucketed/sorted by the specified positions
     // The number of buckets, the sort order should also match along with the
     // columns which are bucketed/sorted
     private boolean checkPartition(Partition partition,
         List<Integer> bucketPositionsDest,
-        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        List<Integer> sortPositionsDest,
+        List<Integer> sortOrderDest,
         int numBucketsDest) {
       // The bucketing and sorting positions should exactly match
       int numBuckets = partition.getBucketCount();
@@ -173,10 +178,11 @@ public class BucketingSortingReduceSinkO
 
       List<Integer> partnBucketPositions =
           getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
-      List<ObjectPair<Integer, Integer>> partnSortPositions =
-          getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+      ObjectPair<List<Integer>, List<Integer>> partnSortPositionsOrder =
+          getSortPositionsOrder(partition.getSortCols(), partition.getTable().getCols());
       return bucketPositionsDest.equals(partnBucketPositions) &&
-          sortPositionsDest.equals(partnSortPositions);
+          sortPositionsDest.equals(partnSortPositionsOrder.getFirst()) &&
+          sortOrderDest.equals(partnSortPositionsOrder.getSecond());
     }
 
     // Return true if the table is bucketed/sorted by the specified positions
@@ -184,7 +190,8 @@ public class BucketingSortingReduceSinkO
     // columns which are bucketed/sorted
     private boolean checkTable(Table table,
         List<Integer> bucketPositionsDest,
-        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        List<Integer> sortPositionsDest,
+        List<Integer> sortOrderDest,
         int numBucketsDest) {
       // The bucketing and sorting positions should exactly match
       int numBuckets = table.getNumBuckets();
@@ -194,12 +201,17 @@ public class BucketingSortingReduceSinkO
 
       List<Integer> tableBucketPositions =
           getBucketPositions(table.getBucketCols(), table.getCols());
-      List<ObjectPair<Integer, Integer>> tableSortPositions =
-          getSortPositions(table.getSortCols(), table.getCols());
+      ObjectPair<List<Integer>, List<Integer>> tableSortPositionsOrder =
+          getSortPositionsOrder(table.getSortCols(), table.getCols());
       return bucketPositionsDest.equals(tableBucketPositions) &&
-          sortPositionsDest.equals(tableSortPositions);
+          sortPositionsDest.equals(tableSortPositionsOrder.getFirst()) &&
+          sortOrderDest.equals(tableSortPositionsOrder.getSecond());
     }
 
+    // Store the bucket path to bucket number mapping in the table scan operator.
+    // Although one mapper per file is used (BucketizedHiveInputFormat), any mapper can
+    // pick up any file (depending on the size of the files). The bucket number
+    // corresponding to the input file is stored to name the output bucket file appropriately.
     private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
       Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
       for (int pos = 0; pos < srcs.length; pos++) {
@@ -222,12 +234,12 @@ public class BucketingSortingReduceSinkO
       // Store the mapping -> path, bucket number
       // This is needed since for the map-only job, any mapper can process any file.
       // For eg: if mapper 1 is processing the file corresponding to bucket 2, it should
-      // also output the file correspodning to bucket 2 of the output.
+      // also output the file corresponding to bucket 2 of the output.
       storeBucketPathMapping(tsOp, srcs);
     }
 
     // Remove the reduce sink operator
-    // Use bucketized hive input format so that one mapper processes exactly one file
+    // Use BucketizedHiveInputFormat so that one mapper processes exactly one file
     private void removeReduceSink(ReduceSinkOperator rsOp,
         TableScanOperator tsOp,
         FileSinkOperator fsOp) {
@@ -251,6 +263,97 @@ public class BucketingSortingReduceSinkO
       return -1;
     }
 
+    // The output columns for the destination table should match the join keys
+    // This is to handle queries of the form:
+    // insert overwrite table T3
+    // select T1.key, T1.key2, UDF(T1.value, T2.value)
+    // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+    // where T1, T2 and T3 are bucketized/sorted on key and key2
+    // Assuming T1 is the table on which the mapper is run, the following is true:
+    // . The number of buckets for T1 and T3 should be the same
+    // . The bucketing/sorting columns for T1, T2 and T3 should be the same
+    // . The sort order of T1 should match the sort order for T3.
+    // . If T1 is partitioned, only a single partition of T1 can be selected.
+    // . The select list should contain (T1.key, T1.key2) or (T2.key, T2.key2)
+    // . After the join, only selects and filters are allowed.
+    private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc,
+        List<ExprNodeColumnDesc> sourceTableBucketCols,
+        List<ExprNodeColumnDesc> sourceTableSortCols,
+        List<Integer> sortOrder) {
+      // The sort-merge join creates the output sorted and bucketized by the same columns.
+      // This can be relaxed in the future if there is a requirement.
+      if (!sourceTableBucketCols.equals(sourceTableSortCols)) {
+        return false;
+      }
+
+      // Get the total number of columns selected, and for each output column, store the
+      // base table it points to. For
+      // insert overwrite table T3
+      // select T1.key, T1.key2, UDF(T1.value, T2.value)
+      // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+      // the following arrays are created
+      // [0, 0, 0, 1] --> [T1, T1, T1, T2] (table mapping)
+      // [0, 1, 2, 0] --> [T1.0, T1.1, T1.2, T2.0] (table columns mapping)
+      Byte[] tagOrder = smbJoinDesc.getTagOrder();
+      Map<Byte, List<Integer>> retainList = smbJoinDesc.getRetainList();
+      int totalNumberColumns = 0;
+      for (Byte tag : tagOrder) {
+        totalNumberColumns += retainList.get(tag).size();
+      }
+
+      byte[] columnTableMappings = new byte[totalNumberColumns];
+      int[] columnNumberMappings = new int[totalNumberColumns];
+      int currentColumnPosition = 0;
+      for (Byte tag : tagOrder) {
+        for (int pos = 0; pos < retainList.get(tag).size(); pos++) {
+          columnTableMappings[currentColumnPosition] = tag;
+          columnNumberMappings[currentColumnPosition] = pos;
+          currentColumnPosition++;
+        }
+      }
+
+      // All output columns used for bucketing/sorting of the destination table should
+      // belong to the same input table
+      //   insert overwrite table T3
+      //   select T1.key, T2.key2, UDF(T1.value, T2.value)
+      //   from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+      // is not optimized, whereas the insert is optimized if the select list is either changed to
+      // (T1.key, T1.key2, UDF(T1.value, T2.value)) or (T2.key, T2.key2, UDF(T1.value, T2.value))
+      // Get the input table and make sure the keys match
+      List<String> outputColumnNames = smbJoinDesc.getOutputColumnNames();
+      byte tableTag = -1;
+      int[] columnNumbersExprList = new int[sourceTableBucketCols.size()];
+      int currentColPosition = 0;
+      for (ExprNodeColumnDesc bucketCol : sourceTableBucketCols) {
+        String colName = bucketCol.getColumn();
+        int colNumber = outputColumnNames.indexOf(colName);
+        if (colNumber < 0) {
+          return false;
+        }
+        if (tableTag < 0) {
+          tableTag = columnTableMappings[colNumber];
+        }
+        else if (tableTag != columnTableMappings[colNumber]) {
+          return false;
+        }
+        columnNumbersExprList[currentColPosition++] = columnNumberMappings[colNumber];
+      }
+
+      List<ExprNodeDesc> allExprs = smbJoinDesc.getExprs().get(tableTag);
+      List<ExprNodeDesc> keysSelectedTable = smbJoinDesc.getKeys().get(tableTag);
+      currentColPosition = 0;
+      for (ExprNodeDesc keySelectedTable : keysSelectedTable) {
+        if (!(keySelectedTable instanceof ExprNodeColumnDesc)) {
+          return false;
+        }
+        if (!allExprs.get(columnNumbersExprList[currentColPosition++]).isSame(keySelectedTable)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -283,14 +386,21 @@ public class BucketingSortingReduceSinkO
       if (destTable == null) {
         return null;
       }
+      int numBucketsDestination = destTable.getNumBuckets();
 
       // Get the positions for sorted and bucketed columns
       // For sorted columns, also get the order (ascending/descending) - that should
       // also match for this to be converted to a map-only job.
       List<Integer> bucketPositions =
           getBucketPositions(destTable.getBucketCols(), destTable.getCols());
-      List<ObjectPair<Integer, Integer>> sortPositions =
-          getSortPositions(destTable.getSortCols(), destTable.getCols());
+      ObjectPair<List<Integer>, List<Integer>> sortOrderPositions =
+          getSortPositionsOrder(destTable.getSortCols(), destTable.getCols());
+      List<Integer> sortPositions = sortOrderPositions.getFirst();
+      List<Integer> sortOrder = sortOrderPositions.getSecond();
+      boolean useBucketSortPositions = true;
 
       // Only selects and filters are allowed
       Operator<? extends OperatorDesc> op = rsOp;
@@ -298,119 +408,179 @@ public class BucketingSortingReduceSinkO
       // bucketed/sorted columns for the destination table
       List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
       List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+      op = op.getParentOperators().get(0);
 
       while (true) {
-        if (op.getParentOperators().size() > 1) {
-          return null;
-        }
-
-        op = op.getParentOperators().get(0);
         if (!(op instanceof TableScanOperator) &&
             !(op instanceof FilterOperator) &&
-            !(op instanceof SelectOperator)) {
+            !(op instanceof SelectOperator) &&
+            !(op instanceof SMBMapJoinOperator)) {
           return null;
         }
 
-        // nothing to be done for filters - the output schema does not change.
-        if (op instanceof TableScanOperator) {
-          Table srcTable = pGraphContext.getTopToTable().get(op);
-
-          // Find the positions of the bucketed columns in the table corresponding
-          // to the select list.
-          // Consider the following scenario:
-          // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
-          // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
-          // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
-          // should be optimized.
-
-          // Start with the destination: T2, bucketed/sorted position is [1]
-          // At the source T1, the column corresponding to that position is [key], which
-          // maps to column [0] of T1, which is also bucketed/sorted into the same
-          // number of buckets
-          List<Integer> newBucketPositions = new ArrayList<Integer>();
-          for (int pos = 0; pos < bucketPositions.size(); pos++) {
-            ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
-            String colName = col.getColumn();
-            int bucketPos = findColumnPosition(srcTable.getCols(), colName);
-            if (bucketPos < 0) {
-              return null;
-            }
-            newBucketPositions.add(bucketPos);
+        if (op instanceof SMBMapJoinOperator) {
+          // Bucketing and sorting keys should exactly match
+          if (!(bucketPositions.equals(sortPositions))) {
+            return null;
+          }
+          SMBMapJoinOperator smbOp = (SMBMapJoinOperator) op;
+          SMBJoinDesc smbJoinDesc = smbOp.getConf();
+          int posBigTable = smbJoinDesc.getPosBigTable();
+
+          // join keys don't match the bucketing keys
+          List<ExprNodeDesc> keysBigTable = smbJoinDesc.getKeys().get((byte) posBigTable);
+          if (keysBigTable.size() != bucketPositions.size()) {
+            return null;
           }
 
-          // Find the positions/order of the sorted columns in the table corresponding
-          // to the select list.
-          List<ObjectPair<Integer, Integer>> newSortPositions =
-              new ArrayList<ObjectPair<Integer, Integer>>();
-          for (int pos = 0; pos < sortPositions.size(); pos++) {
-            ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
-            String colName = col.getColumn();
-            int sortPos = findColumnPosition(srcTable.getCols(), colName);
-            if (sortPos < 0) {
-              return null;
-            }
-            newSortPositions.add(
-                new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+          if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols,
+              sourceTableSortCols, sortOrder)) {
+            return null;
           }
 
+          sourceTableBucketCols.clear();
+          sourceTableSortCols.clear();
+          useBucketSortPositions = false;
 
-          if (srcTable.isPartitioned()) {
-            PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
-            List<Partition> partitions = prunedParts.getNotDeniedPartns();
-
-            // Support for dynamic partitions can be added later
-            // The following is not optimized:
-            // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
-            // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
-            if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+          for (ExprNodeDesc keyBigTable : keysBigTable) {
+            if (!(keyBigTable instanceof ExprNodeColumnDesc)) {
               return null;
             }
-            for (Partition partition : partitions) {
-              if (!checkPartition(partition, newBucketPositions, newSortPositions,
-                  pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+            sourceTableBucketCols.add((ExprNodeColumnDesc) keyBigTable);
+            sourceTableSortCols.add((ExprNodeColumnDesc) keyBigTable);
+          }
+
+          // since it is a sort-merge join, only follow the big table
+          op = op.getParentOperators().get(posBigTable);
+        } else {
+          // nothing to be done for filters - the output schema does not change.
+          if (op instanceof TableScanOperator) {
+            assert !useBucketSortPositions;
+            Table srcTable = pGraphContext.getTopToTable().get(op);
+
+            // Find the positions of the bucketed columns in the table corresponding
+            // to the select list.
+            // Consider the following scenario:
+            // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+            // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+            // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+            // should be optimized.
+
+            // Start with the destination: T2, bucketed/sorted position is [1]
+            // At the source T1, the column corresponding to that position is [key], which
+            // maps to column [0] of T1, which is also bucketed/sorted into the same
+            // number of buckets
+            List<Integer> newBucketPositions = new ArrayList<Integer>();
+            for (int pos = 0; pos < bucketPositions.size(); pos++) {
+              ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+              String colName = col.getColumn();
+              int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+              if (bucketPos < 0) {
                 return null;
               }
+              newBucketPositions.add(bucketPos);
             }
 
-            removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
-                partitions.get(0).getSortedPaths());
-            return null;
-          }
-          else {
-            if (!checkTable(srcTable, newBucketPositions, newSortPositions,
-                pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
-              return null;
+            // Find the positions/order of the sorted columns in the table corresponding
+            // to the select list.
+            List<Integer> newSortPositions = new ArrayList<Integer>();
+            for (int pos = 0; pos < sortPositions.size(); pos++) {
+              ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+              String colName = col.getColumn();
+              int sortPos = findColumnPosition(srcTable.getCols(), colName);
+              if (sortPos < 0) {
+                return null;
+              }
+              newSortPositions.add(sortPos);
             }
 
-            removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
-            return null;
-          }
-        }
-        // None of the operators is changing the positions
-        else if (op instanceof SelectOperator) {
-          SelectOperator selectOp = (SelectOperator) op;
-          SelectDesc selectDesc = selectOp.getConf();
+            if (srcTable.isPartitioned()) {
+              PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+              List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+              // Support for dynamic partitions can be added later
+              // The following is not optimized:
+              // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+              // where T1 and T2 are bucketed by the same keys and partitioned by ds, hr
+              if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+                return null;
+              }
+              for (Partition partition : partitions) {
+                if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder,
+                    numBucketsDestination)) {
+                  return null;
+                }
+              }
 
-          // There may be multiple selects - chose the one closest to the table
-          sourceTableBucketCols.clear();
-          sourceTableSortCols.clear();
+              removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
+                  partitions.get(0).getSortedPaths());
+              return null;
+            }
+            else {
+              if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder,
+                  numBucketsDestination)) {
+                return null;
+              }
 
-          // Only columns can be selected for both sorted and bucketed positions
-          for (int pos : bucketPositions) {
-            ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
-            if (!(selectColList instanceof ExprNodeColumnDesc)) {
+              removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
               return null;
             }
-            sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
           }
+          // None of the operators is changing the positions
+          else if (op instanceof SelectOperator) {
+            SelectOperator selectOp = (SelectOperator) op;
+            SelectDesc selectDesc = selectOp.getConf();
+
+            // Iterate backwards, from the destination table to the top of the tree
+            // Based on the output column names, get the new columns.
+            if (!useBucketSortPositions) {
+              bucketPositions.clear();
+              sortPositions.clear();
+              List<String> outputColumnNames = selectDesc.getOutputColumnNames();
+
+              for (ExprNodeColumnDesc col : sourceTableBucketCols) {
+                String colName = col.getColumn();
+                int colPos = outputColumnNames.indexOf(colName);
+                if (colPos < 0) {
+                  return null;
+                }
+                bucketPositions.add(colPos);
+              }
 
-          for (ObjectPair<Integer, Integer> pos : sortPositions) {
-            ExprNodeDesc selectColList = selectDesc.getColList().get(pos.getFirst());
-            if (!(selectColList instanceof ExprNodeColumnDesc)) {
-              return null;
+              for (ExprNodeColumnDesc col : sourceTableSortCols) {
+                String colName = col.getColumn();
+                int colPos = outputColumnNames.indexOf(colName);
+                if (colPos < 0) {
+                  return null;
+                }
+                sortPositions.add(colPos);
+              }
+            }
+
+            // There may be multiple selects - choose the one closest to the table
+            sourceTableBucketCols.clear();
+            sourceTableSortCols.clear();
+
+            // Only columns can be selected for both sorted and bucketed positions
+            for (int pos : bucketPositions) {
+              ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+              if (!(selectColList instanceof ExprNodeColumnDesc)) {
+                return null;
+              }
+              sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+            }
+
+            for (int pos : sortPositions) {
+              ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+              if (!(selectColList instanceof ExprNodeColumnDesc)) {
+                return null;
+              }
+              sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
             }
-            sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+
+            useBucketSortPositions = false;
           }
+          op = op.getParentOperators().get(0);
         }
       }
     }

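For context on the SelectOperator branch added above: as the optimizer walks from the FileSinkOperator up through intermediate SelectOperators, it re-expresses the destination table's bucket and sort positions in terms of each select's output column names, and gives up (returns null) as soon as a required column is not a plain column reference. A minimal standalone sketch of that remapping step follows, assuming plain string column names in place of ExprNodeColumnDesc; the class and method names are illustrative only and not part of the patch.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not part of the patch: remap bucket/sort column names
// to their positions in a select's output column list.
public class PositionRemapSketch {

  // Returns the remapped positions, or null if any column is not produced by
  // the select (the optimizer bails out in that case).
  static List<Integer> remapPositions(List<String> colNames,
                                      List<String> outputColumnNames) {
    List<Integer> positions = new ArrayList<Integer>();
    for (String colName : colNames) {
      int colPos = outputColumnNames.indexOf(colName);
      if (colPos < 0) {
        return null;
      }
      positions.add(colPos);
    }
    return positions;
  }

  public static void main(String[] args) {
    List<String> bucketCols = new ArrayList<String>();
    bucketCols.add("key");
    List<String> selectOutputs = new ArrayList<String>();
    selectOutputs.add("value");
    selectOutputs.add("key");
    System.out.println(remapPositions(bucketCols, selectOutputs)); // [1]
  }
}
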
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Apr 17 07:29:38 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,7 +67,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -79,8 +77,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * Factory for generating the different node processors used by ColumnPruner.
@@ -163,16 +159,11 @@ public final class ColumnPrunerProcFacto
 
   /**
    * - Pruning can only be done for Windowing. PTFs are black boxes,
-   * we assume all columns are needed.
+   *   we assume all columns are needed.
    * - add column names referenced in WindowFn args and in WindowFn expressions
-   * to the pruned list of the child Select Op.
-   * - Prune the Column names & types serde properties in each of the Shapes in the PTF Chain:
-   *    - the InputDef's output shape
-   *    - Window Tabl Functions: window output shape & output shape.
-   * - Why is pruning the Column names & types in the serde properties enough?
-   *   - because during runtime we rebuild the OIs using these properties.
+   *   to the pruned list of the child Select Op.
    * - finally we set the prunedColList on the ColumnPrunerContx;
-   * and update the RR & signature on the PTFOp.
+   *   and update the RR & signature on the PTFOp.
    */
   public static class ColumnPrunerPTFProc implements NodeProcessor {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
@@ -194,10 +185,6 @@ public final class ColumnPrunerProcFacto
       //we create a copy of prunedCols to create a list of pruned columns for PTFOperator
       prunedCols = new ArrayList<String>(prunedCols);
       prunedColumnsList(prunedCols, def);
-      setSerdePropsOfShape(def.getInput().getOutputShape(), prunedCols);
-      setSerdePropsOfShape(def.getOutputFromWdwFnProcessing(), prunedCols);
-      setSerdePropsOfShape(def.getOutputShape(), prunedCols);
-
       RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
       RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig);
       cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def));
@@ -255,47 +242,6 @@ public final class ColumnPrunerProcFacto
        }
     }
 
-    private List<String> getLowerCasePrunedCols(List<String> prunedCols){
-      List<String> lowerCasePrunedCols = new ArrayList<String>();
-      for (String col : prunedCols) {
-        lowerCasePrunedCols.add(col.toLowerCase());
-      }
-      return lowerCasePrunedCols;
-    }
-
-    /*
-     * reconstruct Column names & types list based on the prunedCols list.
-     */
-    private void setSerdePropsOfShape(ShapeDetails shp, List<String> prunedCols) {
-      List<String> columnNames = Arrays.asList(shp.getSerdeProps().get(
-          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(","));
-      List<TypeInfo> columnTypes = TypeInfoUtils
-          .getTypeInfosFromTypeString(shp.getSerdeProps().get(
-              org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES));
-      /*
-       * fieldNames in OI are lower-cased. So we compare lower cased names for now.
-       */
-      prunedCols = getLowerCasePrunedCols(prunedCols);
-
-      StringBuilder cNames = new StringBuilder();
-      StringBuilder cTypes = new StringBuilder();
-
-      boolean addComma = false;
-      for(int i=0; i < columnNames.size(); i++) {
-        if ( prunedCols.contains(columnNames.get(i)) ) {
-          cNames.append(addComma ? "," : "");
-          cTypes.append(addComma ? "," : "");
-          cNames.append(columnNames.get(i));
-          cTypes.append(columnTypes.get(i));
-          addComma = true;
-        }
-      }
-      shp.getSerdeProps().put(
-          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString());
-      shp.getSerdeProps().put(
-          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString());
-    }
-
     /*
      * from the prunedCols list filter out columns that refer to WindowFns or WindowExprs
      * the returned list is set as the prunedList needed by the PTFOp.
@@ -749,71 +695,39 @@ public final class ColumnPrunerProcFacto
       ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx) throws SemanticException {
     ReduceSinkDesc reduceConf = reduce.getConf();
     Map<String, ExprNodeDesc> oldMap = reduce.getColumnExprMap();
-    Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
-    ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
     RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver();
-    RowResolver newRR = new RowResolver();
-    ArrayList<String> originalValueOutputColNames = reduceConf
-        .getOutputValueColumnNames();
-    java.util.ArrayList<ExprNodeDesc> originalValueEval = reduceConf
-        .getValueCols();
-    ArrayList<String> newOutputColNames = new ArrayList<String>();
-    java.util.ArrayList<ExprNodeDesc> newValueEval = new ArrayList<ExprNodeDesc>();
-    // ReduceSinkOperators that precede GroupByOperators have the keys in the schema in addition
-    // to the values.  These are not pruned.
-    List<ColumnInfo> oldSchema = oldRR.getRowSchema().getSignature();
-    for (ColumnInfo colInfo : oldSchema) {
-      if (colInfo.getInternalName().startsWith(Utilities.ReduceField.KEY.toString() + ".")) {
-        String[] nm = oldRR.reverseLookup(colInfo.getInternalName());
-        newRR.put(nm[0], nm[1], colInfo);
-        sig.add(colInfo);
-      } else {
-        break;
-      }
-    }
+    ArrayList<ColumnInfo> signature = oldRR.getRowSchema().getSignature();
+
+    List<String> valueColNames = reduceConf.getOutputValueColumnNames();
+    ArrayList<String> newValueColNames = new ArrayList<String>();
+
+    List<ExprNodeDesc> valueExprs = reduceConf.getValueCols();
+    ArrayList<ExprNodeDesc> newValueExprs = new ArrayList<ExprNodeDesc>();
+
     for (int i = 0; i < retainFlags.length; i++) {
-      if (retainFlags[i]) {
-        newValueEval.add(originalValueEval.get(i));
-        String outputCol = originalValueOutputColNames.get(i);
-        newOutputColNames.add(outputCol);
+      String outputCol = valueColNames.get(i);
+      ExprNodeDesc outputColExpr = valueExprs.get(i);
+      if (!retainFlags[i]) {
         String[] nm = oldRR.reverseLookup(outputCol);
         if (nm == null) {
           outputCol = Utilities.ReduceField.VALUE.toString() + "." + outputCol;
           nm = oldRR.reverseLookup(outputCol);
         }
-        newMap.put(outputCol, oldMap.get(outputCol));
-        ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
-        newRR.put(nm[0], nm[1], colInfo);
-        sig.add(colInfo);
-      }
-    }
-
-    ArrayList<ExprNodeDesc> keyCols = reduceConf.getKeyCols();
-    List<String> keys = new ArrayList<String>();
-    RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
-        reduce.getParentOperators().get(0)).getRowResolver();
-    for (int i = 0; i < keyCols.size(); i++) {
-      keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
-    }
-    for (int i = 0; i < keys.size(); i++) {
-      String outputCol = keys.get(i);
-      String[] nm = parResover.reverseLookup(outputCol);
-      ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
-      if (colInfo != null) {
-        String internalName=colInfo.getInternalName();
-        newMap.put(internalName, oldMap.get(internalName));
-        newRR.put(nm[0], nm[1], colInfo);
+        ColumnInfo colInfo = oldRR.getFieldMap(nm[0]).remove(nm[1]);
+        oldRR.getInvRslvMap().remove(colInfo.getInternalName());
+        oldMap.remove(outputCol);
+        signature.remove(colInfo);
+      } else {
+        newValueColNames.add(outputCol);
+        newValueExprs.add(outputColExpr);
       }
     }
 
-    cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR);
-    reduce.setColumnExprMap(newMap);
-    reduce.getSchema().setSignature(sig);
-    reduceConf.setOutputValueColumnNames(newOutputColNames);
-    reduceConf.setValueCols(newValueEval);
+    reduceConf.setOutputValueColumnNames(newValueColNames);
+    reduceConf.setValueCols(newValueExprs);
     TableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils
         .getFieldSchemasFromColumnList(reduceConf.getValueCols(),
-        newOutputColNames, 0, ""));
+        newValueColNames, 0, ""));
     reduceConf.setValueSerializeInfo(newValueTable);
   }
 
@@ -1042,4 +956,4 @@ public final class ColumnPrunerProcFacto
     return new ColumnPrunerMapJoinProc();
   }
 
-}
\ No newline at end of file
+}

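The rewritten pruneReduceSinkOperator above no longer rebuilds a fresh RowResolver, column expression map, and schema; it removes the non-retained value columns from the existing ones in place and keeps only the retained names and expressions as the new output value columns before regenerating the value TableDesc. A minimal sketch of the retain-flag filtering, simplified to plain lists and with an illustrative class name that is not part of the patch:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not part of the patch: keep only the value columns
// whose retain flag is set, preserving the index alignment between names and
// expressions (the real code also drops the pruned columns from the
// RowResolver, the column expression map, and the row schema signature).
public class RetainValueColsSketch {

  static void retain(boolean[] retainFlags,
                     List<String> valueColNames, List<String> valueExprs,
                     List<String> newValueColNames, List<String> newValueExprs) {
    for (int i = 0; i < retainFlags.length; i++) {
      if (retainFlags[i]) {
        newValueColNames.add(valueColNames.get(i));
        newValueExprs.add(valueExprs.get(i));
      }
    }
  }
}
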
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Apr 17 07:29:38 2013
@@ -204,7 +204,9 @@ public class GenMRFileSink1 implements N
           (MapredWork) currTask.getWork(), false, ctx);
       }
 
-      if (!rootTasks.contains(currTask)) {
+      if (!rootTasks.contains(currTask)
+          && (currTask.getParentTasks() == null
+              || currTask.getParentTasks().isEmpty())) {
         rootTasks.add(currTask);
       }
     }
@@ -721,7 +723,9 @@ public class GenMRFileSink1 implements N
               (MapredWork) currTask.getWork(), false, ctx);
         }
         opTaskMap.put(null, currTask);
-        if (!rootTasks.contains(currTask)) {
+        if (!rootTasks.contains(currTask)
+            && (currTask.getParentTasks() == null
+                || currTask.getParentTasks().isEmpty())) {
           rootTasks.add(currTask);
         }
       } else {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Apr 17 07:29:38 2013
@@ -264,7 +264,9 @@ public class GenMRUnion1 implements Node
     else {
       // is the current task a root task
       if (shouldBeRootTask(currTask)
-          && (!ctx.getRootTasks().contains(currTask))) {
+          && !ctx.getRootTasks().contains(currTask)
+          && (currTask.getParentTasks() == null
+              || currTask.getParentTasks().isEmpty())) {
         ctx.getRootTasks().add(currTask);
       }
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Apr 17 07:29:38 2013
@@ -106,7 +106,9 @@ public final class GenMapRedUtils {
 
     List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
 
-    if (!rootTasks.contains(currTask)) {
+    if (!rootTasks.contains(currTask)
+        && (currTask.getParentTasks() == null
+            || currTask.getParentTasks().isEmpty())) {
       rootTasks.add(currTask);
     }
     if (reducer.getClass() == JoinOperator.class) {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Apr 17 07:29:38 2013
@@ -163,8 +163,11 @@ public final class MapJoinFactory {
       opTaskMap.put(op, currTask);
 
       List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-      assert (!rootTasks.contains(currTask));
-      rootTasks.add(currTask);
+      if (!rootTasks.contains(currTask)
+         && (currTask.getParentTasks() == null
+             || currTask.getParentTasks().isEmpty())) {
+        rootTasks.add(currTask);
+      }
 
       assert currTopOp != null;
       opProcCtx.getSeenOps().add(currTopOp);

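The same guard is added at several call sites above (GenMRFileSink1, GenMRUnion1, GenMapRedUtils, MapJoinFactory): a task is registered as a root task only when it is not already in the list and has no parent tasks, instead of being added, or asserted absent, unconditionally. A minimal sketch of that check with a hypothetical helper name and a generic task type; in the patch the check is inlined at each call site rather than factored out:

import java.util.List;

// Illustrative sketch, not part of the patch: add a task to the root-task
// list only if it is not already there and has no parent tasks.
public class RootTaskGuardSketch {

  static <T> void addAsRootIfOrphan(List<T> rootTasks, T currTask,
                                    List<T> parentTasks) {
    if (!rootTasks.contains(currTask)
        && (parentTasks == null || parentTasks.isEmpty())) {
      rootTasks.add(currTask);
    }
  }
}
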
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Apr 17 07:29:38 2013
@@ -397,9 +397,8 @@ public class MapJoinProcessor implements
       byte srcTag = entry.getKey();
       List<ExprNodeDesc> filter = entry.getValue();
 
-      Operator<?> start = oldReduceSinkParentOps.get(srcTag);
-      Operator<?> terminal = newParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, start, terminal));
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
     }
     desc.setFilters(filters = newFilters);
 

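In MapJoinProcessor above, the per-source filter expressions are now backtracked starting from the join operator itself down to the original ReduceSink parent, rather than from the ReduceSink parent to the new parent operator. Conceptually, backtracking rewrites each column reference through one operator's column-to-source mapping at a time until the terminal operator is reached; the sketch below illustrates that single step with plain strings and a hypothetical helper, and is not the real ExprNodeDescUtils API.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not part of the patch: rewrite one column reference
// through an operator's column -> source expression mapping; expression
// backtracking repeats this step from the starting operator to the terminal.
public class BacktrackStepSketch {

  static String backtrackOnce(String column, Map<String, String> columnExprMap) {
    String source = columnExprMap.get(column);
    return source != null ? source : column; // pass through if unmapped
  }

  public static void main(String[] args) {
    Map<String, String> reduceSinkMap = new HashMap<String, String>();
    reduceSinkMap.put("VALUE._col0", "key");
    System.out.println(backtrackOnce("VALUE._col0", reduceSinkMap)); // key
  }
}
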
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Wed Apr 17 07:29:38 2013
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -84,8 +83,6 @@ public class NonBlockingOpDeDupProc impl
 
       // For SEL-SEL(compute) case, move column exprs/names of child to parent.
       if (!cSEL.getConf().isSelStarNoCompute()) {
-        Operator<?> terminal = ExprNodeDescUtils.getSingleParent(pSEL, null);
-
         Set<String> funcOutputs = getFunctionOutputs(
             pSEL.getConf().getOutputColumnNames(), pSEL.getConf().getColList());
 
@@ -93,7 +90,7 @@ public class NonBlockingOpDeDupProc impl
         if (!funcOutputs.isEmpty() && !checkReferences(sources, funcOutputs)) {
           return null;
         }
-        pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, pSEL, terminal));
+        pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, cSEL, pSEL));
         pSEL.getConf().setOutputColumnNames(cSEL.getConf().getOutputColumnNames());
 
         // updates schema only (this should be the last optimizer modifying operator tree)