Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [35/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java Thu Jan 21 10:37:58 2010
@@ -65,379 +65,371 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.TreeSet;
/**
- * This class manages the transaction log that belongs to every
- * {@link RecordFile}. The transaction log is either clean, or
- * in progress. In the latter case, the transaction manager
- * takes care of a roll forward.
+ * This class manages the transaction log that belongs to every
+ * {@link RecordFile}. The transaction log is either clean, or in progress. In
+ * the latter case, the transaction manager takes care of a roll forward.
*<p>
- * Implementation note: this is a proof-of-concept implementation
- * which hasn't been optimized for speed. For instance, all sorts
- * of streams are created for every transaction.
+ * Implementation note: this is a proof-of-concept implementation which hasn't
+ * been optimized for speed. For instance, all sorts of streams are created for
+ * every transaction.
*/
// TODO: Handle the case where we are recovering lg9 and lg0, where we
// should start with lg9 instead of lg0!
public final class TransactionManager {
- private RecordFile owner;
+ private final RecordFile owner;
- // streams for transaction log.
- private FileOutputStream fos;
- private ObjectOutputStream oos;
-
- /**
- * By default, we keep 10 transactions in the log file before
- * synchronizing it with the main database file.
- */
- static final int DEFAULT_TXNS_IN_LOG = 10;
-
- /**
- * Maximum number of transactions before the log file is
- * synchronized with the main database file.
- */
- private int _maxTxns = DEFAULT_TXNS_IN_LOG;
-
- /**
- * In-core copy of transactions. We could read everything back from
- * the log file, but the RecordFile needs to keep the dirty blocks in
- * core anyway, so we might as well point to them and spare us a lot
- * of hassle.
- */
- private ArrayList[] txns = new ArrayList[DEFAULT_TXNS_IN_LOG];
- private int curTxn = -1;
-
- /** Extension of a log file. */
- static final String extension = ".lg";
-
- /** log file name */
- private String logFileName;
-
- /**
- * Instantiates a transaction manager instance. If recovery
- * needs to be performed, it is done.
- *
- * @param owner the RecordFile instance that owns this transaction mgr.
- */
- TransactionManager(RecordFile owner) throws IOException {
- this.owner = owner;
- logFileName = null;
- recover();
- open();
- }
-
-
- /**
- * Synchronize log file data with the main database file.
- * <p>
- * After this call, the main database file is guaranteed to be
- * consistent and guaranteed to be the only file needed for
- * backup purposes.
- */
- public void synchronizeLog()
- throws IOException
- {
- synchronizeLogFromMemory();
- }
-
-
- /**
- * Set the maximum number of transactions to record in
- * the log (and keep in memory) before the log is
- * synchronized with the main database file.
- * <p>
- * This method must be called while there are no
- * pending transactions in the log.
- */
- public void setMaximumTransactionsInLog( int maxTxns )
- throws IOException
- {
- if ( maxTxns <= 0 ) {
- throw new IllegalArgumentException(
- "Argument 'maxTxns' must be greater than 0." );
- }
- if ( curTxn != -1 ) {
- throw new IllegalStateException(
- "Cannot change setting while transactions are pending in the log" );
+ // streams for transaction log.
+ private FileOutputStream fos;
+ private ObjectOutputStream oos;
+
+ /**
+ * By default, we keep 10 transactions in the log file before synchronizing it
+ * with the main database file.
+ */
+ static final int DEFAULT_TXNS_IN_LOG = 10;
+
+ /**
+ * Maximum number of transactions before the log file is synchronized with the
+ * main database file.
+ */
+ private int _maxTxns = DEFAULT_TXNS_IN_LOG;
+
+ /**
+ * In-core copy of transactions. We could read everything back from the log
+ * file, but the RecordFile needs to keep the dirty blocks in core anyway, so
+ * we might as well point to them and spare us a lot of hassle.
+ */
+ private ArrayList[] txns = new ArrayList[DEFAULT_TXNS_IN_LOG];
+ private int curTxn = -1;
+
+ /** Extension of a log file. */
+ static final String extension = ".lg";
+
+ /** log file name */
+ private String logFileName;
+
+ /**
+ * Instantiates a transaction manager instance. If recovery needs to be
+ * performed, it is done.
+ *
+ * @param owner
+ * the RecordFile instance that owns this transaction mgr.
+ */
+ TransactionManager(RecordFile owner) throws IOException {
+ this.owner = owner;
+ logFileName = null;
+ recover();
+ open();
+ }
+
+ /**
+ * Synchronize log file data with the main database file.
+ * <p>
+ * After this call, the main database file is guaranteed to be consistent and
+ * guaranteed to be the only file needed for backup purposes.
+ */
+ public void synchronizeLog() throws IOException {
+ synchronizeLogFromMemory();
+ }
+
+ /**
+ * Set the maximum number of transactions to record in the log (and keep in
+ * memory) before the log is synchronized with the main database file.
+ * <p>
+ * This method must be called while there are no pending transactions in the
+ * log.
+ */
+ public void setMaximumTransactionsInLog(int maxTxns) throws IOException {
+ if (maxTxns <= 0) {
+ throw new IllegalArgumentException(
+ "Argument 'maxTxns' must be greater than 0.");
+ }
+ if (curTxn != -1) {
+ throw new IllegalStateException(
+ "Cannot change setting while transactions are pending in the log");
+ }
+ _maxTxns = maxTxns;
+ txns = new ArrayList[maxTxns];
+ }
+
+ /** Builds logfile name */
+ private String makeLogName() {
+ return owner.getFileName() + extension;
+ }
+
+ /** Synchs in-core transactions to data file and opens a fresh log */
+ private void synchronizeLogFromMemory() throws IOException {
+ close();
+
+ TreeSet blockList = new TreeSet(new BlockIoComparator());
+
+ int numBlocks = 0;
+ int writtenBlocks = 0;
+ for (int i = 0; i < _maxTxns; i++) {
+ if (txns[i] == null) {
+ continue;
+ }
+ // Add each block to the blockList, replacing the old copy of this
+ // block if necessary, thus avoiding writing the same block twice
+ for (Iterator k = txns[i].iterator(); k.hasNext();) {
+ BlockIo block = (BlockIo) k.next();
+ if (blockList.contains(block)) {
+ block.decrementTransactionCount();
+ } else {
+ writtenBlocks++;
+ blockList.add(block);
}
- _maxTxns = maxTxns;
- txns = new ArrayList[ maxTxns ];
- }
+ numBlocks++;
+ }
-
- /** Builds logfile name */
- private String makeLogName() {
- return owner.getFileName() + extension;
+ txns[i] = null;
}
+ // Write the blocks from the blockList to disk
+ synchronizeBlocks(blockList.iterator(), true);
-
- /** Synchs in-core transactions to data file and opens a fresh log */
- private void synchronizeLogFromMemory() throws IOException {
- close();
-
- TreeSet blockList = new TreeSet( new BlockIoComparator() );
-
- int numBlocks = 0;
- int writtenBlocks = 0;
- for (int i = 0; i < _maxTxns; i++) {
- if (txns[i] == null)
- continue;
- // Add each block to the blockList, replacing the old copy of this
- // block if necessary, thus avoiding writing the same block twice
- for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
- BlockIo block = (BlockIo)k.next();
- if ( blockList.contains( block ) ) {
- block.decrementTransactionCount();
- }
- else {
- writtenBlocks++;
- boolean result = blockList.add( block );
- }
- numBlocks++;
- }
-
- txns[i] = null;
- }
- // Write the blocks from the blockList to disk
- synchronizeBlocks(blockList.iterator(), true);
-
- owner.sync();
- open();
+ owner.sync();
+ open();
+ }
+
+ /** Opens the log file */
+ private void open() throws IOException {
+ logFileName = makeLogName();
+ fos = new FileOutputStream(logFileName);
+ oos = new ObjectOutputStream(fos);
+ oos.writeShort(Magic.LOGFILE_HEADER);
+ oos.flush();
+ curTxn = -1;
+ }
+
+ /** Startup recovery on all files */
+ private void recover() throws IOException {
+ String logName = makeLogName();
+ File logFile = new File(logName);
+ if (!logFile.exists()) {
+ return;
+ }
+ if (logFile.length() == 0) {
+ logFile.delete();
+ return;
+ }
+
+ FileInputStream fis = new FileInputStream(logFile);
+ ObjectInputStream ois = new ObjectInputStream(fis);
+
+ try {
+ if (ois.readShort() != Magic.LOGFILE_HEADER) {
+ throw new Error("Bad magic on log file");
+ }
+ } catch (IOException e) {
+ // corrupted/empty logfile
+ logFile.delete();
+ return;
}
+ while (true) {
+ ArrayList blocks = null;
+ try {
+ blocks = (ArrayList) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new Error("Unexpected exception: " + e);
+ } catch (IOException e) {
+ // corrupted logfile, ignore rest of transactions
+ break;
+ }
+ synchronizeBlocks(blocks.iterator(), false);
- /** Opens the log file */
- private void open() throws IOException {
- logFileName = makeLogName();
- fos = new FileOutputStream(logFileName);
- oos = new ObjectOutputStream(fos);
- oos.writeShort(Magic.LOGFILE_HEADER);
- oos.flush();
- curTxn = -1;
- }
-
- /** Startup recovery on all files */
- private void recover() throws IOException {
- String logName = makeLogName();
- File logFile = new File(logName);
- if (!logFile.exists())
- return;
- if (logFile.length() == 0) {
- logFile.delete();
- return;
- }
-
- FileInputStream fis = new FileInputStream(logFile);
- ObjectInputStream ois = new ObjectInputStream(fis);
-
- try {
- if (ois.readShort() != Magic.LOGFILE_HEADER)
- throw new Error("Bad magic on log file");
- } catch (IOException e) {
- // corrupted/empty logfile
- logFile.delete();
- return;
- }
-
- while (true) {
- ArrayList blocks = null;
- try {
- blocks = (ArrayList) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new Error("Unexcepted exception: " + e);
- } catch (IOException e) {
- // corrupted logfile, ignore rest of transactions
- break;
- }
- synchronizeBlocks(blocks.iterator(), false);
-
- // ObjectInputStream must match exactly each
- // ObjectOutputStream created during writes
- try {
- ois = new ObjectInputStream(fis);
- } catch (IOException e) {
- // corrupted logfile, ignore rest of transactions
- break;
- }
- }
- owner.sync();
- logFile.delete();
- }
-
- /** Synchronizes the indicated blocks with the owner. */
- private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
- throws IOException {
- // write block vector elements to the data file.
- while ( blockIterator.hasNext() ) {
- BlockIo cur = (BlockIo)blockIterator.next();
- owner.synch(cur);
- if (fromCore) {
- cur.decrementTransactionCount();
- if (!cur.isInTransaction()) {
- owner.releaseFromTransaction(cur, true);
- }
- }
- }
+ // ObjectInputStream must match exactly each
+ // ObjectOutputStream created during writes
+ try {
+ ois = new ObjectInputStream(fis);
+ } catch (IOException e) {
+ // corrupted logfile, ignore rest of transactions
+ break;
+ }
}
-
-
- /** Set clean flag on the blocks. */
- private void setClean(ArrayList blocks)
- throws IOException {
- for (Iterator k = blocks.iterator(); k.hasNext(); ) {
- BlockIo cur = (BlockIo) k.next();
- cur.setClean();
+ owner.sync();
+ logFile.delete();
+ }
+
+ /** Synchronizes the indicated blocks with the owner. */
+ private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
+ throws IOException {
+ // write block vector elements to the data file.
+ while (blockIterator.hasNext()) {
+ BlockIo cur = (BlockIo) blockIterator.next();
+ owner.synch(cur);
+ if (fromCore) {
+ cur.decrementTransactionCount();
+ if (!cur.isInTransaction()) {
+ owner.releaseFromTransaction(cur, true);
}
+ }
}
+ }
- /** Discards the indicated blocks and notify the owner. */
- private void discardBlocks(ArrayList blocks)
- throws IOException {
- for (Iterator k = blocks.iterator(); k.hasNext(); ) {
- BlockIo cur = (BlockIo) k.next();
- cur.decrementTransactionCount();
- if (!cur.isInTransaction()) {
- owner.releaseFromTransaction(cur, false);
- }
- }
+ /** Set clean flag on the blocks. */
+ private void setClean(ArrayList blocks) throws IOException {
+ for (Iterator k = blocks.iterator(); k.hasNext();) {
+ BlockIo cur = (BlockIo) k.next();
+ cur.setClean();
+ }
+ }
+
+ /** Discards the indicated blocks and notify the owner. */
+ private void discardBlocks(ArrayList blocks) throws IOException {
+ for (Iterator k = blocks.iterator(); k.hasNext();) {
+ BlockIo cur = (BlockIo) k.next();
+ cur.decrementTransactionCount();
+ if (!cur.isInTransaction()) {
+ owner.releaseFromTransaction(cur, false);
+ }
}
+ }
- /**
- * Starts a transaction. This can block if all slots have been filled
- * with full transactions, waiting for the synchronization thread to
- * clean out slots.
- */
- void start() throws IOException {
- curTxn++;
- if (curTxn == _maxTxns) {
- synchronizeLogFromMemory();
- curTxn = 0;
- }
- txns[curTxn] = new ArrayList();
+ /**
+ * Starts a transaction. This can block if all slots have been filled with
+ * full transactions, waiting for the synchronization thread to clean out
+ * slots.
+ */
+ void start() throws IOException {
+ curTxn++;
+ if (curTxn == _maxTxns) {
+ synchronizeLogFromMemory();
+ curTxn = 0;
+ }
+ txns[curTxn] = new ArrayList();
+ }
+
+ /**
+ * Indicates the block is part of the transaction.
+ */
+ void add(BlockIo block) throws IOException {
+ block.incrementTransactionCount();
+ txns[curTxn].add(block);
+ }
+
+ /**
+ * Commits the transaction to the log file.
+ */
+ void commit() throws IOException {
+ oos.writeObject(txns[curTxn]);
+ sync();
+
+ // set clean flag to indicate blocks have been written to log
+ setClean(txns[curTxn]);
+
+ // open a new ObjectOutputStream in order to store
+ // newer states of BlockIo
+ oos = new ObjectOutputStream(fos);
+ }
+
+ /** Flushes and syncs */
+ private void sync() throws IOException {
+ oos.flush();
+ fos.flush();
+ fos.getFD().sync();
+ }
+
+ /**
+ * Shuts down the transaction manager. Resynchronizes outstanding logs.
+ */
+ void shutdown() throws IOException {
+ synchronizeLogFromMemory();
+ close();
+ }
+
+ /**
+ * Closes open files.
+ */
+ private void close() throws IOException {
+ sync();
+ oos.close();
+ fos.close();
+ oos = null;
+ fos = null;
+ }
+
+ public void removeLogFile() {
+ // if file is not closed yet, just return
+ if (oos != null) {
+ return;
+ }
+ if (logFileName != null) {
+ File file = new File(logFileName);
+ file.delete();
+ logFileName = null;
+ }
+ }
+
+ /**
+ * Force closing the file without synchronizing pending transaction data. Used
+ * for testing purposes only.
+ */
+ void forceClose() throws IOException {
+ oos.close();
+ fos.close();
+ oos = null;
+ fos = null;
+ }
+
+ /**
+ * Use the disk-based transaction log to synchronize the data file.
+ * Outstanding memory logs are discarded because they are believed to be
+ * inconsistent.
+ */
+ void synchronizeLogFromDisk() throws IOException {
+ close();
+
+ for (int i = 0; i < _maxTxns; i++) {
+ if (txns[i] == null) {
+ continue;
+ }
+ discardBlocks(txns[i]);
+ txns[i] = null;
}
- /**
- * Indicates the block is part of the transaction.
- */
- void add(BlockIo block) throws IOException {
- block.incrementTransactionCount();
- txns[curTxn].add(block);
- }
-
- /**
- * Commits the transaction to the log file.
- */
- void commit() throws IOException {
- oos.writeObject(txns[curTxn]);
- sync();
-
- // set clean flag to indicate blocks have been written to log
- setClean(txns[curTxn]);
-
- // open a new ObjectOutputStream in order to store
- // newer states of BlockIo
- oos = new ObjectOutputStream(fos);
- }
-
- /** Flushes and syncs */
- private void sync() throws IOException {
- oos.flush();
- fos.flush();
- fos.getFD().sync();
- }
-
- /**
- * Shutdowns the transaction manager. Resynchronizes outstanding
- * logs.
- */
- void shutdown() throws IOException {
- synchronizeLogFromMemory();
- close();
- }
-
- /**
- * Closes open files.
- */
- private void close() throws IOException {
- sync();
- oos.close();
- fos.close();
- oos = null;
- fos = null;
- }
-
- public void removeLogFile() {
- // if file is not closed yet, just return
- if ( oos != null )
- return;
- if ( logFileName != null ) {
- File file = new File(logFileName) ;
- file.delete();
- logFileName = null;
+ recover();
+ open();
+ }
+
+ /**
+ * INNER CLASS. Comparator class for use by the tree set used to store the
+ * blocks to write for this transaction. The BlockIo objects are ordered by
+ * their blockIds.
+ */
+ public static class BlockIoComparator implements Comparator {
+
+ public int compare(Object o1, Object o2) {
+ BlockIo block1 = (BlockIo) o1;
+ BlockIo block2 = (BlockIo) o2;
+ int result = 0;
+ if (block1.getBlockId() == block2.getBlockId()) {
+ result = 0;
+ } else if (block1.getBlockId() < block2.getBlockId()) {
+ result = -1;
+ } else {
+ result = 1;
}
+ return result;
}
- /**
- * Force closing the file without synchronizing pending transaction data.
- * Used for testing purposes only.
- */
- void forceClose() throws IOException {
- oos.close();
- fos.close();
- oos = null;
- fos = null;
- }
-
- /**
- * Use the disk-based transaction log to synchronize the data file.
- * Outstanding memory logs are discarded because they are believed
- * to be inconsistent.
- */
- void synchronizeLogFromDisk() throws IOException {
- close();
-
- for ( int i=0; i < _maxTxns; i++ ) {
- if (txns[i] == null)
- continue;
- discardBlocks(txns[i]);
- txns[i] = null;
- }
-
- recover();
- open();
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj);
}
-
-
- /** INNER CLASS.
- * Comparator class for use by the tree set used to store the blocks
- * to write for this transaction. The BlockIo objects are ordered by
- * their blockIds.
- */
- public static class BlockIoComparator
- implements Comparator
- {
-
- public int compare( Object o1, Object o2 ) {
- BlockIo block1 = (BlockIo)o1;
- BlockIo block2 = (BlockIo)o2;
- int result = 0;
- if ( block1.getBlockId() == block2.getBlockId() ) {
- result = 0;
- }
- else if ( block1.getBlockId() < block2.getBlockId() ) {
- result = -1;
- }
- else {
- result = 1;
- }
- return result;
- }
-
- public boolean equals(Object obj) {
- return super.equals(obj);
- }
- } // class BlockIOComparator
+ } // class BlockIoComparator
}
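
Side note on the commit path above: commit() serializes the current
transaction's block list, sync() flushes the stream buffers and forces the
file descriptor to disk, and a fresh ObjectOutputStream is then opened so
later transactions record new BlockIo state instead of back-references;
that new stream header is also why recover() must open a matching new
ObjectInputStream per replayed transaction. A minimal, self-contained
sketch of that write-ahead pattern (the file name and the magic value are
stand-ins, not the real Magic.LOGFILE_HEADER):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.ArrayList;

    public class WalSketch {
      public static void main(String[] args) throws Exception {
        File log = new File("sketch.lg"); // ".lg" mirrors the extension constant
        FileOutputStream fos = new FileOutputStream(log);
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        oos.writeShort(0x1360); // stand-in value for Magic.LOGFILE_HEADER
        oos.flush();

        ArrayList<String> txn = new ArrayList<String>();
        txn.add("block-42"); // stands in for a dirty BlockIo
        oos.writeObject(txn); // as in commit()
        oos.flush();
        fos.flush();
        fos.getFD().sync(); // force the log to disk, as in sync()

        // As in commit(): a fresh ObjectOutputStream for the next transaction.
        // Its new stream header is the reason recover() re-creates a matching
        // ObjectInputStream for every transaction it replays.
        oos = new ObjectOutputStream(fos);
        oos.close();

        // Replay one transaction, mirroring the recover() loop.
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream(log));
        ois.readShort(); // skip the magic header
        ArrayList recovered = (ArrayList) ois.readObject();
        System.out.println("recovered " + recovered.size() + " block(s)");
        ois.close();
        log.delete();
      }
    }

Running the sketch prints "recovered 1 block(s)"; the real manager carries
BlockIo objects instead of strings and keeps the dirty blocks in core, as
the class comment explains.
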
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TranslationPage.java Thu Jan 21 10:37:58 2010
@@ -66,44 +66,45 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
/**
- * Class describing a page that holds translations from physical rowids
- * to logical rowids. In fact, the page just holds physical rowids - the
- * page's block is the block for the logical rowid, the offset serve
- * as offset for the rowids.
+ * Class describing a page that holds translations from physical rowids to
+ * logical rowids. In fact, the page just holds physical rowids - the page's
+ * block is the block for the logical rowid, the offsets serve as offsets
+ * for the rowids.
*/
final class TranslationPage extends PageHeader {
- // offsets
- static final short O_TRANS = PageHeader.SIZE; // short count
- static final short ELEMS_PER_PAGE =
- (RecordFile.BLOCK_SIZE - O_TRANS) / PhysicalRowId.SIZE;
-
- // slots we returned.
- final PhysicalRowId[] slots = new PhysicalRowId[ELEMS_PER_PAGE];
+ // offsets
+ static final short O_TRANS = PageHeader.SIZE; // short count
+ static final short ELEMS_PER_PAGE = (RecordFile.BLOCK_SIZE - O_TRANS)
+ / PhysicalRowId.SIZE;
- /**
- * Constructs a data page view from the indicated block.
- */
- TranslationPage(BlockIo block) {
- super(block);
- }
+ // slots we returned.
+ final PhysicalRowId[] slots = new PhysicalRowId[ELEMS_PER_PAGE];
+
+ /**
+ * Constructs a data page view from the indicated block.
+ */
+ TranslationPage(BlockIo block) {
+ super(block);
+ }
- /**
- * Factory method to create or return a data page for the
- * indicated block.
- */
- static TranslationPage getTranslationPageView(BlockIo block) {
- BlockView view = block.getView();
- if (view != null && view instanceof TranslationPage)
- return (TranslationPage) view;
- else
- return new TranslationPage(block);
+ /**
+ * Factory method to create or return a data page for the indicated block.
+ */
+ static TranslationPage getTranslationPageView(BlockIo block) {
+ BlockView view = block.getView();
+ if (view != null && view instanceof TranslationPage) {
+ return (TranslationPage) view;
+ } else {
+ return new TranslationPage(block);
}
+ }
- /** Returns the value of the indicated rowid on the page */
- PhysicalRowId get(short offset) {
- int slot = (offset - O_TRANS) / PhysicalRowId.SIZE;
- if (slots[slot] == null)
- slots[slot] = new PhysicalRowId(block, offset);
- return slots[slot];
+ /** Returns the value of the indicated rowid on the page */
+ PhysicalRowId get(short offset) {
+ int slot = (offset - O_TRANS) / PhysicalRowId.SIZE;
+ if (slots[slot] == null) {
+ slots[slot] = new PhysicalRowId(block, offset);
}
+ return slots[slot];
+ }
}
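
Side note on the arithmetic above: a translation page holds
(RecordFile.BLOCK_SIZE - O_TRANS) / PhysicalRowId.SIZE rowids, and
get(short) inverts that layout to map a byte offset back to a slot index.
A small sketch with stand-in constants (the real values come from
RecordFile, PageHeader and PhysicalRowId):

    public class SlotMathSketch {
      // Stand-in values, assumed for illustration only.
      static final short BLOCK_SIZE = 8192; // mirrors RecordFile.BLOCK_SIZE
      static final short O_TRANS = 16;      // mirrors PageHeader.SIZE
      static final short ROWID_SIZE = 8;    // mirrors PhysicalRowId.SIZE
      static final short ELEMS_PER_PAGE = (BLOCK_SIZE - O_TRANS) / ROWID_SIZE;

      public static void main(String[] args) {
        short offset = (short) (O_TRANS + 3 * ROWID_SIZE); // 4th rowid on the page
        int slot = (offset - O_TRANS) / ROWID_SIZE; // same formula as get(short)
        System.out.println(ELEMS_PER_PAGE + " rowids per page; offset " + offset
            + " maps to slot " + slot);
      }
    }

With these assumed sizes the sketch prints "1022 rowids per page; offset 40
maps to slot 3".
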
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Thu Jan 21 10:37:58 2010
@@ -38,11 +38,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -62,28 +60,27 @@
import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.MiniMRCluster;
-
import org.apache.thrift.protocol.TBinaryProtocol;
public class QTestUtil {
private String testWarehouse;
- private String tmpdir = System.getProperty("test.tmp.dir");
- private Path tmppath = new Path(tmpdir);
+ private final String tmpdir = System.getProperty("test.tmp.dir");
+ private final Path tmppath = new Path(tmpdir);
- private String testFiles;
- private String outDir;
- private String logDir;
- private TreeMap<String, String> qMap;
- private LinkedList<String> srcTables;
+ private final String testFiles;
+ private final String outDir;
+ private final String logDir;
+ private final TreeMap<String, String> qMap;
+ private final LinkedList<String> srcTables;
private ParseDriver pd;
private Hive db;
- private HiveConf conf;
+ private final HiveConf conf;
private Driver drv;
private SemanticAnalyzer sem;
private FileSystem fs;
@@ -96,16 +93,15 @@
public boolean deleteDirectory(File path) {
if (path.exists()) {
File[] files = path.listFiles();
- for(int i=0; i<files.length; i++) {
- if(files[i].isDirectory()) {
- deleteDirectory(files[i]);
- }
- else {
- files[i].delete();
- }
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectory(file);
+ } else {
+ file.delete();
+ }
}
}
- return(path.delete());
+ return (path.delete());
}
public void copyDirectoryToLocal(Path src, Path dest) throws Exception {
@@ -114,9 +110,9 @@
FileSystem destFs = dest.getFileSystem(conf);
if (srcFs.exists(src)) {
FileStatus[] files = srcFs.listStatus(src);
- for(int i=0; i<files.length; i++) {
- String name = files[i].getPath().getName();
- Path dfs_path = files[i].getPath();
+ for (FileStatus file : files) {
+ String name = file.getPath().getName();
+ Path dfs_path = file.getPath();
Path local_path = new Path(dest, name);
// If this is a source table we do not copy it out
@@ -124,13 +120,12 @@
continue;
}
- if(files[i].isDir()) {
+ if (file.isDir()) {
if (!destFs.exists(local_path)) {
destFs.mkdirs(local_path);
}
copyDirectoryToLocal(dfs_path, local_path);
- }
- else {
+ } else {
srcFs.copyToLocalFile(dfs_path, local_path);
}
}
@@ -141,13 +136,12 @@
static Pattern reduceTok = Pattern.compile("(.*)(reduce_[^\\.]*)((\\..*)?)");
public void normalizeNames(File path) throws Exception {
- if(path.isDirectory()) {
+ if (path.isDirectory()) {
File[] files = path.listFiles();
- for(int i=0; i<files.length; i++) {
- normalizeNames(files[i]);
+ for (File file : files) {
+ normalizeNames(file);
}
- }
- else {
+ } else {
// System.out.println("Trying to match: " + path.getPath());
Matcher m = reduceTok.matcher(path.getName());
if (m.matches()) {
@@ -169,7 +163,8 @@
this(outDir, logDir, false);
}
- public QTestUtil(String outDir, String logDir, boolean miniMr) throws Exception {
+ public QTestUtil(String outDir, String logDir, boolean miniMr)
+ throws Exception {
this.outDir = outDir;
this.logDir = logDir;
conf = new HiveConf(Driver.class);
@@ -184,17 +179,19 @@
// hive.metastore.warehouse.dir needs to be set relative to the jobtracker
String fsName = conf.get("fs.default.name");
assert fsName != null;
- conf.set("hive.metastore.warehouse.dir", fsName.concat("/build/ql/test/data/warehouse/"));
+ conf.set("hive.metastore.warehouse.dir", fsName
+ .concat("/build/ql/test/data/warehouse/"));
conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort());
}
// System.out.println(conf.toString());
- testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+ testFiles = conf.get("test.data.files").replace('\\', '/')
+ .replace("c:", "");
String ow = System.getProperty("test.output.overwrite");
overWrite = false;
- if ((ow != null) && ow.equalsIgnoreCase("true")){
+ if ((ow != null) && ow.equalsIgnoreCase("true")) {
overWrite = true;
}
@@ -230,7 +227,7 @@
StringBuffer qsb = new StringBuffer();
// Read the entire query
- while(dis.available() != 0) {
+ while (dis.available() != 0) {
qsb.append(dis.readLine() + "\n");
}
qMap.put(qf.getName(), qsb.toString());
@@ -239,14 +236,13 @@
public void cleanUp() throws Exception {
String warehousePath = ((new URI(testWarehouse)).getPath());
// Drop any tables that remain due to unsuccessful runs
- for(String s: new String [] {"src", "src1", "src_json", "src_thrift", "src_sequencefile",
- "srcpart", "srcbucket","srcbucket2", "dest1", "dest2",
- "dest3", "dest4", "dest4_sequencefile",
- "dest_j1", "dest_j2", "dest_g1", "dest_g2",
- "fetchtask_ioexception"}) {
+ for (String s : new String[] { "src", "src1", "src_json", "src_thrift",
+ "src_sequencefile", "srcpart", "srcbucket", "srcbucket2", "dest1",
+ "dest2", "dest3", "dest4", "dest4_sequencefile", "dest_j1", "dest_j2",
+ "dest_g1", "dest_g2", "fetchtask_ioexception" }) {
db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, s);
}
- for(String s: new String [] {"dest4.out", "union.out"}) {
+ for (String s : new String[] { "dest4.out", "union.out" }) {
deleteDirectory(new File(warehousePath, s));
}
FunctionRegistry.unregisterTemporaryUDF("test_udaf");
@@ -255,8 +251,9 @@
private void runLoadCmd(String loadCmd) throws Exception {
int ecode = 0;
ecode = drv.run(loadCmd);
- if(ecode != 0) {
- throw new Exception("load command: " + loadCmd + " failed with exit code= " + ecode);
+ if (ecode != 0) {
+ throw new Exception("load command: " + loadCmd
+ + " failed with exit code= " + ecode);
}
return;
@@ -265,8 +262,9 @@
private void runCreateTableCmd(String createTableCmd) throws Exception {
int ecode = 0;
ecode = drv.run(createTableCmd);
- if(ecode != 0) {
- throw new Exception("create table command: " + createTableCmd + " failed with exit code= " + ecode);
+ if (ecode != 0) {
+ throw new Exception("create table command: " + createTableCmd
+ + " failed with exit code= " + ecode);
}
return;
@@ -282,105 +280,120 @@
LinkedList<String> part_cols = new LinkedList<String>();
part_cols.add("ds");
part_cols.add("hr");
- db.createTable("srcpart", cols, part_cols, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+ db.createTable("srcpart", cols, part_cols, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
srcTables.add("srcpart");
Path fpath;
Path newfpath;
HashMap<String, String> part_spec = new HashMap<String, String>();
- for (String ds: new String[]{"2008-04-08", "2008-04-09"}) {
- for (String hr: new String[]{"11", "12"}) {
+ for (String ds : new String[] { "2008-04-08", "2008-04-09" }) {
+ for (String hr : new String[] { "11", "12" }) {
part_spec.clear();
part_spec.put("ds", ds);
part_spec.put("hr", hr);
// System.out.println("Loading partition with spec: " + part_spec);
- //db.createPartition(srcpart, part_spec);
+ // db.createPartition(srcpart, part_spec);
fpath = new Path(testFiles, "kv1.txt");
newfpath = new Path(tmppath, "kv1.txt");
fs.copyFromLocalFile(false, true, fpath, newfpath);
fpath = newfpath;
- //db.loadPartition(fpath, srcpart.getName(), part_spec, true);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() +
- "' OVERWRITE INTO TABLE srcpart PARTITION (ds='" + ds + "',hr='" + hr +"')");
+ // db.loadPartition(fpath, srcpart.getName(), part_spec, true);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' OVERWRITE INTO TABLE srcpart PARTITION (ds='" + ds + "',hr='"
+ + hr + "')");
}
}
ArrayList<String> bucketCols = new ArrayList<String>();
bucketCols.add("key");
runCreateTableCmd("CREATE TABLE srcbucket(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE");
- //db.createTable("srcbucket", cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class, 2, bucketCols);
+ // db.createTable("srcbucket", cols, null, TextInputFormat.class,
+ // IgnoreKeyTextOutputFormat.class, 2, bucketCols);
srcTables.add("srcbucket");
- for (String fname: new String [] {"srcbucket0.txt", "srcbucket1.txt"}) {
+ for (String fname : new String[] { "srcbucket0.txt", "srcbucket1.txt" }) {
fpath = new Path(testFiles, fname);
newfpath = new Path(tmppath, fname);
fs.copyFromLocalFile(false, true, fpath, newfpath);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE srcbucket");
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' INTO TABLE srcbucket");
}
-
+
runCreateTableCmd("CREATE TABLE srcbucket2(key int, value string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE");
- //db.createTable("srcbucket", cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class, 2, bucketCols);
+ // db.createTable("srcbucket", cols, null, TextInputFormat.class,
+ // IgnoreKeyTextOutputFormat.class, 2, bucketCols);
srcTables.add("srcbucket2");
- for (String fname: new String [] {"srcbucket20.txt", "srcbucket21.txt", "srcbucket22.txt", "srcbucket23.txt"}) {
+ for (String fname : new String[] { "srcbucket20.txt", "srcbucket21.txt",
+ "srcbucket22.txt", "srcbucket23.txt" }) {
fpath = new Path(testFiles, fname);
newfpath = new Path(tmppath, fname);
fs.copyFromLocalFile(false, true, fpath, newfpath);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE srcbucket2");
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' INTO TABLE srcbucket2");
}
- for (String tname: new String [] {"src", "src1"}) {
- db.createTable(tname, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+ for (String tname : new String[] { "src", "src1" }) {
+ db.createTable(tname, cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
srcTables.add(tname);
}
- db.createTable("src_sequencefile", cols, null, SequenceFileInputFormat.class, SequenceFileOutputFormat.class);
+ db.createTable("src_sequencefile", cols, null,
+ SequenceFileInputFormat.class, SequenceFileOutputFormat.class);
srcTables.add("src_sequencefile");
Table srcThrift = new Table("src_thrift");
srcThrift.setInputFormatClass(SequenceFileInputFormat.class.getName());
srcThrift.setOutputFormatClass(SequenceFileOutputFormat.class.getName());
srcThrift.setSerializationLib(ThriftDeserializer.class.getName());
- srcThrift.setSerdeParam(Constants.SERIALIZATION_CLASS, Complex.class.getName());
- srcThrift.setSerdeParam(Constants.SERIALIZATION_FORMAT, TBinaryProtocol.class.getName());
+ srcThrift.setSerdeParam(Constants.SERIALIZATION_CLASS, Complex.class
+ .getName());
+ srcThrift.setSerdeParam(Constants.SERIALIZATION_FORMAT,
+ TBinaryProtocol.class.getName());
db.createTable(srcThrift);
srcTables.add("src_thrift");
LinkedList<String> json_cols = new LinkedList<String>();
json_cols.add("json");
- db.createTable("src_json", json_cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+ db.createTable("src_json", json_cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
srcTables.add("src_json");
// load the input data into the src table
fpath = new Path(testFiles, "kv1.txt");
newfpath = new Path(tmppath, "kv1.txt");
fs.copyFromLocalFile(false, true, fpath, newfpath);
- //db.loadTable(newfpath, "src", false);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src");
+ // db.loadTable(newfpath, "src", false);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src");
// load the input data into the src table
fpath = new Path(testFiles, "kv3.txt");
newfpath = new Path(tmppath, "kv3.txt");
fs.copyFromLocalFile(false, true, fpath, newfpath);
- //db.loadTable(newfpath, "src1", false);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src1");
+ // db.loadTable(newfpath, "src1", false);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src1");
// load the input data into the src_sequencefile table
fpath = new Path(testFiles, "kv1.seq");
newfpath = new Path(tmppath, "kv1.seq");
fs.copyFromLocalFile(false, true, fpath, newfpath);
- //db.loadTable(newfpath, "src_sequencefile", true);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src_sequencefile");
+ // db.loadTable(newfpath, "src_sequencefile", true);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' INTO TABLE src_sequencefile");
// load the input data into the src_thrift table
fpath = new Path(testFiles, "complex.seq");
newfpath = new Path(tmppath, "complex.seq");
fs.copyFromLocalFile(false, true, fpath, newfpath);
- //db.loadTable(newfpath, "src_thrift", true);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src_thrift");
+ // db.loadTable(newfpath, "src_thrift", true);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' INTO TABLE src_thrift");
// load the json data into the src_json table
fpath = new Path(testFiles, "json.txt");
newfpath = new Path(tmppath, "json.txt");
fs.copyFromLocalFile(false, true, fpath, newfpath);
- //db.loadTable(newfpath, "src_json", false);
- runLoadCmd("LOAD DATA INPATH '" + newfpath.toString() + "' INTO TABLE src_json");
+ // db.loadTable(newfpath, "src_json", false);
+ runLoadCmd("LOAD DATA INPATH '" + newfpath.toString()
+ + "' INTO TABLE src_json");
}
@@ -410,10 +423,13 @@
part_cols.add("ds");
part_cols.add("hr");
- db.createTable("dest1", cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
- db.createTable("dest2", cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+ db.createTable("dest1", cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
+ db.createTable("dest2", cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
- db.createTable("dest3", cols, part_cols, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+ db.createTable("dest3", cols, part_cols, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
Table dest3 = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "dest3");
HashMap<String, String> part_spec = new HashMap<String, String>();
@@ -421,8 +437,10 @@
part_spec.put("hr", "12");
db.createPartition(dest3, part_spec);
- db.createTable("dest4", cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
- db.createTable("dest4_sequencefile", cols, null, SequenceFileInputFormat.class, SequenceFileOutputFormat.class);
+ db.createTable("dest4", cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
+ db.createTable("dest4_sequencefile", cols, null,
+ SequenceFileInputFormat.class, SequenceFileOutputFormat.class);
}
public void cliInit(String tname) throws Exception {
@@ -436,7 +454,7 @@
}
CliSessionState ss = new CliSessionState(conf);
- assert ss!= null;
+ assert ss != null;
ss.in = System.in;
File qf = new File(outDir, tname);
@@ -454,11 +472,12 @@
public int executeOne(String tname) {
String q = qMap.get(tname);
- if(q.indexOf(";") == -1)
+ if (q.indexOf(";") == -1) {
return -1;
+ }
String q1 = q.substring(0, q.indexOf(";") + 1);
- String qrest = q.substring(q.indexOf(";")+1);
+ String qrest = q.substring(q.indexOf(";") + 1);
qMap.put(tname, qrest);
System.out.println("Executing " + q1);
@@ -483,10 +502,12 @@
cols.add("value");
// Move all data from dest4_sequencefile to dest4
- drv.run("FROM dest4_sequencefile INSERT OVERWRITE TABLE dest4 SELECT dest4_sequencefile.*");
+ drv
+ .run("FROM dest4_sequencefile INSERT OVERWRITE TABLE dest4 SELECT dest4_sequencefile.*");
// Drop dest4_sequencefile
- db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "dest4_sequencefile", true, true);
+ db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "dest4_sequencefile",
+ true, true);
}
public int checkNegativeResults(String tname, Exception e) throws Exception {
@@ -502,11 +523,9 @@
FileWriter outfd = new FileWriter(outf);
if (e instanceof ParseException) {
outfd.write("Parse Error: ");
- }
- else if (e instanceof SemanticException) {
+ } else if (e instanceof SemanticException) {
outfd.write("Semantic Exception: \n");
- }
- else {
+ } else {
throw e;
}
@@ -518,15 +537,17 @@
Process executor = Runtime.getRuntime().exec(cmdLine);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
int exitVal = executor.waitFor();
- if(exitVal != 0 && overWrite) {
+ if (exitVal != 0 && overWrite) {
System.out.println("Overwriting results");
cmdLine = "cp " + outf.getPath() + " " + expf.getPath();
executor = Runtime.getRuntime().exec(cmdLine);
@@ -555,15 +576,17 @@
Process executor = Runtime.getRuntime().exec(cmdLine);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
int exitVal = executor.waitFor();
- if(exitVal != 0 && overWrite) {
+ if (exitVal != 0 && overWrite) {
System.out.println("Overwriting results");
cmdLine = "cp " + outf.getPath() + " " + expf.getPath();
executor = Runtime.getRuntime().exec(cmdLine);
@@ -571,13 +594,13 @@
}
return exitVal;
- }
- else {
+ } else {
throw new Exception("Parse tree is null");
}
}
- public int checkPlan(String tname, List<Task<? extends Serializable>> tasks) throws Exception {
+ public int checkPlan(String tname, List<Task<? extends Serializable>> tasks)
+ throws Exception {
if (tasks != null) {
File planDir = new File(outDir, "plan");
@@ -588,35 +611,37 @@
outf = new File(outf, tname.concat(".xml"));
FileOutputStream ofs = new FileOutputStream(outf);
- for(Task<? extends Serializable> plan: tasks) {
+ for (Task<? extends Serializable> plan : tasks) {
Utilities.serializeTasks(plan, ofs);
}
- String [] cmdArray = new String[6];
+ String[] cmdArray = new String[6];
cmdArray[0] = "diff";
cmdArray[1] = "-b";
cmdArray[2] = "-I";
- cmdArray[3] = "\\(\\(<java version=\".*\" class=\"java.beans.XMLDecoder\">\\)" +
- "\\|\\(<string>.*/tmp/.*</string>\\)" +
- "\\|\\(<string>file:.*</string>\\)" +
- "\\|\\(<string>[0-9]\\{10\\}</string>\\)" +
- "\\|\\(<string>/.*/warehouse/.*</string>\\)\\)";
+ cmdArray[3] = "\\(\\(<java version=\".*\" class=\"java.beans.XMLDecoder\">\\)"
+ + "\\|\\(<string>.*/tmp/.*</string>\\)"
+ + "\\|\\(<string>file:.*</string>\\)"
+ + "\\|\\(<string>[0-9]\\{10\\}</string>\\)"
+ + "\\|\\(<string>/.*/warehouse/.*</string>\\)\\)";
cmdArray[4] = outf.getPath();
cmdArray[5] = planFile.getPath();
- System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2] + "\'" + cmdArray[3] + "\'" +
- " " + cmdArray[4] + " " + cmdArray[5]);
+ System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2]
+ + "\'" + cmdArray[3] + "\'" + " " + cmdArray[4] + " " + cmdArray[5]);
Process executor = Runtime.getRuntime().exec(cmdArray);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
int exitVal = executor.waitFor();
- if(exitVal != 0 && overWrite) {
+ if (exitVal != 0 && overWrite) {
System.out.println("Overwriting results");
String cmdLine = "cp " + outf.getPath() + " " + planFile.getPath();
executor = Runtime.getRuntime().exec(cmdLine);
@@ -624,8 +649,7 @@
}
return exitVal;
- }
- else {
+ } else {
throw new Exception("Plan is null");
}
@@ -638,7 +662,8 @@
Path localPath = new Path(FileSystem.getLocal(conf).getUri().getPath());
localPath = new Path(localPath, logDir);
localPath = new Path(localPath, "warehouse_local_copy");
- System.out.println("warehousePath = " + warehousePath.toString() + " localPath = " + localPath.toString());
+ System.out.println("warehousePath = " + warehousePath.toString()
+ + " localPath = " + localPath.toString());
if (FileSystem.getLocal(conf).exists(localPath)) {
FileSystem.getLocal(conf).delete(localPath, true);
@@ -647,7 +672,7 @@
copyDirectoryToLocal(warehousePath, localPath);
normalizeNames(new File(localPath.toUri().getPath()));
- String [] cmdArray;
+ String[] cmdArray;
if (overWrite == false) {
cmdArray = new String[6];
cmdArray[0] = "diff";
@@ -656,22 +681,24 @@
cmdArray[3] = "--exclude=.svn";
cmdArray[4] = localPath.toUri().getPath();
cmdArray[5] = (new File(outDir, tname)).getPath() + "/warehouse";
- System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2] + " " +
- cmdArray[3] + " " + cmdArray[4] + " " + cmdArray[5]);
- }
- else {
+ System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2]
+ + " " + cmdArray[3] + " " + cmdArray[4] + " " + cmdArray[5]);
+ } else {
System.out.println("overwriting");
// Remove any existing output
- String [] cmdArray1 = new String[5];
+ String[] cmdArray1 = new String[5];
cmdArray1[0] = "rm";
cmdArray1[1] = "-rf";
cmdArray1[2] = (new File(outDir, tname)).getPath();
- System.out.println(cmdArray1[0] + " " + cmdArray1[1] + " " + cmdArray1[2]);
+ System.out
+ .println(cmdArray1[0] + " " + cmdArray1[1] + " " + cmdArray1[2]);
Process executor = Runtime.getRuntime().exec(cmdArray1);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
@@ -686,13 +713,16 @@
cmdArray[1] = "-r";
cmdArray[2] = localPath.toUri().getPath();
cmdArray[3] = (new File(outDir, tname)).getPath();
- System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2] + " " + cmdArray[3]);
+ System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2]
+ + " " + cmdArray[3]);
}
Process executor = Runtime.getRuntime().exec(cmdArray);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
@@ -703,42 +733,29 @@
}
public int checkCliDriverResults(String tname) throws Exception {
- String [] cmdArray;
+ String[] cmdArray;
- cmdArray = new String[] {
- "diff",
- "-a",
- "-I",
- "file:",
- "-I",
- "/tmp/",
- "-I",
- "invalidscheme:",
- "-I",
- "lastUpdateTime",
- "-I",
- "lastAccessTime",
- "-I",
- "owner",
- "-I",
- "transient_lastDdlTime",
+ cmdArray = new String[] { "diff", "-a", "-I", "file:", "-I", "/tmp/", "-I",
+ "invalidscheme:", "-I", "lastUpdateTime", "-I", "lastAccessTime", "-I",
+ "owner", "-I", "transient_lastDdlTime",
(new File(logDir, tname + ".out")).getPath(),
- (new File(outDir, tname + ".out")).getPath()
- };
+ (new File(outDir, tname + ".out")).getPath() };
System.out.println(org.apache.commons.lang.StringUtils.join(cmdArray, ' '));
Process executor = Runtime.getRuntime().exec(cmdArray);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
outPrinter.start();
errPrinter.start();
int exitVal = executor.waitFor();
- if(exitVal != 0 && overWrite) {
+ if (exitVal != 0 && overWrite) {
System.out.println("Overwriting results");
cmdArray = new String[3];
cmdArray[0] = "cp";
@@ -756,12 +773,13 @@
return pd.parse(qMap.get(tname));
}
- public List<Task<? extends Serializable>> analyzeAST(ASTNode ast) throws Exception {
+ public List<Task<? extends Serializable>> analyzeAST(ASTNode ast)
+ throws Exception {
// Do semantic analysis and plan generation
Context ctx = new Context(conf);
- while((ast.getToken() == null) && (ast.getChildCount() > 0)) {
- ast = (ASTNode)ast.getChild(0);
+ while ((ast.getToken() == null) && (ast.getChildCount() > 0)) {
+ ast = (ASTNode) ast.getChild(0);
}
sem.analyze(ast, ctx);
@@ -769,19 +787,17 @@
return sem.getRootTasks();
}
-
public TreeMap<String, String> getQMap() {
return qMap;
}
-
/**
* QTRunner: Runnable class for running a single query file
- *
+ *
**/
public static class QTRunner implements Runnable {
- private QTestUtil qt;
- private String fname;
+ private final QTestUtil qt;
+ private final String fname;
public QTRunner(QTestUtil qt, String fname) {
this.qt = qt;
@@ -795,7 +811,8 @@
qt.cliInit(fname, false);
qt.executeClient(fname);
} catch (Throwable e) {
- System.err.println("Query file " + fname + " failed with exception " + e.getMessage());
+ System.err.println("Query file " + fname + " failed with exception "
+ + e.getMessage());
e.printStackTrace();
System.err.flush();
}
@@ -803,26 +820,33 @@
}
/**
- * executes a set of query files either in sequence or in parallel.
- * Uses QTestUtil to do so
- *
- * @param qfiles array of input query files containing arbitrary number of hive queries
- * @param resDirs array of output directories one corresponding to each input query file
- * @param mt whether to run in multithreaded mode or not
+ * Executes a set of query files either in sequence or in parallel, using
+ * QTestUtil to do so.
+ *
+ * @param qfiles
+ * array of input query files containing arbitrary number of hive
+ * queries
+ * @param resDirs
+ * array of output directories one corresponding to each input query
+ * file
+ * @param mt
+ * whether to run in multithreaded mode or not
* @return true if all the query files were executed successfully, else false
- *
- * In multithreaded mode each query file is run in a separate thread. the caller has to
- * arrange that different query files do not collide (in terms of destination tables)
+ *
+ * In multithreaded mode each query file is run in a separate thread.
+ * the caller has to arrange that different query files do not collide
+ * (in terms of destination tables)
*/
- public static boolean queryListRunner(File [] qfiles, String [] resDirs, String[] logDirs, boolean mt) {
+ public static boolean queryListRunner(File[] qfiles, String[] resDirs,
+ String[] logDirs, boolean mt) {
- assert(qfiles.length == resDirs.length);
- assert(qfiles.length == logDirs.length);
+ assert (qfiles.length == resDirs.length);
+ assert (qfiles.length == logDirs.length);
boolean failed = false;
try {
- QTestUtil[] qt = new QTestUtil [qfiles.length];
- for(int i=0; i<qfiles.length; i++) {
+ QTestUtil[] qt = new QTestUtil[qfiles.length];
+ for (int i = 0; i < qfiles.length; i++) {
qt[i] = new QTestUtil(resDirs[i], logDirs[i]);
qt[i].addFile(qfiles[i]);
}
@@ -833,36 +857,38 @@
qt[0].cleanUp();
qt[0].createSources();
- QTRunner [] qtRunners = new QTestUtil.QTRunner [qfiles.length];
- Thread [] qtThread = new Thread [qfiles.length];
+ QTRunner[] qtRunners = new QTestUtil.QTRunner[qfiles.length];
+ Thread[] qtThread = new Thread[qfiles.length];
- for(int i=0; i<qfiles.length; i++) {
- qtRunners[i] = new QTestUtil.QTRunner (qt[i], qfiles[i].getName());
- qtThread[i] = new Thread (qtRunners[i]);
+ for (int i = 0; i < qfiles.length; i++) {
+ qtRunners[i] = new QTestUtil.QTRunner(qt[i], qfiles[i].getName());
+ qtThread[i] = new Thread(qtRunners[i]);
}
- for(int i=0; i<qfiles.length; i++) {
+ for (int i = 0; i < qfiles.length; i++) {
qtThread[i].start();
}
- for(int i=0; i<qfiles.length; i++) {
+ for (int i = 0; i < qfiles.length; i++) {
qtThread[i].join();
int ecode = qt[i].checkCliDriverResults(qfiles[i].getName());
if (ecode != 0) {
failed = true;
- System.err.println("Test " + qfiles[i].getName() + " results check failed with error code " + ecode);
+ System.err.println("Test " + qfiles[i].getName()
+ + " results check failed with error code " + ecode);
}
}
} else {
- for(int i=0; i<qfiles.length && !failed; i++) {
+ for (int i = 0; i < qfiles.length && !failed; i++) {
qt[i].cliInit(qfiles[i].getName());
qt[i].executeClient(qfiles[i].getName());
int ecode = qt[i].checkCliDriverResults(qfiles[i].getName());
if (ecode != 0) {
failed = true;
- System.err.println("Test " + qfiles[i].getName() + " results check failed with error code " + ecode);
+ System.err.println("Test " + qfiles[i].getName()
+ + " results check failed with error code " + ecode);
}
}
}
@@ -873,4 +899,3 @@
return (!failed);
}
}
-
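
Side note on the checker methods above: checkCliDriverResults() and its
siblings all follow the same pattern of exec'ing an external diff, draining
stdout and stderr on separate threads (the role StreamPrinter plays; without
the drain a chatty child process can block on a full pipe), and treating
exit code 0 as a match. A self-contained sketch of that pattern, with
hypothetical result paths:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class DiffSketch {
      static Thread drain(final InputStream in) {
        Thread t = new Thread(new Runnable() {
          public void run() {
            try {
              BufferedReader r = new BufferedReader(new InputStreamReader(in));
              String line;
              while ((line = r.readLine()) != null) {
                System.out.println(line); // relay child output, like StreamPrinter
              }
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        });
        t.start();
        return t;
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical result files; QTestUtil builds these from logDir/outDir.
        Process p = Runtime.getRuntime().exec(
            new String[] { "diff", "-a", "logs/foo.out", "expected/foo.out" });
        Thread out = drain(p.getInputStream());
        Thread err = drain(p.getErrorStream());
        int exitVal = p.waitFor(); // 0 means actual results match expected
        out.join();
        err.join();
        System.out.println("diff exit code: " + exitVal);
      }
    }
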
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java Thu Jan 21 10:37:58 2010
@@ -18,38 +18,37 @@
package org.apache.hadoop.hive.ql;
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
+import java.io.File;
-import junit.framework.Test;
import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.hadoop.hive.ql.QTestUtil;
/**
* Suite for testing running of queries in multi-threaded mode
*/
public class TestMTQueries extends TestCase {
- private String inpDir = System.getProperty("ql.test.query.clientpositive.dir");
- private String resDir = System.getProperty("ql.test.results.clientpositive.dir");
- private String logDir = System.getProperty("test.log.dir") + "/clientpositive";
-
- public void testMTQueries1() throws Exception {
- String[] testNames = new String [] {"join1.q", "join2.q", "groupby1.q", "groupby2.q", "join3.q", "input1.q", "input19.q"};
- String [] logDirs = new String [testNames.length];
- String [] resDirs = new String [testNames.length];
- File [] qfiles = new File [testNames.length];
- for(int i=0; i<resDirs.length; i++) {
+ private final String inpDir = System
+ .getProperty("ql.test.query.clientpositive.dir");
+ private final String resDir = System
+ .getProperty("ql.test.results.clientpositive.dir");
+ private final String logDir = System.getProperty("test.log.dir")
+ + "/clientpositive";
+
+ public void testMTQueries1() throws Exception {
+ String[] testNames = new String[] { "join1.q", "join2.q", "groupby1.q",
+ "groupby2.q", "join3.q", "input1.q", "input19.q" };
+ String[] logDirs = new String[testNames.length];
+ String[] resDirs = new String[testNames.length];
+ File[] qfiles = new File[testNames.length];
+ for (int i = 0; i < resDirs.length; i++) {
logDirs[i] = logDir;
resDirs[i] = resDir;
qfiles[i] = new File(inpDir, testNames[i]);
}
boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true);
- if(!success)
- fail ("One or more queries failed");
+ if (!success) {
+ fail("One or more queries failed");
+ }
}
}