Posted to commits@hive.apache.org by na...@apache.org on 2010/01/16 07:46:07 UTC
svn commit: r899891 [2/31] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Sat Jan 16 06:44:01 2010
@@ -19,280 +19,381 @@
package org.apache.hadoop.hive.ql.exec.persistence;
import java.io.File;
-import java.io.RandomAccessFile;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Simple persistent container for rows.
- *
+ *
* This container interface only accepts adding or appending new rows and
* iterating through the rows in the order of their insertions.
- *
- * The iterator interface is a lightweight first()/next() API rather than
- * the Java Iterator interface. This way we do not need to create
- * an Iterator object every time we want to start a new iteration. Below is
- * simple example of how to convert a typical Java's Iterator code to the LW
- * iterator iterface.
*
- * Itereator itr = rowContainer.iterator();
- * while (itr.hasNext()) {
- * v = itr.next();
- * // do anything with v
- * }
+ * The iterator interface is a lightweight first()/next() API rather than the
+ * Java Iterator interface. This way we do not need to create an Iterator object
+ * every time we want to start a new iteration. Below is a simple example of how
+ * to convert typical Java Iterator code to the lightweight iterator interface.
+ *
+ * Iterator itr = rowContainer.iterator();
+ * while (itr.hasNext()) {
+ *   v = itr.next();
+ *   // do anything with v
+ * }
*
* can be rewritten to:
*
- * for ( v = rowContainer.first();
- * v != null;
- * v = rowContainer.next()) {
- * // do anything with v
- * }
- *
- * The adding and iterating operations can be interleaving.
- *
+ * for (v = rowContainer.first(); v != null; v = rowContainer.next()) {
+ *   // do anything with v
+ * }
+ *
+ * Once first() is called, the container can no longer be written to, so there
+ * cannot be any writes after a read. It can be read multiple times, but it does
+ * not support multiple readers interleaving their reads.
+ *
*/
-public class RowContainer<Row extends List> {
+public class RowContainer<Row extends List<Object>> {
protected Log LOG = LogFactory.getLog(this.getClass().getName());
// max # of rows can be put into one block
private static final int BLOCKSIZE = 25000;
- private static final int BLKMETA_LEN = 100; // default # of block metadata: (offset,length) pair
- private Row[] lastBlock; // the last block that add() should append to
- private Row[] currBlock; // the current block where the cursor is in
+ private Row[] currentWriteBlock; // the last block that add() should append to
+ private Row[] currentReadBlock; // the current block where the cursor is in
+ // since currentReadBlock may be assigned to currentWriteBlock, we need to store
+ // the original read block
+ private Row[] firstReadBlockPointer;
private int blockSize; // number of objects in the block before it is spilled to disk
- private int numBlocks; // total # of blocks
+ private int numFlushedBlocks; // total # of blocks flushed to disk
private int size; // total # of elements in the RowContainer
private File tmpFile; // temporary file holding the spilled blocks
- private RandomAccessFile rFile; // random access file holding the data
- private long[] off_len; // offset length pair: i-th position is offset, (i+1)-th position is length
+ Path tempOutPath = null;
+ private File parentFile;
private int itrCursor; // iterator cursor in the currBlock
+ private int readBlockSize; //size of current read block
private int addCursor; // append cursor in the lastBlock
- private int pBlock; // pointer to the iterator block
private SerDe serde; // serialization/deserialization for the row
private ObjectInspector standardOI; // object inspector for the row
- private ArrayList dummyRow; // representing empty row (no columns since value art is null)
- public RowContainer() {
- this(BLOCKSIZE);
+ private List<Object> keyObject;
+
+ private tableDesc tblDesc;
+
+ boolean firstCalled = false; // once first() has been called, the container can never be written to again.
+ int acutalSplitNum = 0;
+ int currentSplitPointer = 0;
+ org.apache.hadoop.mapred.RecordReader rr = null; //record reader
+ RecordWriter rw = null;
+ InputFormat<WritableComparable, Writable> inputFormat = null;
+ InputSplit[] inputSplits = null;
+ private Row dummyRow = null;
+
+ Writable val = null; // cached Writable reused to hold serialized row data
+
+ JobConf jobCloneUsingLocalFs = null;
+ private LocalFileSystem localFs;
+
+ public RowContainer(Configuration jc) throws HiveException {
+ this(BLOCKSIZE, jc);
}
- public RowContainer(int blockSize) {
+ public RowContainer(int blockSize, Configuration jc) throws HiveException {
// no 0-sized block
this.blockSize = blockSize == 0 ? BLOCKSIZE : blockSize;
this.size = 0;
this.itrCursor = 0;
this.addCursor = 0;
- this.numBlocks = 0;
- this.pBlock = 0;
+ this.numFlushedBlocks = 0;
this.tmpFile = null;
- this.lastBlock = (Row[]) new ArrayList[blockSize];
- this.currBlock = this.lastBlock;
- this.off_len = new long[BLKMETA_LEN * 2];
+ this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ this.currentReadBlock = this.currentWriteBlock;
+ this.firstReadBlockPointer = currentReadBlock;
this.serde = null;
this.standardOI= null;
- this.dummyRow = new ArrayList(0);
+ try {
+ this.localFs = FileSystem.getLocal(jc);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ this.jobCloneUsingLocalFs = new JobConf(jc);
+ HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS, Utilities.HADOOP_LOCAL_FS);
}
- public RowContainer(int blockSize, SerDe sd, ObjectInspector oi) {
- this(blockSize);
+ public RowContainer(int blockSize, SerDe sd, ObjectInspector oi, Configuration jc) throws HiveException {
+ this(blockSize, jc);
setSerDe(sd, oi);
}
public void setSerDe(SerDe sd, ObjectInspector oi) {
- assert serde != null : "serde is null";
- assert oi != null : "oi is null";
this.serde = sd;
this.standardOI = oi;
}
public void add(Row t) throws HiveException {
- if ( addCursor >= blockSize ) { // spill the current block to tmp file
- spillBlock(lastBlock);
- addCursor = 0;
- if ( numBlocks == 1 )
- lastBlock = (Row[]) new ArrayList[blockSize];
- }
- lastBlock[addCursor++] = t;
+ if(this.tblDesc != null) {
+ if ( addCursor >= blockSize ) { // spill the current block to tmp file
+ spillBlock(currentWriteBlock, addCursor);
+ addCursor = 0;
+ if ( numFlushedBlocks == 1 )
+ currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ }
+ currentWriteBlock[addCursor++] = t;
+ } else if(t != null) {
+ // the tableDesc will be null in the case that no columns of that table
+ // are used. We use a dummy row to denote all rows in that table, and
+ // the dummy row is added by the caller.
+ this.dummyRow = t;
+ }
++size;
}
- public Row first() {
+ public Row first() throws HiveException {
if ( size == 0 )
return null;
- if ( pBlock > 0 ) {
- pBlock = 0;
- currBlock = getBlock(0);
- assert currBlock != null: "currBlock == null";
- }
- if ( currBlock == null && lastBlock != null ) {
- currBlock = lastBlock;
- }
- assert pBlock == 0: "pBlock != 0 ";
- itrCursor = 1;
- return currBlock[0];
- }
-
- public Row next() {
- assert pBlock<= numBlocks: "pBlock " + pBlock + " > numBlocks" + numBlocks; // pBlock should not be greater than numBlocks;
- if ( pBlock < numBlocks ) {
- if ( itrCursor < blockSize ) {
- return currBlock[itrCursor++];
- } else if ( ++pBlock < numBlocks ) {
- currBlock = getBlock(pBlock);
- assert currBlock != null: "currBlock == null";
- itrCursor = 1;
- return currBlock[0];
- } else {
- itrCursor = 0;
- currBlock = lastBlock;
- }
- }
- // last block (pBlock == numBlocks)
- if ( itrCursor < addCursor )
- return currBlock[itrCursor++];
- else
- return null;
- }
-
- private void spillBlock(Row[] block) throws HiveException {
try {
- if ( tmpFile == null ) {
- tmpFile = File.createTempFile("RowContainer", ".tmp", new File("/tmp"));
- LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
- // Delete the temp file if the JVM terminate normally through Hadoop job kill command.
- // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
- tmpFile.deleteOnExit();
- rFile = new RandomAccessFile(tmpFile, "rw");
+ firstCalled = true;
+ // when we reach here, we must have some data already (because size > 0).
+ // We need to see whether any data has been flushed to the file system. If not,
+ // we can read directly from the current write block. Otherwise, we need to read
+ // from the beginning of the underlying file.
+ this.itrCursor = 0;
+ closeWriter();
+ closeReader();
+
+ if(tblDesc == null) {
+ this.itrCursor ++;
+ return dummyRow;
}
- byte[] buf = serialize(block);
- long offset = rFile.length();
- long len = buf.length;
- // append the block at the end
- rFile.seek(offset);
- rFile.write(buf);
- // maintain block metadata
- addBlockMetadata(offset, len);
+ this.currentReadBlock = this.firstReadBlockPointer;
+ if (this.numFlushedBlocks == 0) {
+ this.readBlockSize = this.addCursor;
+ this.currentReadBlock = this.currentWriteBlock;
+ } else {
+ if (inputSplits == null) {
+ if (this.inputFormat == null)
+ inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(tblDesc.getInputFileFormatClass(),
+ jobCloneUsingLocalFs);
+
+ HiveConf.setVar(jobCloneUsingLocalFs,
+ HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
+ org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
+ inputSplits = inputFormat.getSplits(jobCloneUsingLocalFs, 1);
+ acutalSplitNum = inputSplits.length;
+ }
+ currentSplitPointer = 0;
+ rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
+ jobCloneUsingLocalFs, Reporter.NULL);
+ currentSplitPointer++;
+
+ nextBlock();
+ }
+ // we are guaranteed that we can get data here (since 'size' is not zero)
+ Row ret = currentReadBlock[itrCursor++];
+ removeKeys(ret);
+ return ret;
} catch (Exception e) {
- LOG.debug(e.toString());
throw new HiveException(e);
}
+
}
-
- /**
- * Maintain the blocks meta data: number of blocks, and the block (offset, length)
- * pair.
- * @param offset offset of the tmp file where the block was serialized.
- * @param len the length of the serialized block in the temp file.
- */
- private void addBlockMetadata(long offset, long len) {
- if ( (numBlocks+1) * 2 >= off_len.length ) { // expand (offset, len) array
- off_len = Arrays.copyOf(off_len, off_len.length*2);
- }
- off_len[numBlocks*2] = offset;
- off_len[numBlocks*2+1] = len;
- ++numBlocks;
- }
-
- /**
- * Serialize the object into a byte array.
- * @param obj object needed to be serialized
- * @return the byte array that contains the serialized array.
- * @throws IOException
- */
- private byte[] serialize(Row[] obj) throws HiveException {
- assert(serde != null && standardOI != null);
+
+ public Row next() throws HiveException {
+
+ if(!firstCalled)
+ throw new RuntimeException("Call first() then call next().");
+
+ if ( size == 0 )
+ return null;
- ByteArrayOutputStream baos;
- DataOutputStream oos;
+ if(tblDesc == null) {
+ if(this.itrCursor < size) {
+ this.itrCursor++;
+ return dummyRow;
+ }
+ return null;
+ }
+ Row ret;
+ if(itrCursor < this.readBlockSize) {
+ ret = this.currentReadBlock[itrCursor++];
+ removeKeys(ret);
+ return ret;
+ }
+ else {
+ nextBlock();
+ if ( this.readBlockSize == 0) {
+ if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
+ this.itrCursor = 0;
+ this.readBlockSize = this.addCursor;
+ this.firstReadBlockPointer = this.currentReadBlock;
+ currentReadBlock = currentWriteBlock;
+ } else {
+ return null;
+ }
+ }
+ return next();
+ }
+ }
+
+ private void removeKeys(Row ret) {
+ if (this.keyObject != null
+ && this.currentReadBlock != this.currentWriteBlock) {
+ int len = this.keyObject.size();
+ int rowSize = ((ArrayList)ret).size();
+ for(int i=0;i<len;i++) {
+ ((ArrayList) ret).remove(rowSize - i - 1);
+ }
+ }
+ }
+
+ ArrayList<Object> row = new ArrayList<Object>(2);
+ private void spillBlock(Row[] block, int length) throws HiveException {
try {
- baos = new ByteArrayOutputStream();
- oos = new DataOutputStream(baos);
+ if ( tmpFile == null ) {
+
+ String suffix = ".tmp";
+ if(this.keyObject != null)
+ suffix = "." + this.keyObject.toString() + suffix;
+
+ while(true) {
+ String parentId = "hive-rowcontainer" + Utilities.randGen.nextInt();
+ parentFile = new File("/tmp/"+ parentId);
+ boolean success = parentFile.mkdir();
+ if(success)
+ break;
+ LOG.debug("retry creating tmp row-container directory...");
+ }
+
+ tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+ LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+ // Delete the temp file if the JVM terminates normally through the Hadoop job kill command.
+ // Caveat: it won't be deleted if the JVM is killed by 'kill -9'.
+ parentFile.deleteOnExit();
+ tmpFile.deleteOnExit();
+
+ // rFile = new RandomAccessFile(tmpFile, "rw");
+ HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+ tempOutPath = new Path(tmpFile.toString());
+ rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat,
+ serde.getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
+ } else if (rw == null) {
+ throw new HiveException("RowContainer has already been closed for writing.");
+ }
- // # of rows
- oos.writeInt(obj.length);
+ row.clear();
+ row.add(null);
+ row.add(null);
- // if serde or OI is null, meaning the join value is null, we don't need
- // to serialize anything to disk, just need to keep the length.
- if ( serde != null && standardOI != null ) {
- for ( int i = 0; i < obj.length; ++i ) {
- Writable outVal = serde.serialize(obj[i], standardOI);
- outVal.write(oos);
+ if (this.keyObject != null) {
+ row.set(1, this.keyObject);
+ for (int i = 0; i < length; ++i) {
+ Row currentValRow = block[i];
+ row.set(0, currentValRow);
+ Writable outVal = serde.serialize(row, standardOI);
+ rw.write(outVal);
+ }
+ }else {
+ for ( int i = 0; i < length; ++i ) {
+ Row currentValRow = block[i];
+ Writable outVal = serde.serialize(currentValRow, standardOI);
+ rw.write(outVal);
}
}
- oos.close();
+
+ if(block == this.currentWriteBlock)
+ this.addCursor = 0;
+
+ this.numFlushedBlocks ++;
} catch (Exception e) {
- e.printStackTrace();
+ clear();
+ LOG.error(e.toString(), e);
throw new HiveException(e);
}
- return baos.toByteArray();
}
/**
- * Deserialize an object from a byte array
- * @param buf the byte array containing the serialized object.
- * @return the serialized object.
+ * Get the number of elements in the RowContainer.
+ * @return number of elements in the RowContainer
*/
- private Row[] deserialize(byte[] buf) throws HiveException {
- ByteArrayInputStream bais;
- DataInputStream ois;
+ public int size() {
+ return size;
+ }
+
+ private boolean nextBlock() throws HiveException {
+ itrCursor = 0;
+ this.readBlockSize = 0;
+ if (this.numFlushedBlocks == 0) return false;
try {
- bais = new ByteArrayInputStream(buf);
- ois = new DataInputStream(bais);
- int sz = ois.readInt();
- assert sz == blockSize:
- "deserialized size " + sz + " is not the same as block size " + blockSize;
- Row[] ret = (Row[]) new ArrayList[sz];
+ if(val == null)
+ val = serde.getSerializedClass().newInstance();
+ boolean nextSplit = true;
+ int i = 0;
- // if serde or OI is null, meaning the join value is null, we don't need
- // to serialize anything to disk, just need to keep the length.
- for ( int i = 0; i < sz; ++i ) {
- if ( serde != null && standardOI != null ) {
- Writable val = serde.getSerializedClass().newInstance();
- val.readFields(ois);
-
- ret[i] = (Row) ObjectInspectorUtils.copyToStandardObject(
- serde.deserialize(val),
- serde.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE);
- } else {
- ret[i] = (Row) dummyRow;
+ if(rr != null) {
+ Object key = rr.createKey();
+ while (i < this.currentReadBlock.length && rr.next(key, val)) {
+ nextSplit = false;
+ this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(
+ serde.deserialize(val),
+ serde.getObjectInspector(),
+ ObjectInspectorCopyOption.WRITABLE);
}
}
- return ret;
+
+ if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+ //open record reader to read next split
+ rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+ Reporter.NULL);
+ currentSplitPointer++;
+ return nextBlock();
+ }
+
+ this.readBlockSize = i;
+ return this.readBlockSize > 0;
} catch (Exception e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(),e);
+ try {
+ this.clear();
+ } catch (HiveException e1) {
+ LOG.error(e.getMessage(),e);
+ }
throw new HiveException(e);
}
}
- /**
- * Get the number of elements in the RowContainer.
- * @return number of elements in the RowContainer
- */
- public int size() {
- return size;
+ public void copyToDFSDirecory(FileSystem destFs, Path destPath) throws IOException, HiveException {
+ if (addCursor > 0)
+ this.spillBlock(this.currentWriteBlock, addCursor);
+ if(tempOutPath == null || tempOutPath.toString().trim().equals(""))
+ return;
+ this.closeWriter();
+ LOG.info("RowContainer copied temp file " + tmpFile.getAbsolutePath()+ " to dfs directory " + destPath.toString());
+ destFs.copyFromLocalFile(true,tempOutPath, new Path(destPath, new Path(tempOutPath.getName())));
+ clear();
}
/**
@@ -301,33 +402,72 @@
public void clear() throws HiveException {
itrCursor = 0;
addCursor = 0;
- numBlocks = 0;
- pBlock = 0;
- size = 0;
+ numFlushedBlocks = 0;
+ this.readBlockSize = 0;
+ this.acutalSplitNum = 0;
+ this.currentSplitPointer = -1;
+ this.firstCalled = false;
+ this.inputSplits = null;
+ tempOutPath = null;
+ addCursor = 0;
+
+ size = 0;
try {
- if ( rFile != null )
- rFile.close();
- if ( tmpFile != null )
- tmpFile.delete();
+ if (rw != null)
+ rw.close(false);
+ if (rr != null)
+ rr.close();
} catch (Exception e) {
LOG.error(e.toString());
throw new HiveException(e);
+ } finally {
+ rw = null;
+ rr = null;
+ tmpFile = null;
+ deleteLocalFile(parentFile, true);
+ parentFile = null;
}
- tmpFile = null;
}
-
- private Row[] getBlock(int block) {
- long offset = off_len[block*2];
- long len = off_len[block*2+1];
- byte[] buf = new byte[(int)len];
- try {
- rFile.seek(offset);
- rFile.readFully(buf);
- currBlock = deserialize(buf);
+
+ private void deleteLocalFile(File file, boolean recursive) {
+ try{
+ if (file != null) {
+ if(!file.exists())
+ return;
+ if(file.isDirectory() && recursive) {
+ File[] files = file.listFiles();
+ for (int i = 0; i < files.length; i++)
+ deleteLocalFile(files[i], true);
+ }
+ boolean deleteSuccess = file.delete();
+ if(!deleteSuccess)
+ LOG.error("Error deleting tmp file:" + file.getAbsolutePath());
+ }
} catch (Exception e) {
- LOG.error(e.toString());
- return null;
+ LOG.error("Error deleting tmp file:" + file.getAbsolutePath(), e);
}
- return currBlock;
}
+
+ private void closeWriter() throws IOException {
+ if (this.rw != null) {
+ this.rw.close(false);
+ this.rw = null;
+ }
+ }
+
+ private void closeReader() throws IOException {
+ if (this.rr != null) {
+ this.rr.close();
+ this.rr = null;
+ }
+ }
+
+ public void setKeyObject(List<Object> dummyKey) {
+ this.keyObject = dummyKey;
+ }
+
+ public void setTableDesc(tableDesc tblDesc) {
+ this.tblDesc = tblDesc;
+ }
+
}
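
For orientation, a minimal usage sketch (not part of this commit) of the rewritten RowContainer API above, assuming a caller-supplied Configuration jc, SerDe sd, ObjectInspector oi, tableDesc desc, and an iterable inputRows; these names are placeholders. Rows are added first, then read back through the lightweight first()/next() cycle described in the class comment; once first() is called, add() is no longer allowed.

    // Sketch only; exception handling and class scaffolding omitted.
    void example(Configuration jc, SerDe sd, ObjectInspector oi, tableDesc desc,
                 Iterable<ArrayList<Object>> inputRows) throws HiveException {
      RowContainer<ArrayList<Object>> container =
          new RowContainer<ArrayList<Object>>(25000, sd, oi, jc);
      container.setTableDesc(desc);     // enables spilling through the table's file format

      for (ArrayList<Object> row : inputRows) {       // write phase
        container.add(row);
      }
      // read phase: once first() is called, add() is no longer allowed
      for (ArrayList<Object> v = container.first(); v != null; v = container.next()) {
        // do anything with v
      }
      container.clear();                // close readers/writers, delete local spill files
    }
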
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +31,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -187,4 +195,44 @@
}
return true;
}
+
+
+ public static RecordWriter getHiveRecordWriter(JobConf jc,
+ tableDesc tableInfo, Class<? extends Writable> outputClass,
+ fileSinkDesc conf, Path outPath) throws HiveException {
+ try {
+ HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+ boolean isCompressed = conf.getCompressed();
+ JobConf jc_output = jc;
+ if (isCompressed) {
+ jc_output = new JobConf(jc);
+ String codecStr = conf.getCompressCodec();
+ if (codecStr != null && !codecStr.trim().equals("")) {
+ Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
+ FileOutputFormat.setOutputCompressorClass(jc_output, codec);
+ }
+ String type = conf.getCompressType();
+ if (type != null && !type.trim().equals("")) {
+ CompressionType style = CompressionType.valueOf(type);
+ SequenceFileOutputFormat.setOutputCompressionType(jc, style);
+ }
+ }
+ return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
+ isCompressed, tableInfo.getProperties(), outPath);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ public static RecordWriter getRecordWriter(JobConf jc,
+ HiveOutputFormat<?, ?> hiveOutputFormat,
+ final Class<? extends Writable> valueClass, boolean isCompressed,
+ Properties tableProp, Path outPath) throws IOException, HiveException {
+ if (hiveOutputFormat != null) {
+ return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
+ isCompressed, tableProp, null);
+ }
+ return null;
+ }
+
}
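
For orientation, a hedged sketch (not part of this commit) of how the new getHiveRecordWriter helper might be driven; jc, tblDesc, fsDesc, outPath, and serializedRow stand in for a caller-supplied JobConf, tableDesc, fileSinkDesc, output Path, and serialized row, and Text is only a stand-in output class.

    // Sketch only; all parameters are assumed caller-supplied.
    void writeExample(JobConf jc, tableDesc tblDesc, fileSinkDesc fsDesc, Path outPath,
                      Writable serializedRow) throws IOException, HiveException {
      RecordWriter writer = HiveFileFormatUtils.getHiveRecordWriter(
          jc, tblDesc, Text.class, fsDesc, outPath);
      writer.write(serializedRow);      // a row already serialized by the table's SerDe
      writer.close(false);              // false = job not aborted
    }
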
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Sat Jan 16 06:44:01 2010
@@ -205,8 +205,10 @@
List<Task<? extends Serializable>> mvTasks = ctx.getMvTask();
Task<? extends Serializable> mvTask = findMoveTask(mvTasks, newOutput);
- if (mvTask != null)
- cndTsk.addDependentTask(mvTask);
+ if (mvTask != null) {
+ for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+ tsk.addDependentTask(mvTask);
+ }
}
private Task<? extends Serializable> findMoveTask(List<Task<? extends Serializable>> mvTasks, FileSinkOperator fsOp) {
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class GenMRSkewJoinProcessor {
+
+ public GenMRSkewJoinProcessor() {
+ }
+
+ /**
+ * Create tasks for processing skew joins. The idea is (HIVE-964) to use
+ * separated jobs and map-joins to handle skew joins.
+ * <p>
+ * <ul>
+ * <li>
+ * The number of MR jobs to handle skew keys is the number of tables minus 1 (we
+ * can stream the last table, so big keys in the last table will not be a
+ * problem).
+ * <li>
+ * At runtime in Join, we output the big keys in one table into one corresponding
+ * directory, and all the same keys in the other tables into different dirs (one
+ * for each table). The directories will look like:
+ * <ul>
+ * <li>
+ * dir-T1-bigkeys (containing big keys in T1), dir-T2-keys (containing keys
+ * which are big in T1), dir-T3-keys (containing keys which are big in T1), ...
+ * <li>
+ * dir-T1-keys (containing keys which are big in T2), dir-T2-bigkeys (containing
+ * big keys in T2), dir-T3-keys (containing keys which are big in T2), ...
+ * <li>
+ * dir-T1-keys (containing keys which are big in T3), dir-T2-keys (containing
+ * keys which are big in T3), dir-T3-bigkeys (containing big keys in T3), ...
+ * </ul>
+ * </ul>
+ * For each table, we launch one map-join job, taking the directory containing
+ * big keys in this table and the corresponding dirs in the other tables as input.
+ * (Actually, one job per row in the list above.)
+ *
+ * <p>
+ * For more discussions, please check
+ * https://issues.apache.org/jira/browse/HIVE-964.
+ *
+ */
+ public static void processSkewJoin(JoinOperator joinOp,
+ Task<? extends Serializable> currTask, ParseContext parseCtx)
+ throws SemanticException {
+
+ // We are trying to add map joins to handle skew keys, and map join right
+ // now does not work with outer joins
+ if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp))
+ return;
+
+ String baseTmpDir = parseCtx.getContext().getMRTmpFileURI();
+
+ joinDesc joinDescriptor = joinOp.getConf();
+ Map<Byte, List<exprNodeDesc>> joinValues = joinDescriptor.getExprs();
+ int numAliases = joinValues.size();
+
+ Map<Byte, String> bigKeysDirMap = new HashMap<Byte, String>();
+ Map<Byte, Map<Byte, String>> smallKeysDirMap = new HashMap<Byte, Map<Byte, String>>();
+ Map<Byte, String> skewJoinJobResultsDir = new HashMap<Byte, String>();
+ Byte[] tags = joinDescriptor.getTagOrder();
+ for (int i = 0; i < numAliases; i++) {
+ Byte alias = tags[i];
+ String bigKeysDir = getBigKeysDir(baseTmpDir, alias);
+ bigKeysDirMap.put(alias, bigKeysDir);
+ Map<Byte, String> smallKeysMap = new HashMap<Byte, String>();
+ smallKeysDirMap.put(alias, smallKeysMap);
+ for(Byte src2 : tags) {
+ if(!src2.equals(alias))
+ smallKeysMap.put(src2, getSmallKeysDir(baseTmpDir, alias, src2));
+ }
+ skewJoinJobResultsDir.put(alias, getBigKeysSkewJoinResultDir(baseTmpDir, alias));
+ }
+
+ joinDescriptor.setHandleSkewJoin(true);
+ joinDescriptor.setBigKeysDirMap(bigKeysDirMap);
+ joinDescriptor.setSmallKeysDirMap(smallKeysDirMap);
+ joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVESKEWJOINKEY));
+
+ Map<String, Task<? extends Serializable>> bigKeysDirToTaskMap = new HashMap<String, Task<? extends Serializable>>();
+ List<Serializable> listWorks = new ArrayList<Serializable>();
+ List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+ mapredWork currPlan = (mapredWork) currTask.getWork();
+
+ tableDesc keyTblDesc = (tableDesc) currPlan.getKeyDesc().clone();
+ List<String> joinKeys = Utilities.getColumnNames(keyTblDesc.getProperties());
+ List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc.getProperties());
+
+ Map<Byte, tableDesc> tableDescList = new HashMap<Byte, tableDesc>();
+ Map<Byte, List<exprNodeDesc>> newJoinValues = new HashMap<Byte, List<exprNodeDesc>>();
+ Map<Byte, List<exprNodeDesc>> newJoinKeys = new HashMap<Byte, List<exprNodeDesc>>();
+ List<tableDesc> newJoinValueTblDesc = new ArrayList<tableDesc>(); // used to create mapJoinDesc; must be in order
+
+ for (int i = 0; i < tags.length; i++) // fill with null, otherwise we will get an NPE
+ newJoinValueTblDesc.add(null);
+
+ for (int i = 0; i < numAliases; i++) {
+ Byte alias = tags[i];
+ List<exprNodeDesc> valueCols = joinValues.get(alias);
+ String colNames = "";
+ String colTypes = "";
+ int columnSize = valueCols.size();
+ List<exprNodeDesc> newValueExpr = new ArrayList<exprNodeDesc>();
+ List<exprNodeDesc> newKeyExpr = new ArrayList<exprNodeDesc>();
+
+ boolean first = true;
+ for (int k = 0; k < columnSize; k++) {
+ TypeInfo type = valueCols.get(k).getTypeInfo();
+ String newColName = i + "_VALUE_" + k; // any name, it does not matter.
+ newValueExpr.add(new exprNodeColumnDesc(type, newColName, ""+i, false));
+ if(!first) {
+ colNames = colNames + ",";
+ colTypes = colTypes +",";
+ }
+ first = false;
+ colNames = colNames + newColName;
+ colTypes = colTypes + valueCols.get(k).getTypeString();
+ }
+
+ // we are putting the join keys at the last part of the spilled table
+ for (int k = 0; k < joinKeys.size(); k++) {
+ if(!first) {
+ colNames = colNames + ",";
+ colTypes = colTypes +",";
+ }
+ first = false;
+ colNames = colNames + joinKeys.get(k);
+ colTypes = colTypes + joinKeyTypes.get(k);
+ newKeyExpr.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(joinKeyTypes.get(k)), joinKeys.get(k), ""+i, false));
+ }
+
+ newJoinValues.put(alias, newValueExpr);
+ newJoinKeys.put(alias, newKeyExpr);
+ tableDescList.put(alias, Utilities.getTableDesc(colNames, colTypes));
+
+ //construct value table Desc
+ String valueColNames ="";
+ String valueColTypes ="";
+ first = true;
+ for (int k = 0; k < columnSize; k++) {
+ String newColName = i + "_VALUE_" + k; // any name, it does not matter.
+ if(!first) {
+ valueColNames = valueColNames + ",";
+ valueColTypes = valueColTypes + ",";
+ }
+ valueColNames = valueColNames + newColName;
+ valueColTypes = valueColTypes + valueCols.get(k).getTypeString();
+ first = false;
+ }
+ newJoinValueTblDesc.set(Byte.valueOf((byte)i), Utilities.getTableDesc(valueColNames, valueColTypes));
+ }
+
+ joinDescriptor.setSkewKeysValuesTables(tableDescList);
+ joinDescriptor.setKeyTableDesc(keyTblDesc);
+
+ for (int i = 0; i < numAliases -1; i++) {
+ Byte src = tags[i];
+ mapredWork newPlan = PlanUtils.getMapRedWork();
+ mapredWork clonePlan = null;
+ try {
+ String xmlPlan = currPlan.toXML();
+ StringBuffer sb = new StringBuffer(xmlPlan);
+ ByteArrayInputStream bis;
+ bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
+ clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+ } catch (UnsupportedEncodingException e) {
+ throw new SemanticException(e);
+ }
+
+ Operator<? extends Serializable>[] parentOps = new TableScanOperator[tags.length];
+ for (int k = 0; k < tags.length; k++) {
+ Operator<? extends Serializable> ts = OperatorFactory.get(tableScanDesc.class, (RowSchema) null);
+ parentOps[k] = ts;
+ }
+ Operator<? extends Serializable> tblScan_op = parentOps[i];
+
+ ArrayList<String> aliases = new ArrayList<String>();
+ String alias = src.toString();
+ aliases.add(alias);
+ String bigKeyDirPath = bigKeysDirMap.get(src);
+ newPlan.getPathToAliases().put(bigKeyDirPath, aliases);
+ newPlan.getAliasToWork().put(alias, tblScan_op);
+ partitionDesc part = new partitionDesc(tableDescList.get(src), null);
+ newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
+ newPlan.getAliasToPartnInfo().put(alias, part);
+
+ Operator<? extends Serializable> reducer = clonePlan.getReducer();
+ assert reducer instanceof JoinOperator;
+ JoinOperator cloneJoinOp = (JoinOperator) reducer;
+
+ mapJoinDesc mapJoinDescriptor = new mapJoinDesc(newJoinKeys,
+ keyTblDesc, newJoinValues, newJoinValueTblDesc, joinDescriptor.getOutputColumnNames(),
+ i, joinDescriptor.getConds());
+ mapJoinDescriptor.setNoOuterJoin(joinDescriptor.isNoOuterJoin());
+ mapJoinDescriptor.setTagOrder(tags);
+ mapJoinDescriptor.setHandleSkewJoin(false);
+
+ mapredLocalWork localPlan = new mapredLocalWork(
+ new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, fetchWork>());
+ Map<Byte, String> smallTblDirs = smallKeysDirMap.get(src);
+
+ for (int j = 0; j < numAliases; j++) {
+ if (j == i)
+ continue;
+ Byte small_alias = tags[j];
+ Operator<? extends Serializable> tblScan_op2 = parentOps[j];
+ localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2);
+ Path tblDir = new Path(smallTblDirs.get(small_alias));
+ localPlan.getAliasToFetchWork().put(small_alias.toString(),
+ new fetchWork(tblDir.toString(), tableDescList.get(small_alias)));
+ }
+
+ newPlan.setMapLocalWork(localPlan);
+
+ // construct a map join and set it as the child operator of tblScan_op
+ MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(mapJoinDescriptor, (RowSchema) null, parentOps);
+ // change the children of the original join operator to point to the map
+ // join operator
+ List<Operator<? extends Serializable>> childOps = cloneJoinOp.getChildOperators();
+ for (Operator<? extends Serializable> childOp : childOps)
+ childOp.replaceParent(cloneJoinOp, mapJoinOp);
+ mapJoinOp.setChildOperators(childOps);
+
+ HiveConf jc = new HiveConf(parseCtx.getConf(), GenMRSkewJoinProcessor.class);
+ HiveConf.setVar(jc, HiveConf.ConfVars.HIVEINPUTFORMAT,
+ org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.class.getCanonicalName());
+ Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(newPlan, jc);
+ bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
+ listWorks.add(skewJoinMapJoinTask.getWork());
+ listTasks.add(skewJoinMapJoinTask);
+ }
+
+ ConditionalWork cndWork = new ConditionalWork(listWorks);
+ ConditionalTask cndTsk = (ConditionalTask)TaskFactory.get(cndWork, parseCtx.getConf());
+ cndTsk.setListTasks(listTasks);
+ cndTsk.setResolver(new ConditionalResolverSkewJoin());
+ cndTsk.setResolverCtx(new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap));
+ List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
+ currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+ currTask.addDependentTask(cndTsk);
+
+ if (oldChildTasks != null) {
+ for(Task<? extends Serializable> tsk : cndTsk.getListTasks())
+ for (Task<? extends Serializable> oldChild : oldChildTasks)
+ tsk.addDependentTask(oldChild);
+ }
+ return;
+ }
+
+ public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) {
+
+ if (conf != null && !conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN))
+ return false;
+
+ if(!joinOp.getConf().isNoOuterJoin())
+ return false;
+
+ byte pos = 0;
+ for(Byte tag: joinOp.getConf().getTagOrder()) {
+ if(tag != pos)
+ return false;
+ pos++;
+ }
+
+ return true;
+ }
+
+ private static String skewJoinPrefix = "hive_skew_join";
+ private static String UNDERLINE = "_";
+ private static String BIGKEYS = "bigkeys";
+ private static String SMALLKEYS = "smallkeys";
+ private static String RESULTS = "results";
+ static String getBigKeysDir(String baseDir, Byte srcTbl) {
+ return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS
+ + UNDERLINE + srcTbl;
+ }
+
+ static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) {
+ return baseDir + File.separator + skewJoinPrefix + UNDERLINE + BIGKEYS
+ + UNDERLINE + RESULTS + UNDERLINE+ srcTbl;
+ }
+
+ static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl,
+ Byte srcTblSmallTbl) {
+ return baseDir + File.separator + skewJoinPrefix + UNDERLINE + SMALLKEYS
+ + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
+ }
+
+}
\ No newline at end of file
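
To make the directory scheme described in the processSkewJoin javadoc concrete, the naming helpers above would, for a base directory BASE and a three-table join with tags 0, 1 and 2, produce paths of the following shape (illustrative only; BASE is a placeholder):

    BASE/hive_skew_join_bigkeys_0           big keys from table 0
    BASE/hive_skew_join_smallkeys_0_1       keys of table 1 that are big in table 0
    BASE/hive_skew_join_smallkeys_0_2       keys of table 2 that are big in table 0
    BASE/hive_skew_join_bigkeys_results_0   map-join results for table 0's big keys
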
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * Physical context used by physical plan resolvers.
+ */
+public class PhysicalContext {
+
+ protected HiveConf conf;
+ private ParseContext parseContext;
+ private Context context;
+ protected List<Task<? extends Serializable>> rootTasks;
+ protected Task<? extends Serializable> fetchTask;
+
+ public PhysicalContext(HiveConf conf, ParseContext parseContext,
+ Context context, List<Task<? extends Serializable>> rootTasks,
+ Task<? extends Serializable> fetchTask) {
+ super();
+ this.conf = conf;
+ this.parseContext = parseContext;
+ this.context = context;
+ this.rootTasks = rootTasks;
+ this.fetchTask = fetchTask;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public void setConf(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ public ParseContext getParseContext() {
+ return parseContext;
+ }
+
+ public void setParseContext(ParseContext parseContext) {
+ this.parseContext = parseContext;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public void setContext(Context context) {
+ this.context = context;
+ }
+
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * A hierarchical physical optimizer, which contains a list of
+ * PhysicalPlanResolvers. Each resolver has its own set of optimization rules.
+ */
+public class PhysicalOptimizer {
+ private PhysicalContext pctx;
+ private List<PhysicalPlanResolver> resolvers;
+
+ public PhysicalOptimizer(PhysicalContext pctx, HiveConf hiveConf) {
+ super();
+ this.pctx = pctx;
+ initialize(hiveConf);
+ }
+
+ /**
+ * create the list of physical plan resolvers
+ *
+ * @param hiveConf
+ */
+ private void initialize(HiveConf hiveConf) {
+ resolvers = new ArrayList<PhysicalPlanResolver>();
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN))
+ resolvers.add(new SkewJoinResolver());
+ }
+
+ /**
+ * invoke all the resolvers one-by-one, and alter the physical plan
+ *
+ * @return PhysicalContext
+ * @throws HiveException
+ */
+ public PhysicalContext optimize() throws SemanticException {
+ for (PhysicalPlanResolver r : resolvers)
+ pctx = r.resolve(pctx);
+ return pctx;
+ }
+
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Physical plan optimization interface. Each resolver has its own set of
+ * optimization rules and transformations.
+ */
+public interface PhysicalPlanResolver {
+
+ /**
+ * All physical plan resolvers have to implement this entry method.
+ * @param pctx
+ * @return
+ */
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException;
+
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinResolver.SkewJoinProcCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Node processor factory for skew join resolver.
+ */
+public class SkewJoinProcFactory {
+
+ public static NodeProcessor getDefaultProc() {
+ return new SkewJoinDefaultProcessor();
+ }
+
+ public static NodeProcessor getJoinProc() {
+ return new SkewJoinJoinProcessor();
+ }
+
+ public static class SkewJoinJoinProcessor implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
+ SkewJoinProcCtx context = (SkewJoinProcCtx)ctx;
+ JoinOperator op = (JoinOperator) nd;
+ ParseContext parseContext = context.getParseCtx();
+ Task<? extends Serializable> currentTsk = context.getCurrentTask();
+ GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext);
+ return null;
+ }
+ }
+
+ public static class SkewJoinDefaultProcessor implements NodeProcessor{
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+
+/**
+ * An implementation of PhysicalPlanResolver. It iterates over each task with a
+ * rule dispatcher for its reducer operator tree; for tasks with a join op in the
+ * reducer, it will try to add a conditional task associated with a list of skew
+ * join tasks.
+ */
+public class SkewJoinResolver implements PhysicalPlanResolver{
+ @Override
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+ Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.rootTasks);
+ ogw.startWalking(topNodes, null);
+ return null;
+ }
+
+ /**
+ * Iterates over a task with a rule dispatcher for its reducer operator tree.
+ */
+ class SkewJoinTaskDispatcher implements Dispatcher{
+
+ private PhysicalContext physicalContext;
+
+ public SkewJoinTaskDispatcher(PhysicalContext context) {
+ super();
+ this.physicalContext = context;
+ }
+
+ @Override
+ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+ throws SemanticException {
+ Task<? extends Serializable> task = (Task<? extends Serializable>)nd;
+
+ if (!task.isMapRedTask()
+ || task instanceof ConditionalTask ||((mapredWork) task.getWork()).getReducer() == null)
+ return null;
+
+ SkewJoinProcCtx skewJoinProcContext = new SkewJoinProcCtx(task, this.physicalContext.getParseContext());
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", "JOIN%"), SkewJoinProcFactory.getJoinProc());
+
+ // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(SkewJoinProcFactory.getDefaultProc(), opRules, skewJoinProcContext);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // iterate over the reducer operator tree
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.add(((mapredWork) task.getWork()).getReducer());
+ ogw.startWalking(topNodes, null);
+ return null;
+ }
+
+ public PhysicalContext getPhysicalContext() {
+ return physicalContext;
+ }
+
+ public void setPhysicalContext(PhysicalContext physicalContext) {
+ this.physicalContext = physicalContext;
+ }
+ }
+
+ /**
+ * A container of current task and parse context.
+ */
+ public static class SkewJoinProcCtx implements NodeProcessorCtx {
+ private Task<? extends Serializable> currentTask;
+ private ParseContext parseCtx;
+
+ public SkewJoinProcCtx(Task<? extends Serializable> task, ParseContext parseCtx) {
+ this.currentTask = task;
+ this.parseCtx = parseCtx;
+ }
+
+ public Task<? extends Serializable> getCurrentTask() {
+ return currentTask;
+ }
+
+ public void setCurrentTask(Task<? extends Serializable> currentTask) {
+ this.currentTask = currentTask;
+ }
+
+ public ParseContext getParseCtx() {
+ return parseCtx;
+ }
+
+ public void setParseCtx(ParseContext parseCtx) {
+ this.parseCtx = parseCtx;
+ }
+ }
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Jan 16 06:44:01 2010
@@ -78,6 +78,7 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
@@ -91,6 +92,9 @@
import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.GenMRSkewJoinProcessor;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -4993,7 +4997,7 @@
opRules.put(new RuleRegExp(new String("R9"), "UNION%.*MAPJOIN%"), MapJoinFactory.getUnionMapJoin());
opRules.put(new RuleRegExp(new String("R10"), "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin());
-
+
// The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
@@ -5007,10 +5011,14 @@
// For each task, go over all operators recursively
for (Task<? extends Serializable> rootTask: rootTasks)
breakTaskTree(rootTask);
-
+
// For each task, set the key descriptor for the reducer
for (Task<? extends Serializable> rootTask: rootTasks)
setKeyDescTaskTree(rootTask);
+
+ PhysicalContext physicalContext = new PhysicalContext(conf, getParseContext(), ctx, rootTasks, fetchTask);
+ PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(physicalContext, conf);
+ physicalOptimizer.optimize();
// For each operator, generate the counters if needed
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS))
@@ -5039,7 +5047,7 @@
}
}
}
-
+
/**
* Find all leaf tasks of the list of root tasks.
*/
@@ -5104,7 +5112,7 @@
for (Operator<? extends Serializable> child: op.getChildOperators())
generateCountersOperator(child);
}
-
+
   // loop over all the tasks recursively
private void breakTaskTree(Task<? extends Serializable> task) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java Sat Jan 16 06:44:01 2010
@@ -18,7 +18,11 @@
package org.apache.hadoop.hive.ql.plan;
+import java.io.Serializable;
+import java.util.List;
+
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
/**
* Conditional task resolution interface. This is invoked at run time to get the task to invoke.
@@ -31,5 +35,5 @@
* @param ctx opaque context
* @return position of the task
*/
- public int getTaskId(HiveConf conf, Object ctx);
+ public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object ctx);
}
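
With getTaskId() replaced by getTasks(), a conditional resolver no longer names exactly one child task by index; it returns the subset of candidate tasks that should actually run, which is what lets the skew-join follow-up launch one map-join task per skewed table. A minimal, purely illustrative sketch of a caller consuming the new contract (stand-in types, not the real ConditionalTask):

// Stand-in types only; this is not the actual ConditionalTask implementation.
import java.util.ArrayList;
import java.util.List;

public class ResolverUsageSketch {
  interface MiniResolver<T> {
    List<T> getTasks(Object conf, Object ctx);
  }

  static <T> List<T> resolve(MiniResolver<T> resolver, Object conf, Object ctx) {
    // Old contract: candidates.get(resolver.getTaskId(conf, ctx)) -- one task.
    // New contract: the resolver returns every task that should run.
    return resolver.getTasks(conf, ctx);
  }

  public static void main(String[] args) {
    MiniResolver<String> skewLike = new MiniResolver<String>() {
      public List<String> getTasks(Object conf, Object ctx) {
        List<String> tasks = new ArrayList<String>();
        tasks.add("mapjoin for big keys of table 1");
        tasks.add("mapjoin for big keys of table 2");
        return tasks;
      }
    };
    for (String t : resolve(skewLike, null, null)) {
      System.out.println("would launch: " + t);
    }
  }
}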
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Sat Jan 16 06:44:01 2010
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
@@ -82,10 +83,11 @@
}
}
- public int getTaskId(HiveConf conf, Object objCtx) {
+ public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object objCtx) {
ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx)objCtx;
String dirName = ctx.getDir();
+ List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
// check if a map-reduce job is needed to merge the files
// If the current size is smaller than the target, merge
long trgtSize = conf.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESSIZE);
@@ -114,13 +116,14 @@
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
work.setNumReduceTasks(reducers);
-
- return 1;
+ resTsks.add(tsk);
+ return resTsks;
}
}
} catch (IOException e) {
e.printStackTrace();
}
- return 0;
+ resTsks.add(ctx.getListTasks().get(0));
+ return resTsks;
}
}
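
For the merge-files case the resolver's decision boils down to the comment in the hunk above: if the data already written is smaller than the configured merge target (HIVEMERGEMAPFILESSIZE), add the extra merge map-reduce task, otherwise just run the plain move task. The sketch below paraphrases that decision with stand-in sizes and task names; the exact size test and reducer estimate are assumptions, only the max/min clamping mirrors the lines above.

// Stand-in sketch of the merge/no-merge choice. Sizes are in bytes.
public class MergeDecisionSketch {
  static String pickTask(long currentSize, long targetSize, long bytesPerReducer,
      int maxReducers) {
    if (currentSize < targetSize) {
      int reducers = (int) Math.ceil((double) currentSize / bytesPerReducer);
      reducers = Math.max(1, reducers);
      reducers = Math.min(maxReducers, reducers);
      return "merge map-reduce task with " + reducers + " reducer(s)";
    }
    return "move task only";
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    System.out.println(pickTask(10 * mb, 256 * mb, 64 * mb, 99));  // small output -> merge
    System.out.println(pickTask(512 * mb, 256 * mb, 64 * mb, 99)); // large output -> just move
  }
}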
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+
+public class ConditionalResolverSkewJoin implements ConditionalResolver, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static class ConditionalResolverSkewJoinCtx implements Serializable {
+ private static final long serialVersionUID = 1L;
+    // We write the big keys from one table into one directory, and the
+    // matching keys from each of the other tables into their own directories
+    // (one directory per table). This map records the mapping from each
+    // "big key dir" to the map-join task that will process it.
+ Map<String, Task<? extends Serializable>> dirToTaskMap;
+
+ public ConditionalResolverSkewJoinCtx(
+ Map<String, Task<? extends Serializable>> dirToTaskMap) {
+ super();
+ this.dirToTaskMap = dirToTaskMap;
+ }
+
+ public Map<String, Task<? extends Serializable>> getDirToTaskMap() {
+ return dirToTaskMap;
+ }
+
+ public void setDirToTaskMap(
+ Map<String, Task<? extends Serializable>> dirToTaskMap) {
+ this.dirToTaskMap = dirToTaskMap;
+ }
+ }
+
+ public ConditionalResolverSkewJoin(){
+ }
+
+ @Override
+ public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object objCtx) {
+ ConditionalResolverSkewJoinCtx ctx = (ConditionalResolverSkewJoinCtx)objCtx;
+ List<Task<? extends Serializable>> resTsks = new ArrayList<Task<? extends Serializable>>();
+
+ Map<String, Task<? extends Serializable>> dirToTaskMap = ctx.getDirToTaskMap();
+ Iterator<Entry<String, Task<? extends Serializable>>> bigKeysPathsIter = dirToTaskMap.entrySet().iterator();
+ try {
+      while (bigKeysPathsIter.hasNext()) {
+ Entry<String, Task<? extends Serializable>> entry = bigKeysPathsIter.next();
+ String path = entry.getKey();
+ Path dirPath = new Path(path);
+ FileSystem inpFs = dirPath.getFileSystem(conf);
+ FileStatus[] fstatus = inpFs.listStatus(dirPath);
+ if (fstatus.length > 0)
+ resTsks.add(entry.getValue());
+ }
+    } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return resTsks;
+ }
+
+}
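
The resolver just added implements a simple selection rule: at run time it looks at each big-key directory and keeps the corresponding follow-up map-join task only if that directory actually received rows. A standalone sketch of the same rule, using the local filesystem (java.io.File) instead of Hadoop's FileSystem API, with made-up paths and task names:

// Illustrative only: keep a task when its big-key directory is non-empty.
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkewDirSelectionSketch {
  static <T> List<T> selectTasks(Map<String, T> dirToTask) {
    List<T> selected = new ArrayList<T>();
    for (Map.Entry<String, T> e : dirToTask.entrySet()) {
      String[] contents = new File(e.getKey()).list();
      if (contents != null && contents.length > 0) {
        selected.add(e.getValue());   // this table produced skewed keys
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    Map<String, String> dirToTask = new LinkedHashMap<String, String>();
    dirToTask.put("/tmp/bigkeys_t1", "mapjoin-task-for-t1");   // hypothetical paths
    dirToTask.put("/tmp/bigkeys_t2", "mapjoin-task-for-t2");
    System.out.println("tasks to run: " + selectTasks(dirToTask));
  }
}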
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java Sat Jan 16 06:44:01 2010
@@ -36,7 +36,6 @@
/**
* @return the listWorks
*/
- @explain(displayName="list of dependent Tasks")
public List<? extends Serializable> getListWorks() {
return listWorks;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java Sat Jan 16 06:44:01 2010
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -46,6 +47,13 @@
public static final int UNIQUE_JOIN = 4;
public static final int LEFT_SEMI_JOIN = 5;
+  // used to handle skew join
+ private boolean handleSkewJoin = false;
+ private int skewKeyDefinition = -1;
+ private Map<Byte, String> bigKeysDirMap;
+ private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+ private Map<Byte, tableDesc> skewKeysValuesTables;
+
// alias to key mapping
private Map<Byte, List<exprNodeDesc>> exprs;
@@ -61,6 +69,7 @@
protected joinCond[] conds;
protected Byte[] tagOrder;
+ private tableDesc keyTableDesc;
public joinDesc() { }
@@ -187,4 +196,95 @@
public void setTagOrder(Byte[] tagOrder) {
this.tagOrder = tagOrder;
}
+
+ @explain(displayName="handleSkewJoin")
+ public boolean getHandleSkewJoin() {
+ return handleSkewJoin;
+ }
+
+ /**
+   * Set whether skew join handling is enabled for this join operator.
+ * @param handleSkewJoin
+ */
+ public void setHandleSkewJoin(boolean handleSkewJoin) {
+ this.handleSkewJoin = handleSkewJoin;
+ }
+
+ /**
+ * @return mapping from tbl to dir for big keys
+ */
+ public Map<Byte, String> getBigKeysDirMap() {
+ return bigKeysDirMap;
+ }
+
+ /**
+ * set the mapping from tbl to dir for big keys
+ * @param bigKeysDirMap
+ */
+ public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+ this.bigKeysDirMap = bigKeysDirMap;
+ }
+
+ /**
+ * @return mapping from tbl to dir for small keys
+ */
+ public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+ return smallKeysDirMap;
+ }
+
+ /**
+ * set the mapping from tbl to dir for small keys
+   * @param smallKeysDirMap
+ */
+ public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+ this.smallKeysDirMap = smallKeysDirMap;
+ }
+
+ /**
+   * @return the skew key threshold. If the number of rows associated with a
+   * key exceeds this value, the key is treated as a skew key.
+ */
+ public int getSkewKeyDefinition() {
+ return skewKeyDefinition;
+ }
+
+ /**
+ * set skew key definition
+ * @param skewKeyDefinition
+ */
+ public void setSkewKeyDefinition(int skewKeyDefinition) {
+ this.skewKeyDefinition = skewKeyDefinition;
+ }
+
+ /**
+   * @return the table descriptors for storing skew keys and their corresponding values
+ */
+ public Map<Byte, tableDesc> getSkewKeysValuesTables() {
+ return skewKeysValuesTables;
+ }
+
+ /**
+   * @param skewKeysValuesTables the table descriptors for storing skew keys and their corresponding values
+ */
+ public void setSkewKeysValuesTables(Map<Byte, tableDesc> skewKeysValuesTables) {
+ this.skewKeysValuesTables = skewKeysValuesTables;
+ }
+
+ public boolean isNoOuterJoin() {
+ for (org.apache.hadoop.hive.ql.plan.joinCond cond : conds) {
+ if (cond.getType() == joinDesc.FULL_OUTER_JOIN
+ || (cond.getType() == joinDesc.LEFT_OUTER_JOIN)
+ || cond.getType() == joinDesc.RIGHT_OUTER_JOIN)
+ return false;
+ }
+ return true;
+ }
+
+ public void setKeyTableDesc(tableDesc keyTblDesc) {
+ this.keyTableDesc = keyTblDesc;
+ }
+
+ public tableDesc getKeyTableDesc() {
+ return keyTableDesc;
+ }
}
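
The skewKeyDefinition added to joinDesc is a row-count threshold: per the javadoc on getSkewKeyDefinition(), a key with more associated rows than this is treated as a skew key, and its rows are diverted to the big-key directories described above. A rough, hypothetical sketch of applying such a threshold at run time (the real logic lives in the join operator, not in joinDesc; this only illustrates the counting):

// Hypothetical per-key row counter checked against the skewKeyDefinition
// threshold; keys are assumed to arrive grouped, as they do at a reducer.
public class SkewKeyThresholdSketch {
  private final int skewKeyDefinition;
  private Object currentKey;
  private long rowsForCurrentKey;

  public SkewKeyThresholdSketch(int skewKeyDefinition) {
    this.skewKeyDefinition = skewKeyDefinition;
  }

  /** Returns true once the current key has more rows than the threshold. */
  public boolean isSkewedRow(Object key) {
    if (currentKey == null || !currentKey.equals(key)) {
      currentKey = key;
      rowsForCurrentKey = 0;
    }
    rowsForCurrentKey++;
    return skewKeyDefinition > 0 && rowsForCurrentKey > skewKeyDefinition;
  }

  public static void main(String[] args) {
    SkewKeyThresholdSketch s = new SkewKeyThresholdSketch(2); // like hive.skewjoin.key=2
    for (int i = 1; i <= 4; i++) {
      System.out.println("row " + i + " of key 'a' skewed? " + s.isSkewedRow("a"));
    }
  }
}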
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q Sat Jan 16 06:44:01 2010
@@ -0,0 +1,138 @@
+set hive.optimize.skewjoin = true;
+set hive.skewjoin.key = 2;
+
+DROP TABLE T1;
+DROP TABLE T2;
+DROP TABLE T3;
+DROP TABLE T4;
+DROP TABLE dest_j1;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE T4(key STRING, val STRING) STORED AS TEXTFILE;
+CREATE TABLE dest_j1(key INT, value STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T4;
+
+
+EXPLAIN
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value;
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value;
+
+SELECT sum(hash(key)), sum(hash(value)) FROM dest_j1;
+
+
+EXPLAIN
+SELECT /*+ STREAMTABLE(a) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+ JOIN T3 c ON b.key = c.key
+ JOIN T4 d ON c.key = d.key;
+
+SELECT /*+ STREAMTABLE(a) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+ JOIN T3 c ON b.key = c.key
+ JOIN T4 d ON c.key = d.key;
+
+EXPLAIN
+SELECT /*+ STREAMTABLE(a,c) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+ JOIN T3 c ON b.key = c.key
+ JOIN T4 d ON c.key = d.key;
+
+SELECT /*+ STREAMTABLE(a,c) */ *
+FROM T1 a JOIN T2 b ON a.key = b.key
+ JOIN T3 c ON b.key = c.key
+ JOIN T4 d ON c.key = d.key;
+
+
+EXPLAIN FROM T1 a JOIN src c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+FROM T1 a JOIN src c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+EXPLAIN FROM
+(SELECT src.* FROM src) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+FROM
+(SELECT src.* FROM src) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+
+EXPLAIN FROM
+(SELECT src.* FROM src) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key and substring(x.value, 5)=substring(y.value, 5)+1)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+FROM
+(SELECT src.* FROM src) x
+JOIN
+(SELECT src.* FROM src) Y
+ON (x.key = Y.key and substring(x.value, 5)=substring(y.value, 5)+1)
+SELECT sum(hash(Y.key)), sum(hash(Y.value));
+
+
+EXPLAIN
+SELECT sum(hash(src1.c1)), sum(hash(src2.c4))
+FROM
+(SELECT src.key as c1, src.value as c2 from src) src1
+JOIN
+(SELECT src.key as c3, src.value as c4 from src) src2
+ON src1.c1 = src2.c3 AND src1.c1 < 100
+JOIN
+(SELECT src.key as c5, src.value as c6 from src) src3
+ON src1.c1 = src3.c5 AND src3.c5 < 80;
+
+SELECT sum(hash(src1.c1)), sum(hash(src2.c4))
+FROM
+(SELECT src.key as c1, src.value as c2 from src) src1
+JOIN
+(SELECT src.key as c3, src.value as c4 from src) src2
+ON src1.c1 = src2.c3 AND src1.c1 < 100
+JOIN
+(SELECT src.key as c5, src.value as c6 from src) src3
+ON src1.c1 = src3.c5 AND src3.c5 < 80;
+
+EXPLAIN
+SELECT /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) FROM T1 k LEFT OUTER JOIN T1 v ON k.key+1=v.key;
+SELECT /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) FROM T1 k LEFT OUTER JOIN T1 v ON k.key+1=v.key;
+
+select /*+ mapjoin(k)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.val;
+
+select /*+ mapjoin(k)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.key;
+
+select sum(hash(k.key)), sum(hash(v.val)) from T1 k join T1 v on k.key=v.key;
+
+select count(1) from T1 a join T1 b on a.key = b.key;
+
+FROM T1 a LEFT OUTER JOIN T2 c ON c.key+1=a.key SELECT sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+FROM T1 a RIGHT OUTER JOIN T2 c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+FROM T1 a FULL OUTER JOIN T2 c ON c.key+1=a.key SELECT /*+ STREAMTABLE(a) */ sum(hash(a.key)), sum(hash(a.val)), sum(hash(c.key));
+
+SELECT sum(hash(src1.key)), sum(hash(src1.val)), sum(hash(src2.key)) FROM T1 src1 LEFT OUTER JOIN T2 src2 ON src1.key+1 = src2.key RIGHT OUTER JOIN T2 src3 ON src2.key = src3.key;
+
+SELECT sum(hash(src1.key)), sum(hash(src1.val)), sum(hash(src2.key)) FROM T1 src1 JOIN T2 src2 ON src1.key+1 = src2.key JOIN T2 src3 ON src2.key = src3.key;
+
+select /*+ mapjoin(v)*/ sum(hash(k.key)), sum(hash(v.val)) from T1 k left outer join T1 v on k.key+1=v.key;
+
+
+DROP TABLE dest_j1;
+DROP TABLE T1;
+DROP TABLE T2;
+DROP TABLE T3;
+DROP TABLE T4;
\ No newline at end of file
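
The two set statements at the top of skewjoin.q drive the new code path: hive.optimize.skewjoin turns on the conditional skew-join plan, and hive.skewjoin.key=2 lowers the skew threshold to two rows per key so the small test tables actually trigger it. A minimal sketch, assuming only those two property names from the script, that reads the settings back through Hadoop's generic Configuration API (Hive itself reads them through HiveConf):

// Illustrative only: shows the property names used by the test script above.
// Defaults here (false/-1) are placeholders, not Hive's real defaults.
import org.apache.hadoop.conf.Configuration;

public class SkewJoinSettingsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("hive.optimize.skewjoin", true); // enable the conditional plan
    conf.setInt("hive.skewjoin.key", 2);             // rows per key before it counts as skewed

    boolean enabled = conf.getBoolean("hive.optimize.skewjoin", false);
    int threshold = conf.getInt("hive.skewjoin.key", -1);
    System.out.println("skew join enabled=" + enabled + ", key threshold=" + threshold);
  }
}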
Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out Sat Jan 16 06:44:01 2010
@@ -16,8 +16,10 @@
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-4 depends on stages: Stage-1
- Stage-0 depends on stages: Stage-4
+ Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+ Stage-3
+ Stage-0 depends on stages: Stage-3, Stage-2
+ Stage-2
STAGE PLANS:
Stage: Stage-1
@@ -52,35 +54,12 @@
Stage: Stage-4
Conditional Operator
- list of dependent Tasks:
- Move Operator
- files:
- hdfs directory: true
- destination: file:/data/users/njain/hive5/hive5/build/ql/tmp/596729572/10000
- Map Reduce
- Alias -> Map Operator Tree:
- file:/data/users/njain/hive5/hive5/build/ql/tmp/790017029/10002
- Reduce Output Operator
- sort order:
- Map-reduce partition columns:
- expr: rand()
- type: double
- tag: -1
- value expressions:
- expr: key
- type: int
- expr: value
- type: string
- Reduce Operator Tree:
- Extract
- File Output Operator
- compressed: false
- GlobalTableId: 0
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: dest1
+
+ Stage: Stage-3
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/1283457982/10000
Stage: Stage-0
Move Operator
@@ -92,6 +71,32 @@
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/34390708/10002
+ Reduce Output Operator
+ sort order:
+ Map-reduce partition columns:
+ expr: rand()
+ type: double
+ tag: -1
+ value expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: dest1
+
PREHOOK: query: FROM SRC_THRIFT
INSERT OVERWRITE TABLE dest1 SELECT src_Thrift.LINT[1], src_thrift.lintstring[0].MYSTRING where src_thrift.liNT[0] > 0
@@ -106,11 +111,11 @@
PREHOOK: query: SELECT DEST1.* FROM Dest1
PREHOOK: type: QUERY
PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/423054543/10000
+PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/2134218659/10000
POSTHOOK: query: SELECT DEST1.* FROM Dest1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/423054543/10000
+POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_2/build/ql/tmp/2134218659/10000
2 1
4 8
6 27
Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out Sat Jan 16 06:44:01 2010
@@ -14,8 +14,10 @@
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-4 depends on stages: Stage-1
- Stage-0 depends on stages: Stage-4
+ Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+ Stage-3
+ Stage-0 depends on stages: Stage-3, Stage-2
+ Stage-2
STAGE PLANS:
Stage: Stage-1
@@ -60,45 +62,12 @@
Stage: Stage-4
Conditional Operator
- list of dependent Tasks:
- Move Operator
- files:
- hdfs directory: true
- destination: file:/data/users/njain/hive5/hive5/build/ql/tmp/1071125214/10000
- Map Reduce
- Alias -> Map Operator Tree:
- file:/data/users/njain/hive5/hive5/build/ql/tmp/517302853/10002
- Reduce Output Operator
- sort order:
- Map-reduce partition columns:
- expr: rand()
- type: double
- tag: -1
- value expressions:
- expr: c1
- type: int
- expr: c2
- type: double
- expr: c3
- type: double
- expr: c4
- type: double
- expr: c5
- type: int
- expr: c6
- type: string
- expr: c7
- type: int
- Reduce Operator Tree:
- Extract
- File Output Operator
- compressed: false
- GlobalTableId: 0
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: dest1
+
+ Stage: Stage-3
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1062243767/10000
Stage: Stage-0
Move Operator
@@ -110,6 +79,42 @@
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/493733948/10002
+ Reduce Output Operator
+ sort order:
+ Map-reduce partition columns:
+ expr: rand()
+ type: double
+ tag: -1
+ value expressions:
+ expr: c1
+ type: int
+ expr: c2
+ type: double
+ expr: c3
+ type: double
+ expr: c4
+ type: double
+ expr: c5
+ type: int
+ expr: c6
+ type: string
+ expr: c7
+ type: int
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: dest1
+
PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT 3 + 2, 3.0 + 2, 3 + 2.0, 3.0 + 2.0, 3 + CAST(2.0 AS INT) + CAST(CAST(0 AS SMALLINT) AS INT), CAST(1 AS BOOLEAN), CAST(TRUE AS INT) WHERE src.key = 86
PREHOOK: type: QUERY
@@ -122,9 +127,9 @@
PREHOOK: query: select dest1.* FROM dest1
PREHOOK: type: QUERY
PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/822287214/10000
+PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1057577384/10000
POSTHOOK: query: select dest1.* FROM dest1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/822287214/10000
+POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk/.ptest_0/build/ql/tmp/1057577384/10000
5 5.0 5.0 5.0 5 true 1