Posted to commits@hive.apache.org by am...@apache.org on 2013/04/17 09:29:46 UTC
svn commit: r1468783 [5/16] - in /hive/branches/HIVE-4115: ./
beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/
beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/
beeline/src/test/org/apache/hive/beeline/ beeline/src/test/org/...
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Apr 17 07:29:38 2013
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -556,7 +557,7 @@ class RecordReaderImpl implements Record
data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA)), true);
nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.NANO_DATA)), false);
+ OrcProto.Stream.Kind.SECONDARY)), false);
}
@Override
@@ -610,6 +611,52 @@ class RecordReaderImpl implements Record
}
}
+ private static class DecimalTreeReader extends TreeReader{
+ private InStream valueStream;
+ private RunLengthIntegerReader scaleStream;
+
+ DecimalTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ valueStream = streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA));
+ scaleStream = new RunLengthIntegerReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ valueStream.seek(index[columnId]);
+ scaleStream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ if (valuePresent) {
+ return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+ (int) scaleStream.next());
+ }
+ return null;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ for(int i=0; i < items; i++) {
+ SerializationUtils.readBigInteger(valueStream);
+ }
+ scaleStream.skip(items);
+ }
+ }
+
private static class StringTreeReader extends TreeReader {
private DynamicByteArray dictionaryBuffer = null;
private int dictionarySize;
@@ -1024,6 +1071,8 @@ class RecordReaderImpl implements Record
return new BinaryTreeReader(columnId);
case TIMESTAMP:
return new TimestampTreeReader(columnId);
+ case DECIMAL:
+ return new DecimalTreeReader(columnId);
case STRUCT:
return new StructTreeReader(columnId, types, included);
case LIST:
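
The new DecimalTreeReader above materializes each ORC DECIMAL value from two streams: an unscaled BigInteger read from the DATA stream (via SerializationUtils.readBigInteger) and a scale read from the run-length encoded SECONDARY stream, which are then combined into a HiveDecimal. A minimal sketch of that reassembly, using java.math.BigDecimal in place of HiveDecimal (whose constructor shown above takes the same two arguments):

  import java.math.BigDecimal;
  import java.math.BigInteger;

  public class DecimalReassemblySketch {
    // Rebuild a decimal value from the two pieces the reader pulls off the streams.
    static BigDecimal reassemble(BigInteger unscaled, int scale) {
      return new BigDecimal(unscaled, scale);
    }

    public static void main(String[] args) {
      // Unscaled value 12345 with scale 2 represents 123.45.
      System.out.println(reassemble(BigInteger.valueOf(12345), 2));
    }
  }
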
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Wed Apr 17 07:29:38 2013
@@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.math.BigInteger;
final class SerializationUtils {
@@ -103,4 +104,85 @@ final class SerializationUtils {
output.write(((int) (ser >> 48)) & 0xff);
output.write(((int) (ser >> 56)) & 0xff);
}
+
+ /**
+ * Write the arbitrarily sized signed BigInteger in vint format.
+ *
+ * Signed integers are encoded using the low bit as the sign bit using zigzag
+ * encoding.
+ *
+ * Each byte uses the low 7 bits for data and the high bit for stop/continue.
+ *
+ * Bytes are stored LSB first.
+ * @param output the stream to write to
+ * @param value the value to output
+ * @throws IOException
+ */
+ static void writeBigInteger(OutputStream output,
+ BigInteger value) throws IOException {
+ // encode the signed number as a positive integer
+ value = value.shiftLeft(1);
+ int sign = value.signum();
+ if (sign < 0) {
+ value = value.negate();
+ value = value.subtract(BigInteger.ONE);
+ }
+ int length = value.bitLength();
+ while (true) {
+ long lowBits = value.longValue() & 0x7fffffffffffffffL;
+ length -= 63;
+ // write out the next 63 bits worth of data
+ for(int i=0; i < 9; ++i) {
+ // if this is the last byte, leave the high bit off
+ if (length <= 0 && (lowBits & ~0x7f) == 0) {
+ output.write((byte) lowBits);
+ return;
+ } else {
+ output.write((byte) (0x80 | (lowBits & 0x7f)));
+ lowBits >>>= 7;
+ }
+ }
+ value = value.shiftRight(63);
+ }
+ }
+
+ /**
+ * Read the arbitrarily sized signed BigInteger in vint format.
+ * @param input the stream to read from
+ * @return the read BigInteger
+ * @throws IOException
+ */
+ static BigInteger readBigInteger(InputStream input) throws IOException {
+ BigInteger result = BigInteger.ZERO;
+ long work = 0;
+ int offset = 0;
+ long b;
+ do {
+ b = input.read();
+ if (b == -1) {
+ throw new EOFException("Reading BigInteger past EOF from " + input);
+ }
+ work |= (0x7f & b) << (offset % 63);
+ offset += 7;
+ // if we've read 63 bits, roll them into the result
+ if (offset == 63) {
+ result = BigInteger.valueOf(work);
+ work = 0;
+ } else if (offset % 63 == 0) {
+ result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+ work = 0;
+ }
+ } while (b >= 0x80);
+ if (work != 0) {
+ result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+ }
+ // convert back to a signed number
+ boolean isNegative = result.testBit(0);
+ if (isNegative) {
+ result = result.add(BigInteger.ONE);
+ result = result.negate();
+ }
+ result = result.shiftRight(1);
+ return result;
+ }
}
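
The javadoc above fully specifies the vint layout: the sign is folded into the low bit (zigzag encoding), each byte carries 7 data bits with the high bit acting as a continuation flag, and the least significant group is written first. A self-contained sketch of the same byte layout for values that fit in a long; the methods above additionally chunk the BigInteger 63 bits at a time so arbitrarily large values can be streamed:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;

  public class VarIntSketch {

    // Zigzag the sign into the low bit, then emit 7 bits per byte,
    // least significant group first; the high bit means "more bytes follow".
    static void writeSignedVarLong(OutputStream out, long value) throws IOException {
      long zigzag = (value << 1) ^ (value >> 63);
      while ((zigzag & ~0x7fL) != 0) {
        out.write((int) (0x80 | (zigzag & 0x7f)));
        zigzag >>>= 7;
      }
      out.write((int) zigzag);
    }

    // Reverse the process: accumulate 7-bit groups until a byte without the
    // continuation bit is seen, then undo the zigzag transform.
    static long readSignedVarLong(InputStream in) throws IOException {
      long zigzag = 0;
      int shift = 0;
      int b;
      do {
        b = in.read();
        zigzag |= (long) (b & 0x7f) << shift;
        shift += 7;
      } while ((b & 0x80) != 0);
      return (zigzag >>> 1) ^ -(zigzag & 1);
    }

    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      writeSignedVarLong(buf, -123456789L);
      System.out.println(readSignedVarLong(
          new ByteArrayInputStream(buf.toByteArray()))); // prints -123456789
    }
  }
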
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed Apr 17 07:29:38 2013
@@ -23,6 +23,7 @@ import com.google.protobuf.CodedOutputSt
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -306,7 +308,7 @@ class WriterImpl implements Writer {
private final PositionedOutputStream rowIndexStream;
/**
- * Create a tree writer
+ * Create a tree writer.
* @param columnId the column id of the column to write
* @param inspector the object inspector to use
* @param streamFactory limited access to the Writer's data.
@@ -867,7 +869,7 @@ class WriterImpl implements Writer {
this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.DATA), true);
this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.NANO_DATA), false);
+ OrcProto.Stream.Kind.SECONDARY), false);
recordPosition(rowIndexPosition);
}
@@ -916,6 +918,51 @@ class WriterImpl implements Writer {
}
}
+ private static class DecimalTreeWriter extends TreeWriter {
+ private final PositionedOutputStream valueStream;
+ private final RunLengthIntegerWriter scaleStream;
+
+ DecimalTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
+ getPrimitiveJavaObject(obj);
+ SerializationUtils.writeBigInteger(valueStream,
+ decimal.unscaledValue());
+ scaleStream.write(decimal.scale());
+ indexStatistics.updateDecimal(decimal);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ valueStream.flush();
+ scaleStream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ valueStream.getPosition(recorder);
+ scaleStream.getPosition(recorder);
+ }
+ }
+
private static class StructTreeWriter extends TreeWriter {
private final List<? extends StructField> fields;
StructTreeWriter(int columnId,
@@ -1145,6 +1192,9 @@ class WriterImpl implements Writer {
case TIMESTAMP:
return new TimestampTreeWriter(streamFactory.getNextColumnId(),
inspector, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
default:
throw new IllegalArgumentException("Bad primitive category " +
((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
@@ -1204,6 +1254,9 @@ class WriterImpl implements Writer {
case TIMESTAMP:
type.setKind(OrcProto.Type.Kind.TIMESTAMP);
break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ break;
default:
throw new IllegalArgumentException("Unknown primitive category: " +
((PrimitiveObjectInspector) treeWriter.inspector).
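
On the write side, DecimalTreeWriter splits each non-null HiveDecimal into the same two pieces the reader expects: the unscaled BigInteger goes to the DATA stream through writeBigInteger, and the scale goes to the run-length encoded SECONDARY stream. A minimal sketch of that split, again with java.math.BigDecimal standing in for HiveDecimal and two lists standing in for the streams:

  import java.math.BigDecimal;
  import java.math.BigInteger;
  import java.util.ArrayList;
  import java.util.List;

  public class DecimalSplitSketch {
    public static void main(String[] args) {
      List<BigInteger> dataStream = new ArrayList<BigInteger>();   // unscaled values
      List<Integer> secondaryStream = new ArrayList<Integer>();    // scales

      for (BigDecimal d : new BigDecimal[] {
          new BigDecimal("123.45"), new BigDecimal("-0.001")}) {
        dataStream.add(d.unscaledValue());  // 12345, -1
        secondaryStream.add(d.scale());     // 2, 3
      }
      System.out.println(dataStream + " / " + secondaryStream);
    }
  }
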
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Apr 17 07:29:38 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -1220,8 +1221,8 @@ public class Hive {
org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
/* Construct list bucketing location mappings from sub-directory name. */
- Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
- newPartPath, skewedInfo);
+ Map<SkewedValueList, String> skewedColValueLocationMaps =
+ constructListBucketingLocationMap(newPartPath, skewedInfo);
/* Add list bucketing location mappings. */
skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
@@ -1254,7 +1255,8 @@ public class Hive {
* @throws IOException
*/
private void walkDirTree(FileStatus fSta, FileSystem fSys,
- Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+ Map<SkewedValueList, String> skewedColValueLocationMaps,
+ Path newPartPath, SkewedInfo skewedInfo)
throws IOException {
/* Base Case. It's leaf. */
if (!fSta.isDir()) {
@@ -1280,7 +1282,7 @@ private void walkDirTree(FileStatus fSta
* @param skewedInfo
*/
private void constructOneLBLocationMap(FileStatus fSta,
- Map<List<String>, String> skewedColValueLocationMaps,
+ Map<SkewedValueList, String> skewedColValueLocationMaps,
Path newPartPath, SkewedInfo skewedInfo) {
Path lbdPath = fSta.getPath().getParent();
List<String> skewedValue = new ArrayList<String>();
@@ -1303,7 +1305,7 @@ private void constructOneLBLocationMap(F
}
if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
&& !skewedColValueLocationMaps.containsKey(skewedValue)) {
- skewedColValueLocationMaps.put(skewedValue, lbdPath.toString());
+ skewedColValueLocationMaps.put(new SkewedValueList(skewedValue), lbdPath.toString());
}
}
@@ -1316,9 +1318,10 @@ private void constructOneLBLocationMap(F
* @throws IOException
* @throws FileNotFoundException
*/
- private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+ private Map<SkewedValueList, String> constructListBucketingLocationMap(Path newPartPath,
SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
- Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+ Map<SkewedValueList, String> skewedColValueLocationMaps =
+ new HashMap<SkewedValueList, String>();
FileSystem fSys = newPartPath.getFileSystem(conf);
walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
skewedInfo);
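
The signature changes above follow the metastore API: the skewed-value-to-location map is now keyed by the Thrift struct SkewedValueList rather than by a raw List<String>, so callers wrap the value list before inserting or probing. A minimal sketch of that pattern, assuming only what the calls above show (a SkewedValueList constructor taking a List<String>); the path string is illustrative:

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import org.apache.hadoop.hive.metastore.api.SkewedValueList;

  public class SkewedMapSketch {
    public static void main(String[] args) {
      Map<SkewedValueList, String> locationMap = new HashMap<SkewedValueList, String>();
      List<String> skewedValue = Arrays.asList("val1", "val2");
      // Wrap the list before using it as a map key.
      SkewedValueList key = new SkewedValueList(skewedValue);
      if (!locationMap.containsKey(key)) {
        locationMap.put(key, "/warehouse/tbl/part/val1/val2"); // illustrative path
      }
    }
  }
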
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Apr 17 07:29:38 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -647,18 +648,18 @@ public class Partition implements Serial
public void setSkewedValueLocationMap(List<String> valList, String dirName)
throws HiveException {
- Map<List<String>, String> mappings = tPartition.getSd().getSkewedInfo()
+ Map<SkewedValueList, String> mappings = tPartition.getSd().getSkewedInfo()
.getSkewedColValueLocationMaps();
if (null == mappings) {
- mappings = new HashMap<List<String>, String>();
+ mappings = new HashMap<SkewedValueList, String>();
tPartition.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
}
// Add or update new mapping
- mappings.put(valList, dirName);
+ mappings.put(new SkewedValueList(valList), dirName);
}
- public Map<List<String>, String> getSkewedColValueLocationMaps() {
+ public Map<SkewedValueList, String> getSkewedColValueLocationMaps() {
return tPartition.getSd().getSkewedInfo().getSkewedColValueLocationMaps();
}
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Apr 17 07:29:38 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -144,7 +145,7 @@ public class Table implements Serializab
SkewedInfo skewInfo = new SkewedInfo();
skewInfo.setSkewedColNames(new ArrayList<String>());
skewInfo.setSkewedColValues(new ArrayList<List<String>>());
- skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+ skewInfo.setSkewedColValueLocationMaps(new HashMap<SkewedValueList, String>());
sd.setSkewedInfo(skewInfo);
}
@@ -518,20 +519,20 @@ public class Table implements Serializab
public void setSkewedValueLocationMap(List<String> valList, String dirName)
throws HiveException {
- Map<List<String>, String> mappings = tTable.getSd().getSkewedInfo()
+ Map<SkewedValueList, String> mappings = tTable.getSd().getSkewedInfo()
.getSkewedColValueLocationMaps();
if (null == mappings) {
- mappings = new HashMap<List<String>, String>();
+ mappings = new HashMap<SkewedValueList, String>();
tTable.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
}
// Add or update new mapping
- mappings.put(valList, dirName);
+ mappings.put(new SkewedValueList(valList), dirName);
}
- public Map<List<String>,String> getSkewedColValueLocationMaps() {
+ public Map<SkewedValueList,String> getSkewedColValueLocationMaps() {
return (tTable.getSd().getSkewedInfo() != null) ? tTable.getSd().getSkewedInfo()
- .getSkewedColValueLocationMaps() : new HashMap<List<String>, String>();
+ .getSkewedColValueLocationMaps() : new HashMap<SkewedValueList, String>();
}
public void setSkewedColValues(List<List<String>> skewedValues) throws HiveException {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Wed Apr 17 07:29:38 2013
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringEsc
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.index.HiveIndex;
import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
@@ -62,16 +63,22 @@ public final class MetaDataFormatUtils {
columnInformation.append(LINE_DELIM);
}
- public static String getAllColumnsInformation(List<FieldSchema> cols) {
+ public static String getAllColumnsInformation(List<FieldSchema> cols,
+ boolean printHeader) {
StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
- formatColumnsHeader(columnInformation);
+ if(printHeader){
+ formatColumnsHeader(columnInformation);
+ }
formatAllFields(columnInformation, cols);
return columnInformation.toString();
}
- public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols) {
+ public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols,
+ boolean printHeader) {
StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
- formatColumnsHeader(columnInformation);
+ if(printHeader){
+ formatColumnsHeader(columnInformation);
+ }
formatAllFields(columnInformation, cols);
if ((partCols != null) && (!partCols.isEmpty())) {
@@ -193,7 +200,7 @@ public final class MetaDataFormatUtils {
formatOutput("Skewed Values:", skewedColValues.toString(), tableInfo);
}
- Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
+ Map<SkewedValueList, String> skewedColMap = storageDesc.getSkewedInfo()
.getSkewedColValueLocationMaps();
if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
formatOutput("Skewed Value to Path:", skewedColMap.toString(),
@@ -201,9 +208,8 @@ public final class MetaDataFormatUtils {
Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
// walk through existing map to truncate path so that test won't mask it
// then we can verify location is right
- Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
- for (Entry<List<String>, String> entry : entries) {
- truncatedSkewedColMap.put(entry.getKey(),
+ for (Entry<SkewedValueList, String> entry : skewedColMap.entrySet()) {
+ truncatedSkewedColMap.put(entry.getKey().getSkewedValueList(),
PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
}
formatOutput("Skewed Value to Truncated Path:",
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Wed Apr 17 07:29:38 2013
@@ -19,13 +19,14 @@
package org.apache.hadoop.hive.ql.metadata.formatting;
import java.io.DataOutputStream;
-import java.io.OutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +144,11 @@ public class TextMetaDataFormatter imple
MetaDataPrettyFormatUtils.getAllColumnsInformation(
cols, partCols, prettyOutputNumCols)
:
- MetaDataFormatUtils.getAllColumnsInformation(cols, partCols)
+ MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
);
} else {
- outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(cols));
+ outStream.writeBytes(
+ MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
}
if (tableName.equals(colPath)) {
@@ -455,11 +457,13 @@ public class TextMetaDataFormatter imple
try {
outStream.writeBytes(database);
outStream.write(separator);
- if (comment != null)
- outStream.writeBytes(comment);
+ if (comment != null) {
+ outStream.writeBytes(comment);
+ }
outStream.write(separator);
- if (location != null)
- outStream.writeBytes(location);
+ if (location != null) {
+ outStream.writeBytes(location);
+ }
outStream.write(separator);
if (params != null && !params.isEmpty()) {
outStream.writeBytes(params.toString());
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Apr 17 07:29:38 2013
@@ -390,11 +390,10 @@ abstract public class AbstractSMBJoinPro
// Can the join operator be converted to a sort-merge join operator ?
// It is already verified that the join can be converted to a bucket map join
protected boolean checkConvertJoinToSMBJoin(
- JoinOperator joinOperator,
- SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext) throws SemanticException {
+ JoinOperator joinOperator,
+ SortBucketJoinProcCtx smbJoinContext,
+ ParseContext pGraphContext) throws SemanticException {
- boolean tableEligibleForBucketedSortMergeJoin = true;
QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
if (joinCtx == null) {
@@ -409,14 +408,15 @@ abstract public class AbstractSMBJoinPro
List<Order> sortColumnsFirstTable = new ArrayList<Order>();
for (int pos = 0; pos < srcs.length; pos++) {
- tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
- isEligibleForBucketSortMergeJoin(smbJoinContext,
- pGraphContext,
- smbJoinContext.getKeyExprMap().get((byte)pos),
- joinCtx,
- srcs,
- pos,
- sortColumnsFirstTable);
+ if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
+ pGraphContext,
+ smbJoinContext.getKeyExprMap().get((byte) pos),
+ joinCtx,
+ srcs,
+ pos,
+ sortColumnsFirstTable)) {
+ return false;
+ }
}
smbJoinContext.setSrcs(srcs);
@@ -489,9 +489,12 @@ abstract public class AbstractSMBJoinPro
}
context.setKeyExprMap(keyExprMap);
- String[] srcs = joinCtx.getBaseSrc();
- for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
- srcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), srcs[srcPos]);
+ // Make a deep copy of the aliases so that they are not changed in the context
+ String[] joinSrcs = joinCtx.getBaseSrc();
+ String[] srcs = new String[joinSrcs.length];
+ for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
+ joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+ srcs[srcPos] = new String(joinSrcs[srcPos]);
}
// Given a candidate map-join, can this join be converted.
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Wed Apr 17 07:29:38 2013
@@ -25,11 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
@@ -37,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
/**
@@ -64,12 +63,15 @@ import org.apache.hadoop.hive.ql.plan.Se
* insert overwrite table T1 select * from T2;
* where T1 and T2 are bucketized/sorted on the same keys, we don't need a reducer to
* enforce bucketing and sorting.
+ *
+ * It also optimizes queries of the form:
+ * insert overwrite table T1
+ * select * from T1 join T2 on T1.key = T2.key
+ * where T1 and T2 are bucketized/sorted on the same key 'key'; in that case, we don't need
+ * a reducer to enforce bucketing and sorting.
*/
public class BucketingSortingReduceSinkOptimizer implements Transform {
- private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
- .getName());
-
public BucketingSortingReduceSinkOptimizer() {
}
@@ -77,7 +79,6 @@ public class BucketingSortingReduceSinkO
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- HiveConf conf = pctx.getConf();
// process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
opRules.put(new RuleRegExp("R1",
@@ -90,7 +91,7 @@ public class BucketingSortingReduceSinkO
Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
- // Create a list of topop nodes
+ // Create a list of top nodes
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.getTopOps().values());
ogw.startWalking(topNodes, null);
@@ -117,7 +118,6 @@ public class BucketingSortingReduceSinkO
*
*/
public class BucketSortReduceSinkProcessor implements NodeProcessor {
-
protected ParseContext pGraphContext;
public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
@@ -142,28 +142,33 @@ public class BucketingSortingReduceSinkO
}
// Get the sort positions and sort order for the table
- private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+ // The sort order contains whether the sorting is happening ascending or descending
+ private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(
+ List<Order> tabSortCols,
List<FieldSchema> tabCols) {
- List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+ List<Integer> sortPositions = new ArrayList<Integer>();
+ List<Integer> sortOrders = new ArrayList<Integer>();
for (Order sortCol : tabSortCols) {
int pos = 0;
for (FieldSchema tabCol : tabCols) {
if (sortCol.getCol().equals(tabCol.getName())) {
- posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+ sortPositions.add(pos);
+ sortOrders.add(sortCol.getOrder());
break;
}
pos++;
}
}
- return posns;
+ return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders);
}
- // Return true if the parition is bucketed/sorted by the specified positions
+ // Return true if the partition is bucketed/sorted by the specified positions
// The number of buckets, the sort order should also match along with the
// columns which are bucketed/sorted
private boolean checkPartition(Partition partition,
List<Integer> bucketPositionsDest,
- List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ List<Integer> sortPositionsDest,
+ List<Integer> sortOrderDest,
int numBucketsDest) {
// The bucketing and sorting positions should exactly match
int numBuckets = partition.getBucketCount();
@@ -173,10 +178,11 @@ public class BucketingSortingReduceSinkO
List<Integer> partnBucketPositions =
getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
- List<ObjectPair<Integer, Integer>> partnSortPositions =
- getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+ ObjectPair<List<Integer>, List<Integer>> partnSortPositionsOrder =
+ getSortPositionsOrder(partition.getSortCols(), partition.getTable().getCols());
return bucketPositionsDest.equals(partnBucketPositions) &&
- sortPositionsDest.equals(partnSortPositions);
+ sortPositionsDest.equals(partnSortPositionsOrder.getFirst()) &&
+ sortOrderDest.equals(partnSortPositionsOrder.getSecond());
}
// Return true if the table is bucketed/sorted by the specified positions
@@ -184,7 +190,8 @@ public class BucketingSortingReduceSinkO
// columns which are bucketed/sorted
private boolean checkTable(Table table,
List<Integer> bucketPositionsDest,
- List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ List<Integer> sortPositionsDest,
+ List<Integer> sortOrderDest,
int numBucketsDest) {
// The bucketing and sorting positions should exactly match
int numBuckets = table.getNumBuckets();
@@ -194,12 +201,17 @@ public class BucketingSortingReduceSinkO
List<Integer> tableBucketPositions =
getBucketPositions(table.getBucketCols(), table.getCols());
- List<ObjectPair<Integer, Integer>> tableSortPositions =
- getSortPositions(table.getSortCols(), table.getCols());
+ ObjectPair<List<Integer>, List<Integer>> tableSortPositionsOrder =
+ getSortPositionsOrder(table.getSortCols(), table.getCols());
return bucketPositionsDest.equals(tableBucketPositions) &&
- sortPositionsDest.equals(tableSortPositions);
+ sortPositionsDest.equals(tableSortPositionsOrder.getFirst()) &&
+ sortOrderDest.equals(tableSortPositionsOrder.getSecond());
}
+ // Store the bucket path to bucket number mapping in the table scan operator.
+ // Although one mapper per file is used (BucketizedHiveInputFormat), it is possible that
+ // any mapper can pick up any file (depending on the size of the files). The bucket number
+ // corresponding to the input file is stored to name the output bucket file appropriately.
private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
for (int pos = 0; pos < srcs.length; pos++) {
@@ -222,12 +234,12 @@ public class BucketingSortingReduceSinkO
// Store the mapping -> path, bucket number
// This is needed since for the map-only job, any mapper can process any file.
// For eg: if mapper 1 is processing the file corresponding to bucket 2, it should
- // also output the file correspodning to bucket 2 of the output.
+ // also output the file corresponding to bucket 2 of the output.
storeBucketPathMapping(tsOp, srcs);
}
// Remove the reduce sink operator
- // Use bucketized hive input format so that one mapper processes exactly one file
+ // Use BucketizedHiveInputFormat so that one mapper processes exactly one file
private void removeReduceSink(ReduceSinkOperator rsOp,
TableScanOperator tsOp,
FileSinkOperator fsOp) {
@@ -251,6 +263,97 @@ public class BucketingSortingReduceSinkO
return -1;
}
+ // The output columns for the destination table should match with the join keys
+ // This is to handle queries of the form:
+ // insert overwrite table T3
+ // select T1.key, T1.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // where T1, T2 and T3 are bucketized/sorted on key and key2
+ // Assuming T1 is the table on which the mapper is run, the following is true:
+ // . The number of buckets for T1 and T3 should be the same
+ // . The bucketing/sorting columns for T1, T2 and T3 should be the same
+ // . The sort order of T1 should match with the sort order for T3.
+ // . If T1 is partitioned, only a single partition of T1 can be selected.
+ // . The select list should contain (T1.key, T1.key2) or (T2.key, T2.key2)
+ // . After the join, only selects and filters are allowed.
+ private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc,
+ List<ExprNodeColumnDesc> sourceTableBucketCols,
+ List<ExprNodeColumnDesc> sourceTableSortCols,
+ List<Integer> sortOrder) {
+ // The sort-merge join creates the output sorted and bucketized by the same columns.
+ // This can be relaxed in the future if there is a requirement.
+ if (!sourceTableBucketCols.equals(sourceTableSortCols)) {
+ return false;
+ }
+
+ // Get the total number of columns selected, and for each output column, store the
+ // base table it points to. For
+ // insert overwrite table T3
+ // select T1.key, T1.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // the following arrays are created
+ // [0, 0, 0, 1] --> [T1, T1, T1, T2] (table mapping)
+ // [0, 1, 2, 0] --> [T1.0, T1.1, T1.2, T2.0] (table columns mapping)
+ Byte[] tagOrder = smbJoinDesc.getTagOrder();
+ Map<Byte, List<Integer>> retainList = smbJoinDesc.getRetainList();
+ int totalNumberColumns = 0;
+ for (Byte tag : tagOrder) {
+ totalNumberColumns += retainList.get(tag).size();
+ }
+
+ byte[] columnTableMappings = new byte[totalNumberColumns];
+ int[] columnNumberMappings = new int[totalNumberColumns];
+ int currentColumnPosition = 0;
+ for (Byte tag : tagOrder) {
+ for (int pos = 0; pos < retainList.get(tag).size(); pos++) {
+ columnTableMappings[currentColumnPosition] = tag;
+ columnNumberMappings[currentColumnPosition] = pos;
+ currentColumnPosition++;
+ }
+ }
+
+ // All output columns used for bucketing/sorting of the destination table should
+ // belong to the same input table
+ // insert overwrite table T3
+ // select T1.key, T2.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // is not optimized, whereas the insert is optimized if the select list is either changed to
+ // (T1.key, T1.key2, UDF(T1.value, T2.value)) or (T2.key, T2.key2, UDF(T1.value, T2.value))
+ // Get the input table and make sure the keys match
+ List<String> outputColumnNames = smbJoinDesc.getOutputColumnNames();
+ byte tableTag = -1;
+ int[] columnNumbersExprList = new int[sourceTableBucketCols.size()];
+ int currentColPosition = 0;
+ for (ExprNodeColumnDesc bucketCol : sourceTableBucketCols) {
+ String colName = bucketCol.getColumn();
+ int colNumber = outputColumnNames.indexOf(colName);
+ if (colNumber < 0) {
+ return false;
+ }
+ if (tableTag < 0) {
+ tableTag = columnTableMappings[colNumber];
+ }
+ else if (tableTag != columnTableMappings[colNumber]) {
+ return false;
+ }
+ columnNumbersExprList[currentColPosition++] = columnNumberMappings[colNumber];
+ }
+
+ List<ExprNodeDesc> allExprs = smbJoinDesc.getExprs().get(tableTag);
+ List<ExprNodeDesc> keysSelectedTable = smbJoinDesc.getKeys().get(tableTag);
+ currentColPosition = 0;
+ for (ExprNodeDesc keySelectedTable : keysSelectedTable) {
+ if (!(keySelectedTable instanceof ExprNodeColumnDesc)) {
+ return false;
+ }
+ if (!allExprs.get(columnNumbersExprList[currentColPosition++]).isSame(keySelectedTable)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -283,14 +386,21 @@ public class BucketingSortingReduceSinkO
if (destTable == null) {
return null;
}
+ int numBucketsDestination = destTable.getNumBuckets();
// Get the positions for sorted and bucketed columns
// For sorted columns, also get the order (ascending/descending) - that should
// also match for this to be converted to a map-only job.
List<Integer> bucketPositions =
getBucketPositions(destTable.getBucketCols(), destTable.getCols());
- List<ObjectPair<Integer, Integer>> sortPositions =
- getSortPositions(destTable.getSortCols(), destTable.getCols());
+ ObjectPair<List<Integer>, List<Integer>> sortOrderPositions =
+ getSortPositionsOrder(destTable.getSortCols(), destTable.getCols());
+ List<Integer> sortPositions = sortOrderPositions.getFirst();
+ List<Integer> sortOrder = sortOrderPositions.getSecond();
+ boolean useBucketSortPositions = true;
// Only selects and filters are allowed
Operator<? extends OperatorDesc> op = rsOp;
@@ -298,119 +408,179 @@ public class BucketingSortingReduceSinkO
// bucketed/sorted columns for the destination table
List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+ op = op.getParentOperators().get(0);
while (true) {
- if (op.getParentOperators().size() > 1) {
- return null;
- }
-
- op = op.getParentOperators().get(0);
if (!(op instanceof TableScanOperator) &&
!(op instanceof FilterOperator) &&
- !(op instanceof SelectOperator)) {
+ !(op instanceof SelectOperator) &&
+ !(op instanceof SMBMapJoinOperator)) {
return null;
}
- // nothing to be done for filters - the output schema does not change.
- if (op instanceof TableScanOperator) {
- Table srcTable = pGraphContext.getTopToTable().get(op);
-
- // Find the positions of the bucketed columns in the table corresponding
- // to the select list.
- // Consider the following scenario:
- // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
- // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
- // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
- // should be optimized.
-
- // Start with the destination: T2, bucketed/sorted position is [1]
- // At the source T1, the column corresponding to that position is [key], which
- // maps to column [0] of T1, which is also bucketed/sorted into the same
- // number of buckets
- List<Integer> newBucketPositions = new ArrayList<Integer>();
- for (int pos = 0; pos < bucketPositions.size(); pos++) {
- ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
- String colName = col.getColumn();
- int bucketPos = findColumnPosition(srcTable.getCols(), colName);
- if (bucketPos < 0) {
- return null;
- }
- newBucketPositions.add(bucketPos);
+ if (op instanceof SMBMapJoinOperator) {
+ // Bucketing and sorting keys should exactly match
+ if (!(bucketPositions.equals(sortPositions))) {
+ return null;
+ }
+ SMBMapJoinOperator smbOp = (SMBMapJoinOperator) op;
+ SMBJoinDesc smbJoinDesc = smbOp.getConf();
+ int posBigTable = smbJoinDesc.getPosBigTable();
+
+ // join keys don't match the bucketing keys
+ List<ExprNodeDesc> keysBigTable = smbJoinDesc.getKeys().get((byte) posBigTable);
+ if (keysBigTable.size() != bucketPositions.size()) {
+ return null;
}
- // Find the positions/order of the sorted columns in the table corresponding
- // to the select list.
- List<ObjectPair<Integer, Integer>> newSortPositions =
- new ArrayList<ObjectPair<Integer, Integer>>();
- for (int pos = 0; pos < sortPositions.size(); pos++) {
- ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
- String colName = col.getColumn();
- int sortPos = findColumnPosition(srcTable.getCols(), colName);
- if (sortPos < 0) {
- return null;
- }
- newSortPositions.add(
- new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+ if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols,
+ sourceTableSortCols, sortOrder)) {
+ return null;
}
+ sourceTableBucketCols.clear();
+ sourceTableSortCols.clear();
+ useBucketSortPositions = false;
- if (srcTable.isPartitioned()) {
- PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
- List<Partition> partitions = prunedParts.getNotDeniedPartns();
-
- // Support for dynamic partitions can be added later
- // The following is not optimized:
- // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
- // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
- if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+ for (ExprNodeDesc keyBigTable : keysBigTable) {
+ if (!(keyBigTable instanceof ExprNodeColumnDesc)) {
return null;
}
- for (Partition partition : partitions) {
- if (!checkPartition(partition, newBucketPositions, newSortPositions,
- pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+ sourceTableBucketCols.add((ExprNodeColumnDesc) keyBigTable);
+ sourceTableSortCols.add((ExprNodeColumnDesc) keyBigTable);
+ }
+
+ // since it is a sort-merge join, only follow the big table
+ op = op.getParentOperators().get(posBigTable);
+ } else {
+ // nothing to be done for filters - the output schema does not change.
+ if (op instanceof TableScanOperator) {
+ assert !useBucketSortPositions;
+ Table srcTable = pGraphContext.getTopToTable().get(op);
+
+ // Find the positions of the bucketed columns in the table corresponding
+ // to the select list.
+ // Consider the following scenario:
+ // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+ // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+ // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+ // should be optimized.
+
+ // Start with the destination: T2, bucketed/sorted position is [1]
+ // At the source T1, the column corresponding to that position is [key], which
+ // maps to column [0] of T1, which is also bucketed/sorted into the same
+ // number of buckets
+ List<Integer> newBucketPositions = new ArrayList<Integer>();
+ for (int pos = 0; pos < bucketPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+ String colName = col.getColumn();
+ int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+ if (bucketPos < 0) {
return null;
}
+ newBucketPositions.add(bucketPos);
}
- removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
- partitions.get(0).getSortedPaths());
- return null;
- }
- else {
- if (!checkTable(srcTable, newBucketPositions, newSortPositions,
- pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
- return null;
+ // Find the positions/order of the sorted columns in the table corresponding
+ // to the select list.
+ List<Integer> newSortPositions = new ArrayList<Integer>();
+ for (int pos = 0; pos < sortPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+ String colName = col.getColumn();
+ int sortPos = findColumnPosition(srcTable.getCols(), colName);
+ if (sortPos < 0) {
+ return null;
+ }
+ newSortPositions.add(sortPos);
}
- removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
- return null;
- }
- }
- // None of the operators is changing the positions
- else if (op instanceof SelectOperator) {
- SelectOperator selectOp = (SelectOperator) op;
- SelectDesc selectDesc = selectOp.getConf();
+ if (srcTable.isPartitioned()) {
+ PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+ List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+ // Support for dynamic partitions can be added later
+ // The following is not optimized:
+ // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+ // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
+ if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+ return null;
+ }
+ for (Partition partition : partitions) {
+ if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder,
+ numBucketsDestination)) {
+ return null;
+ }
+ }
- // There may be multiple selects - chose the one closest to the table
- sourceTableBucketCols.clear();
- sourceTableSortCols.clear();
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
+ partitions.get(0).getSortedPaths());
+ return null;
+ }
+ else {
+ if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder,
+ numBucketsDestination)) {
+ return null;
+ }
- // Only columns can be selected for both sorted and bucketed positions
- for (int pos : bucketPositions) {
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
- if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
return null;
}
- sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
}
+ // None of the operators is changing the positions
+ else if (op instanceof SelectOperator) {
+ SelectOperator selectOp = (SelectOperator) op;
+ SelectDesc selectDesc = selectOp.getConf();
+
+ // Iterate backwards, from the destination table to the top of the tree
+ // Based on the output column names, get the new columns.
+ if (!useBucketSortPositions) {
+ bucketPositions.clear();
+ sortPositions.clear();
+ List<String> outputColumnNames = selectDesc.getOutputColumnNames();
+
+ for (ExprNodeColumnDesc col : sourceTableBucketCols) {
+ String colName = col.getColumn();
+ int colPos = outputColumnNames.indexOf(colName);
+ if (colPos < 0) {
+ return null;
+ }
+ bucketPositions.add(colPos);
+ }
- for (ObjectPair<Integer, Integer> pos : sortPositions) {
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos.getFirst());
- if (!(selectColList instanceof ExprNodeColumnDesc)) {
- return null;
+ for (ExprNodeColumnDesc col : sourceTableSortCols) {
+ String colName = col.getColumn();
+ int colPos = outputColumnNames.indexOf(colName);
+ if (colPos < 0) {
+ return null;
+ }
+ sortPositions.add(colPos);
+ }
+ }
+
+ // There may be multiple selects - choose the one closest to the table
+ sourceTableBucketCols.clear();
+ sourceTableSortCols.clear();
+
+ // Only columns can be selected for both sorted and bucketed positions
+ for (int pos : bucketPositions) {
+ ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+ }
+
+ for (int pos : sortPositions) {
+ ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
}
- sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+
+ useBucketSortPositions = false;
}
+ op = op.getParentOperators().get(0);
}
}
}
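
Among the changes above, getSortPositions is replaced by getSortPositionsOrder, which returns the sort column positions and their ascending/descending flags as two parallel lists so that each can be compared independently against the destination table. A simplified sketch of the lookup, with plain strings and ints standing in for FieldSchema and Order:

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  public class SortPositionsSketch {
    // For each sort column, record its position in the table's column list
    // and its direction flag, as two parallel lists.
    static List<List<Integer>> sortPositionsOrder(
        List<String> sortCols, List<Integer> sortDirections, List<String> tableCols) {
      List<Integer> positions = new ArrayList<Integer>();
      List<Integer> directions = new ArrayList<Integer>();
      for (int i = 0; i < sortCols.size(); i++) {
        int pos = tableCols.indexOf(sortCols.get(i));
        if (pos >= 0) {
          positions.add(pos);
          directions.add(sortDirections.get(i));
        }
      }
      return Arrays.asList(positions, directions);
    }

    public static void main(String[] args) {
      System.out.println(sortPositionsOrder(
          Arrays.asList("key"), Arrays.asList(1),   // sort by 'key', direction flag 1
          Arrays.asList("key", "value")));          // table columns
      // prints [[0], [1]]
    }
  }
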
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Apr 17 07:29:38 2013
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,7 +67,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -79,8 +77,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* Factory for generating the different node processors used by ColumnPruner.
@@ -163,16 +159,11 @@ public final class ColumnPrunerProcFacto
/**
* - Pruning can only be done for Windowing. PTFs are black boxes,
- * we assume all columns are needed.
+ * we assume all columns are needed.
* - add column names referenced in WindowFn args and in WindowFn expressions
- * to the pruned list of the child Select Op.
- * - Prune the Column names & types serde properties in each of the Shapes in the PTF Chain:
- * - the InputDef's output shape
- * - Window Tabl Functions: window output shape & output shape.
- * - Why is pruning the Column names & types in the serde properties enough?
- * - because during runtime we rebuild the OIs using these properties.
+ * to the pruned list of the child Select Op.
* - finally we set the prunedColList on the ColumnPrunerContx;
- * and update the RR & signature on the PTFOp.
+ * and update the RR & signature on the PTFOp.
*/
public static class ColumnPrunerPTFProc implements NodeProcessor {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
@@ -194,10 +185,6 @@ public final class ColumnPrunerProcFacto
//we create a copy of prunedCols to create a list of pruned columns for PTFOperator
prunedCols = new ArrayList<String>(prunedCols);
prunedColumnsList(prunedCols, def);
- setSerdePropsOfShape(def.getInput().getOutputShape(), prunedCols);
- setSerdePropsOfShape(def.getOutputFromWdwFnProcessing(), prunedCols);
- setSerdePropsOfShape(def.getOutputShape(), prunedCols);
-
RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig);
cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def));
@@ -255,47 +242,6 @@ public final class ColumnPrunerProcFacto
}
}
- private List<String> getLowerCasePrunedCols(List<String> prunedCols){
- List<String> lowerCasePrunedCols = new ArrayList<String>();
- for (String col : prunedCols) {
- lowerCasePrunedCols.add(col.toLowerCase());
- }
- return lowerCasePrunedCols;
- }
-
- /*
- * reconstruct Column names & types list based on the prunedCols list.
- */
- private void setSerdePropsOfShape(ShapeDetails shp, List<String> prunedCols) {
- List<String> columnNames = Arrays.asList(shp.getSerdeProps().get(
- org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(","));
- List<TypeInfo> columnTypes = TypeInfoUtils
- .getTypeInfosFromTypeString(shp.getSerdeProps().get(
- org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES));
- /*
- * fieldNames in OI are lower-cased. So we compare lower cased names for now.
- */
- prunedCols = getLowerCasePrunedCols(prunedCols);
-
- StringBuilder cNames = new StringBuilder();
- StringBuilder cTypes = new StringBuilder();
-
- boolean addComma = false;
- for(int i=0; i < columnNames.size(); i++) {
- if ( prunedCols.contains(columnNames.get(i)) ) {
- cNames.append(addComma ? "," : "");
- cTypes.append(addComma ? "," : "");
- cNames.append(columnNames.get(i));
- cTypes.append(columnTypes.get(i));
- addComma = true;
- }
- }
- shp.getSerdeProps().put(
- org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString());
- shp.getSerdeProps().put(
- org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString());
- }
-
/*
* from the prunedCols list filter out columns that refer to WindowFns or WindowExprs
* the returned list is set as the prunedList needed by the PTFOp.
@@ -749,71 +695,39 @@ public final class ColumnPrunerProcFacto
ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx) throws SemanticException {
ReduceSinkDesc reduceConf = reduce.getConf();
Map<String, ExprNodeDesc> oldMap = reduce.getColumnExprMap();
- Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
- ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver();
- RowResolver newRR = new RowResolver();
- ArrayList<String> originalValueOutputColNames = reduceConf
- .getOutputValueColumnNames();
- java.util.ArrayList<ExprNodeDesc> originalValueEval = reduceConf
- .getValueCols();
- ArrayList<String> newOutputColNames = new ArrayList<String>();
- java.util.ArrayList<ExprNodeDesc> newValueEval = new ArrayList<ExprNodeDesc>();
- // ReduceSinkOperators that precede GroupByOperators have the keys in the schema in addition
- // to the values. These are not pruned.
- List<ColumnInfo> oldSchema = oldRR.getRowSchema().getSignature();
- for (ColumnInfo colInfo : oldSchema) {
- if (colInfo.getInternalName().startsWith(Utilities.ReduceField.KEY.toString() + ".")) {
- String[] nm = oldRR.reverseLookup(colInfo.getInternalName());
- newRR.put(nm[0], nm[1], colInfo);
- sig.add(colInfo);
- } else {
- break;
- }
- }
+ ArrayList<ColumnInfo> signature = oldRR.getRowSchema().getSignature();
+
+ List<String> valueColNames = reduceConf.getOutputValueColumnNames();
+ ArrayList<String> newValueColNames = new ArrayList<String>();
+
+ List<ExprNodeDesc> valueExprs = reduceConf.getValueCols();
+ ArrayList<ExprNodeDesc> newValueExprs = new ArrayList<ExprNodeDesc>();
+
for (int i = 0; i < retainFlags.length; i++) {
- if (retainFlags[i]) {
- newValueEval.add(originalValueEval.get(i));
- String outputCol = originalValueOutputColNames.get(i);
- newOutputColNames.add(outputCol);
+ String outputCol = valueColNames.get(i);
+ ExprNodeDesc outputColExpr = valueExprs.get(i);
+ if (!retainFlags[i]) {
String[] nm = oldRR.reverseLookup(outputCol);
if (nm == null) {
outputCol = Utilities.ReduceField.VALUE.toString() + "." + outputCol;
nm = oldRR.reverseLookup(outputCol);
}
- newMap.put(outputCol, oldMap.get(outputCol));
- ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
- newRR.put(nm[0], nm[1], colInfo);
- sig.add(colInfo);
- }
- }
-
- ArrayList<ExprNodeDesc> keyCols = reduceConf.getKeyCols();
- List<String> keys = new ArrayList<String>();
- RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
- reduce.getParentOperators().get(0)).getRowResolver();
- for (int i = 0; i < keyCols.size(); i++) {
- keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
- }
- for (int i = 0; i < keys.size(); i++) {
- String outputCol = keys.get(i);
- String[] nm = parResover.reverseLookup(outputCol);
- ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
- if (colInfo != null) {
- String internalName=colInfo.getInternalName();
- newMap.put(internalName, oldMap.get(internalName));
- newRR.put(nm[0], nm[1], colInfo);
+ ColumnInfo colInfo = oldRR.getFieldMap(nm[0]).remove(nm[1]);
+ oldRR.getInvRslvMap().remove(colInfo.getInternalName());
+ oldMap.remove(outputCol);
+ signature.remove(colInfo);
+ } else {
+ newValueColNames.add(outputCol);
+ newValueExprs.add(outputColExpr);
}
}
- cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR);
- reduce.setColumnExprMap(newMap);
- reduce.getSchema().setSignature(sig);
- reduceConf.setOutputValueColumnNames(newOutputColNames);
- reduceConf.setValueCols(newValueEval);
+ reduceConf.setOutputValueColumnNames(newValueColNames);
+ reduceConf.setValueCols(newValueExprs);
TableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils
.getFieldSchemasFromColumnList(reduceConf.getValueCols(),
- newOutputColNames, 0, ""));
+ newValueColNames, 0, ""));
reduceConf.setValueSerializeInfo(newValueTable);
}
@@ -1042,4 +956,4 @@ public final class ColumnPrunerProcFacto
return new ColumnPrunerMapJoinProc();
}
-}
\ No newline at end of file
+}
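
For readers following the ColumnPrunerProcFactory hunk above: the rewritten pruneReduceSinkOperator no longer rebuilds a fresh RowResolver; it walks the retainFlags array once, keeping the value-column names and value expressions whose flag is set and removing the others from the old RowResolver, column expression map and schema in place. The standalone sketch below illustrates only the retain-flag pattern over two parallel lists; the class and method names (RetainFlagPruneSketch, pruneByFlags) are hypothetical and the types are simplified, so this is a minimal illustration, not Hive code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration of retain-flag pruning over two parallel lists.
// ColumnPrunerProcFactory applies the same idea to a ReduceSink's output
// value names and value expressions.
public class RetainFlagPruneSketch {

  // Keep only the entries of names/exprs whose flag is true, preserving order
  // and keeping the two lists aligned.
  static void pruneByFlags(boolean[] retainFlags, List<String> names, List<String> exprs,
      List<String> newNames, List<String> newExprs) {
    for (int i = 0; i < retainFlags.length; i++) {
      if (retainFlags[i]) {
        newNames.add(names.get(i));
        newExprs.add(exprs.get(i));
      }
    }
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("_col0", "_col1", "_col2");
    List<String> exprs = Arrays.asList("key", "value", "ds");
    boolean[] retain = {true, false, true};   // _col1 is unreferenced downstream

    List<String> newNames = new ArrayList<String>();
    List<String> newExprs = new ArrayList<String>();
    pruneByFlags(retain, names, exprs, newNames, newExprs);

    System.out.println(newNames);  // [_col0, _col2]
    System.out.println(newExprs);  // [key, ds]
  }
}

The actual patch additionally removes each pruned entry from the old RowResolver's field map, inverse-resolution map and row schema, so downstream operators see the reduced signature directly.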
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Apr 17 07:29:38 2013
@@ -204,7 +204,9 @@ public class GenMRFileSink1 implements N
(MapredWork) currTask.getWork(), false, ctx);
}
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
}
@@ -721,7 +723,9 @@ public class GenMRFileSink1 implements N
(MapredWork) currTask.getWork(), false, ctx);
}
opTaskMap.put(null, currTask);
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
} else {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Apr 17 07:29:38 2013
@@ -264,7 +264,9 @@ public class GenMRUnion1 implements Node
else {
// is the current task a root task
if (shouldBeRootTask(currTask)
- && (!ctx.getRootTasks().contains(currTask))) {
+ && !ctx.getRootTasks().contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
ctx.getRootTasks().add(currTask);
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Apr 17 07:29:38 2013
@@ -106,7 +106,9 @@ public final class GenMapRedUtils {
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- if (!rootTasks.contains(currTask)) {
+ if (!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
rootTasks.add(currTask);
}
if (reducer.getClass() == JoinOperator.class) {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Apr 17 07:29:38 2013
@@ -163,8 +163,11 @@ public final class MapJoinFactory {
opTaskMap.put(op, currTask);
List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
- assert (!rootTasks.contains(currTask));
- rootTasks.add(currTask);
+ if(!rootTasks.contains(currTask)
+ && (currTask.getParentTasks() == null
+ || currTask.getParentTasks().isEmpty())) {
+ rootTasks.add(currTask);
+ }
assert currTopOp != null;
opProcCtx.getSeenOps().add(currTopOp);
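
The same guard is added in GenMRFileSink1, GenMRUnion1, GenMapRedUtils and MapJoinFactory: a task is registered as a root task only when it is not already in the root list and has no parent tasks. Below is a minimal sketch of that check factored into a helper; the helper name addRootTaskIfNecessary and the SimpleTask stand-in are hypothetical and not part of this patch, which keeps the check inline in each caller.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Hive's Task<? extends Serializable>; only the
// pieces needed for the root-task check are modelled here.
class SimpleTask {
  private final String name;
  private List<SimpleTask> parentTasks;          // null or empty => no parents

  SimpleTask(String name) { this.name = name; }

  List<SimpleTask> getParentTasks() { return parentTasks; }
  void addParent(SimpleTask parent) {
    if (parentTasks == null) {
      parentTasks = new ArrayList<SimpleTask>();
    }
    parentTasks.add(parent);
  }
  @Override public String toString() { return name; }
}

public class RootTaskGuardSketch {

  // Mirrors the guard added in this commit: only tasks without parents may
  // become root tasks, and a task is never added twice.
  static void addRootTaskIfNecessary(List<SimpleTask> rootTasks, SimpleTask currTask) {
    if (!rootTasks.contains(currTask)
        && (currTask.getParentTasks() == null
            || currTask.getParentTasks().isEmpty())) {
      rootTasks.add(currTask);
    }
  }

  public static void main(String[] args) {
    List<SimpleTask> rootTasks = new ArrayList<SimpleTask>();
    SimpleTask a = new SimpleTask("map-reduce-1");
    SimpleTask b = new SimpleTask("map-reduce-2");
    b.addParent(a);                       // b depends on a, so it is not a root

    addRootTaskIfNecessary(rootTasks, a);
    addRootTaskIfNecessary(rootTasks, a); // duplicate, ignored
    addRootTaskIfNecessary(rootTasks, b); // has a parent, ignored

    System.out.println(rootTasks);        // [map-reduce-1]
  }
}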
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Apr 17 07:29:38 2013
@@ -397,9 +397,8 @@ public class MapJoinProcessor implements
byte srcTag = entry.getKey();
List<ExprNodeDesc> filter = entry.getValue();
- Operator<?> start = oldReduceSinkParentOps.get(srcTag);
- Operator<?> terminal = newParentOps.get(srcTag);
- newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, start, terminal));
+ Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+ newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
}
desc.setFilters(filters = newFilters);
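
The MapJoinProcessor change above reverses the direction in which join filter expressions are backtracked: they are now resolved from the join operator back to the old ReduceSink parent rather than from the old ReduceSink parent to the new parent. As a rough, hypothetical illustration of what backtracking does (ExprNodeDescUtils.backtrack itself operates on ExprNodeDesc trees and real Operator instances, not plain strings), the sketch below follows a column name through each operator's column-to-expression map until the chain ends.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an operator chain: each "operator" is reduced to a map from its
// output column names to the input column names they were derived from.
// This is a didactic simplification of expression backtracking, not Hive code.
public class BacktrackSketch {

  // Rewrite a column reference by following each operator's column map in turn,
  // from the starting operator towards the terminal operator.
  static String backtrack(String column, List<Map<String, String>> chain) {
    String current = column;
    for (Map<String, String> columnExprMap : chain) {
      String resolved = columnExprMap.get(current);
      if (resolved == null) {
        return current;     // column passes through this operator unchanged
      }
      current = resolved;
    }
    return current;
  }

  public static void main(String[] args) {
    // Join operator: its output VALUE._col0 came from the ReduceSink's _col0.
    Map<String, String> joinMap = new HashMap<String, String>();
    joinMap.put("VALUE._col0", "_col0");

    // Old ReduceSink parent: its _col0 came from the table column 'key'.
    Map<String, String> reduceSinkMap = new HashMap<String, String>();
    reduceSinkMap.put("_col0", "key");

    // One step back, from the join to its old ReduceSink parent, rewrites the
    // reference in terms of the ReduceSink's output column.
    System.out.println(backtrack("VALUE._col0", Arrays.asList(joinMap)));                 // _col0
    // A further step would reach the original table column.
    System.out.println(backtrack("VALUE._col0", Arrays.asList(joinMap, reduceSinkMap)));  // key
  }
}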
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Wed Apr 17 07:29:38 2013
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -84,8 +83,6 @@ public class NonBlockingOpDeDupProc impl
// For SEL-SEL(compute) case, move column exprs/names of child to parent.
if (!cSEL.getConf().isSelStarNoCompute()) {
- Operator<?> terminal = ExprNodeDescUtils.getSingleParent(pSEL, null);
-
Set<String> funcOutputs = getFunctionOutputs(
pSEL.getConf().getOutputColumnNames(), pSEL.getConf().getColList());
@@ -93,7 +90,7 @@ public class NonBlockingOpDeDupProc impl
if (!funcOutputs.isEmpty() && !checkReferences(sources, funcOutputs)) {
return null;
}
- pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, pSEL, terminal));
+ pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, cSEL, pSEL));
pSEL.getConf().setOutputColumnNames(cSEL.getConf().getOutputColumnNames());
// updates schema only (this should be the last optimizer modifying operator tree)