Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/05 20:12:52 UTC

svn commit: r1622751 - in /hive/trunk: hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/ itests/hive-unit/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/ itest...

Author: gates
Date: Fri Sep  5 18:12:51 2014
New Revision: 1622751

URL: http://svn.apache.org/r1622751
Log:
HIVE-7811 Compactions need to update table/partition stats (Eugene Koifman via Alan Gates)
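
In short: after a successful major compaction the Worker now re-gathers column
statistics for every column of the compacted table/partition that already had
stats, so existing stats are refreshed rather than left stale. A minimal sketch
of the new call sequence (table, partition, and user names are illustrative;
conf is a HiveConf pointing at the metastore):

    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    CompactionInfo ci =
      new CompactionInfo("default", "page_view", "dt=2014-09-05", CompactionType.MAJOR);
    // columns that already have entries in TAB_COL_STATS / PART_COL_STATS
    List<String> cols = txnHandler.findColumnsWithStats(ci);
    // builds and runs: analyze table default.page_view partition(dt='2014-09-05')
    //   compute statistics for columns <cols>
    Worker.StatsUpdater su =
      Worker.StatsUpdater.init(ci, cols, conf, System.getProperty("user.name"));
    su.gatherStats();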

Added:
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
Modified:
    hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
    hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
    hive/trunk/itests/hive-unit/pom.xml
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Fri Sep  5 18:12:51 2014
@@ -343,7 +343,10 @@ public class HiveEndPoint {
       if (ep.partitionVals.isEmpty()) {
         return;
       }
-      SessionState state = SessionState.start(new CliSessionState(conf));
+      SessionState localSession = null;
+      if(SessionState.get() == null) {
+        localSession = SessionState.start(new CliSessionState(conf));
+      }
       Driver driver = new Driver(conf);
 
       try {
@@ -372,7 +375,9 @@ public class HiveEndPoint {
       } finally {
         driver.close();
         try {
-          state.close();
+          if(localSession != null) {
+            localSession.close();
+          }
         } catch (IOException e) {
           LOG.warn("Error closing SessionState used to run Hive DDL.");
         }
@@ -563,11 +568,14 @@ public class HiveEndPoint {
 
     /**
      * Get Id of currently open transaction
-     * @return
+     * @return -1 if there is no open TX
      */
     @Override
     public Long getCurrentTxnId() {
-      return txnIds.get(currentTxnIndex);
+      if(currentTxnIndex >= 0) {
+        return txnIds.get(currentTxnIndex);
+      }
+      return -1L;
     }
 
     /**

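The two hunks above implement one pattern: own only the SessionState you
created. Condensed, the guarded form looks like this (a sketch; conf is the
HiveConf already in scope):

    SessionState localSession = null;
    if (SessionState.get() == null) {        // no session attached to this thread yet
      localSession = SessionState.start(new CliSessionState(conf));
    }
    try {
      // ... run DDL through the Driver ...
    } finally {
      if (localSession != null) {            // close only the session we started
        localSession.close();
      }
    }

This keeps createPartitionIfNotExists() from tearing down a caller's
pre-existing session, the bug noted in the TestCompactor comment below.
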
Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Fri Sep  5 18:12:51 2014
@@ -54,6 +54,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -67,6 +69,7 @@ import java.util.Map;
 
 
 public class TestStreaming {
+  private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
 
   public static class RawFileSystem extends RawLocalFileSystem {
     private static final URI NAME;
@@ -636,18 +639,25 @@ public class TestStreaming {
     connection.close();
   }
 
-  class WriterThd extends Thread {
+  private static class WriterThd extends Thread {
 
-    private StreamingConnection conn;
-    private HiveEndPoint ep;
-    private DelimitedInputWriter writer;
-    private String data;
+    private final StreamingConnection conn;
+    private final DelimitedInputWriter writer;
+    private final String data;
+    private Throwable error;
 
     WriterThd(HiveEndPoint ep, String data) throws Exception {
-      this.ep = ep;
+      super("Writer_" + data);
       writer = new DelimitedInputWriter(fieldNames, ",", ep);
       conn = ep.newConnection(false);
       this.data = data;
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread thread, Throwable throwable) {
+          error = throwable;
+          LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+        }
+      });
     }
 
     @Override
@@ -668,14 +678,14 @@ public class TestStreaming {
           try {
             txnBatch.close();
           } catch (Exception e) {
+            LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
             conn.close();
-            throw new RuntimeException(e);
           }
         }
         try {
           conn.close();
         } catch (Exception e) {
-          throw new RuntimeException(e);
+          LOG.error("conn.close() failed: " + e.getMessage(), e);
         }
 
       }
@@ -685,18 +695,23 @@ public class TestStreaming {
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
     final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    WriterThd t1 = new WriterThd(ep, "1,Matrix");
-    WriterThd t2 = new WriterThd(ep, "2,Gandhi");
-    WriterThd t3 = new WriterThd(ep, "3,Silence");
-
-    t1.start();
-    t2.start();
-    t3.start();
-
-    t1.join();
-    t2.join();
-    t3.join();
-
+    List<WriterThd> writers = new ArrayList<WriterThd>(3);
+    writers.add(new WriterThd(ep, "1,Matrix"));
+    writers.add(new WriterThd(ep, "2,Gandhi"));
+    writers.add(new WriterThd(ep, "3,Silence"));
+
+    for(WriterThd w : writers) {
+      w.start();
+    }
+    for(WriterThd w : writers) {
+      w.join();
+    }
+    for(WriterThd w : writers) {
+      if(w.error != null) {
+        Assert.fail("Writer thread " + w.getName() + " died: " + w.error.getMessage() +
+          ". See log file for stack trace");
+      }
+    }
   }
 
   // delete db and all tables in it

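Rationale for the WriterThd rework above: an exception thrown from a worker
thread's run() never reaches the JUnit runner, so the old RuntimeException
rethrows could not fail the test. Recording the Throwable in an
UncaughtExceptionHandler and asserting after join() fixes that; in miniature
(a sketch, with AtomicReference standing in for the WriterThd.error field and
work standing in for some Runnable):

    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    Thread t = new Thread(work);
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread thread, Throwable throwable) {
        error.set(throwable);                // stash for the test thread to inspect
      }
    });
    t.start();
    t.join();
    if (error.get() != null) {
      Assert.fail("Thread " + t.getName() + " died: " + error.get().getMessage());
    }
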
Modified: hive/trunk/itests/hive-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/pom.xml?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/pom.xml (original)
+++ hive/trunk/itests/hive-unit/pom.xml Fri Sep  5 18:12:51 2014
@@ -53,6 +53,16 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- dependencies are always listed in sorted order by groupId, artifactId -->
     <!-- test intra-project -->

Added: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1622751&view=auto
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (added)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Fri Sep  5 18:12:51 2014
@@ -0,0 +1,310 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests that major compaction refreshes existing table/partition column stats (HIVE-7811).
+ */
+public class TestCompactor {
+  private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
+  private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
+    File.separator + TestCompactor.class.getCanonicalName() + "-" + System.currentTimeMillis());
+  private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+
+  @Rule
+  public TemporaryFolder stagingFolder = new TemporaryFolder();
+  private HiveConf conf;
+  IMetaStoreClient msClient;
+  private Driver driver;
+
+  @Before
+  public void setup() throws Exception {
+
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if(!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
+
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    conf = hiveConf;
+    msClient = new HiveMetaStoreClient(conf);
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+
+
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+    int k = 0;
+    for (int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for (int j = 1; j <= LOOP_SIZE; j++) {
+        String sj = "S" + j + "S";
+        input[k] = si + "\t" + sj;
+        k++;
+      }
+    }
+    createTestDataFile(BASIC_FILE_NAME, input);
+  }
+  @After
+  public void tearDown() {
+    conf = null;
+    if(msClient != null) {
+      msClient.close();
+    }
+    if(driver != null) {
+      driver.close();
+    }
+  }
+  
+  /**
+   * After each major compaction, stats need to be updated on each column of the
+   * table/partition which previously had stats.
+   * 1. create a bucketed ORC backed table (Orc is currently required by ACID)
+   * 2. populate 2 partitions with data
+   * 3. compute stats
+   * 4. insert some data into the table using StreamingAPI
+   * 5. Trigger major compaction (which should update stats)
+   * 6. check that stats have been updated
+   * @throws Exception
+   * todo:
+   * - add a non-partitioned variant of this test
+   * - add a test with a sorted table?
+   */
+  @Test
+  public void testStatsAfterCompactionPartTbl() throws Exception {
+    //as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
+    String tblName = "compaction_test";
+    String tblNameStg = tblName + "_stg";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("drop table if exists " + tblNameStg, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(bkt INT)" +
+      " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC", driver);
+    executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
+      " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
+      " STORED AS TEXTFILE" +
+      " LOCATION '" + stagingFolder.newFolder() + "'", driver);
+
+    executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
+      "' overwrite into table " + tblNameStg, driver);
+    execSelectAndDumpData("select * from " + tblNameStg, driver, "Dumping data for " +
+      tblNameStg + " after load:");
+    executeStatementOnDriver("FROM " + tblNameStg +
+      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=0) " +
+      "SELECT a, b where a < 2", driver);
+    executeStatementOnDriver("FROM " + tblNameStg +
+      " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=1) " +
+      "SELECT a, b where a >= 2", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+      tblName + " after load:");
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
+    LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+    Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
+      System.getProperty("user.name"));
+    su.gatherStats();//compute stats before compaction
+    LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+
+    CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR);
+    LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ciPart2));
+    su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"));
+    su.gatherStats();//compute stats before compaction
+    LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ciPart2));
+    
+    //now make sure we get the stats we expect for partition we are going to add data to later
+    Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(ci.dbname,
+      ci.tableName, Arrays.asList(ci.partName), colNames);
+    List<ColumnStatisticsObj> colStats = stats.get(ci.partName);
+    Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+    Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+    Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+    LongColumnStatsData colAStats = colStats.get(0).getStatsData().getLongStats();
+    Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+    Assert.assertEquals("highValue a", 1, colAStats.getHighValue());
+    Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+    Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
+    StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
+    Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
+    Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen());
+    Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+    Assert.assertEquals("numDVs b", 2, colBStats.getNumDVs());
+
+    //now save stats for partition we won't modify
+    stats = msClient.getPartitionColumnStatistics(ciPart2.dbname,
+      ciPart2.tableName, Arrays.asList(ciPart2.partName), colNames);
+    colStats = stats.get(ciPart2.partName);
+    LongColumnStatsData colAStatsPart2 = colStats.get(0).getStatsData().getLongStats();
+    StringColumnStatsData colBStatsPart2 = colStats.get(1).getStatsData().getStringStats();
+
+
+    HiveEndPoint endPt = new HiveEndPoint(null, ci.dbname, ci.tableName, Arrays.asList("0"));
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    /* The next call eventually reaches HiveEndPoint.createPartitionIfNotExists(), which runs an
+     * operation on a Driver. It used to start its own CliSessionState and then close it, which
+     * removed it from the ThreadLocal, so the session created in this class was gone afterwards;
+     * fixed in HiveEndPoint above. */
+    StreamingConnection connection = endPt.newConnection(true);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
+    txnBatch.write("50,Kiev".getBytes());
+    txnBatch.write("51,St. Petersburg".getBytes());
+    txnBatch.write("44,Boston".getBytes());
+    txnBatch.commit();
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("52,Tel Aviv".getBytes());
+    txnBatch.write("53,Atlantis".getBytes());
+    txnBatch.write("53,Boston".getBytes());
+    txnBatch.commit();
+
+    txnBatch.close();
+    connection.close();
+    execSelectAndDumpData("select * from " + ci.getFullTableName(), driver, ci.getFullTableName());
+
+    //so now we have written some new data to bkt=0 and it shows up
+    CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, CompactionType.MAJOR);
+    rqst.setPartitionname(ci.partName);
+    txnHandler.compact(rqst);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+    stop.boolVal = true;
+    t.init(stop);
+    t.run();
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
+      Arrays.asList(ci.partName), colNames);
+    colStats = stats.get(ci.partName);
+    Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+    Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+    Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+    colAStats = colStats.get(0).getStatsData().getLongStats();
+    Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+    Assert.assertEquals("highValue a", 53, colAStats.getHighValue());
+    Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+    Assert.assertEquals("numNdv a", 6, colAStats.getNumDVs());
+    colBStats = colStats.get(1).getStatsData().getStringStats();
+    Assert.assertEquals("maxColLen b", 14, colBStats.getMaxColLen());
+    //cast to long to avoid comparing against a repeating decimal
+    Assert.assertEquals("avgColLen b", (long)6.1111111111, (long)colBStats.getAvgColLen());
+    Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+    Assert.assertEquals("numDVs b", 10, colBStats.getNumDVs());
+
+    //now check that stats for partition we didn't modify did not change
+    stats = msClient.getPartitionColumnStatistics(ciPart2.dbname, ciPart2.tableName,
+      Arrays.asList(ciPart2.partName), colNames);
+    colStats = stats.get(ciPart2.partName);
+    Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+      colAStatsPart2, colStats.get(0).getStatsData().getLongStats());
+    Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+      colBStatsPart2, colStats.get(1).getStatsData().getStringStats());
+  }
+
+  /**
+   * convenience method to execute a select stmt and dump results to log file
+   */
+  private static void execSelectAndDumpData(String selectStmt, Driver driver, String msg)
+    throws  Exception {
+    executeStatementOnDriver(selectStmt, driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    int rowIdx = 0;
+    LOG.debug(msg);
+    for(String row : valuesReadFromHiveDriver) {
+      LOG.debug(" rowIdx=" + rowIdx++ + ":" + row);
+    }
+  }
+  /**
+   * Execute Hive CLI statement
+   * @param cmd arbitrary statement to execute
+   */
+  static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if(cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+    }
+  }
+  static void createTestDataFile(String filename, String[] lines) throws IOException {
+    FileWriter writer = null;
+    try {
+      File file = new File(filename);
+      file.deleteOnExit();
+      writer = new FileWriter(file);
+      for (String line : lines) {
+        writer.write(line + "\n");
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+
+  }
+}

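(To try the new test: from itests/hive-unit, "mvn test -Dtest=TestCompactor".
This assumes the main tree has already been built and installed locally, since
the itests modules are built as a separate reactor.)
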
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java Fri Sep  5 18:12:51 2014
@@ -34,9 +34,17 @@ public class CompactionInfo {
   private String fullPartitionName = null;
   private String fullTableName = null;
 
+  public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) {
+    this.dbname = dbname;
+    this.tableName = tableName;
+    this.partName = partName;
+    this.type = type;
+  }
+  CompactionInfo() {}
+  
   public String getFullPartitionName() {
     if (fullPartitionName == null) {
-      StringBuffer buf = new StringBuffer(dbname);
+      StringBuilder buf = new StringBuilder(dbname);
       buf.append('.');
       buf.append(tableName);
       if (partName != null) {
@@ -50,11 +58,14 @@ public class CompactionInfo {
 
   public String getFullTableName() {
     if (fullTableName == null) {
-      StringBuffer buf = new StringBuffer(dbname);
+      StringBuilder buf = new StringBuilder(dbname);
       buf.append('.');
       buf.append(tableName);
       fullTableName = buf.toString();
     }
     return fullTableName;
   }
+  public boolean isMajorCompaction() {
+    return CompactionType.MAJOR == type;
+  }
 }

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Fri Sep  5 18:12:51 2014
@@ -535,6 +535,46 @@ public class CompactionTxnHandler extend
       deadlockCnt = 0;
     }
   }
+
+  /**
+   * Queries metastore DB directly to find columns in the table which have statistics information.
+   * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+   * table level stats are examined.
+   * @throws MetaException
+   */
+  public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = dbConn.createStatement();
+      String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")
+         + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+        + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");
+      LOG.debug("Going to execute <" + s + ">");
+      rs = stmt.executeQuery(s);
+      List<String> columns = new ArrayList<String>();
+      while(rs.next()) {
+        columns.add(rs.getString(1));
+      }
+      LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+        (ci.partName == null ? "" : "/" + ci.partName));
+      dbConn.commit();
+      return columns;
+    } catch (SQLException e) {
+      try {
+        LOG.error("Failed to find columns with existing stats for " + ci.tableName +
+            (ci.partName == null ? "" : "/" + ci.partName), e);
+        dbConn.rollback();
+      } catch (SQLException e1) {
+        //nothing we can do here
+      }
+      throw new MetaException("Unable to connect to transaction database " +
+        StringUtils.stringifyException(e));
+    } finally {
+      close(rs, stmt, dbConn);
+    }
+  }
 }
 
 

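For reference, the query findColumnsWithStats() ends up issuing looks like this
for a partitioned table (illustrative names; for an unpartitioned table it
reads TAB_COL_STATS and drops the PARTITION_NAME predicate):

    SELECT COLUMN_NAME FROM PART_COL_STATS
      WHERE DB_NAME='default' AND TABLE_NAME='page_view'
        AND PARTITION_NAME='dt=2014-09-05'
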
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Fri Sep  5 18:12:51 2014
@@ -860,6 +860,29 @@ public class TxnHandler {
   }
 
   /**
+   * Close the ResultSet.
+   * @param rs may be {@code null}
+   */
+  void close(ResultSet rs) {
+    try {
+      if (rs != null && !rs.isClosed()) {
+        rs.close();
+      }
+    }
+    catch(SQLException ex) {
+      LOG.warn("Failed to close result set: " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
+   */
+  void close(ResultSet rs, Statement stmt, Connection dbConn) {
+    close(rs);
+    closeStmt(stmt);
+    closeDbConn(dbConn);
+  }
+  /**
    * Determine if an exception was a deadlock.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
    * different database.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java Fri Sep  5 18:12:51 2014
@@ -76,4 +76,9 @@ public class CommandProcessorResponse {
   public String getSQLState() { return SQLState; }
   public Schema getSchema() { return resSchema; }
   public Throwable getException() { return exception; }
+  public String toString() {
+    return "(" + responseCode + "," + errorMessage + "," + SQLState +
+      (resSchema == null ? "" : "," + resSchema) +
+      (exception == null ? "" : "," + exception.getMessage()) + ")";
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Fri Sep  5 18:12:51 2014
@@ -100,7 +100,7 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-           ValidTxnList txns, boolean isMajor) throws IOException {
+           ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException {
     JobConf job = new JobConf(conf);
     job.setJobName(jobName);
     job.setOutputKeyClass(NullWritable.class);
@@ -182,6 +182,7 @@ public class CompactorMR {
     LOG.debug("Setting maximum transaction to " + maxTxn);
 
     JobClient.runJob(job).waitForCompletion();
+    su.gatherStats();
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Fri Sep  5 18:12:51 2014
@@ -20,20 +20,28 @@ package org.apache.hadoop.hive.ql.txn.co
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A class to do compactions.  This will run in a separate thread.  It will spin on the
@@ -110,7 +118,7 @@ public class Worker extends CompactorThr
           continue;
         }
 
-        final boolean isMajor = (ci.type == CompactionType.MAJOR);
+        final boolean isMajor = ci.isMajorCompaction();
         final ValidTxnList txns =
             TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
         final StringBuffer jobName = new StringBuffer(name);
@@ -129,17 +137,19 @@ public class Worker extends CompactorThr
         LOG.info("Starting " + ci.type.toString() + " compaction for " +
             ci.getFullPartitionName());
 
+        final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf,
+          runJobAsSelf(runAs) ? runAs : t.getOwner());
         final CompactorMR mr = new CompactorMR();
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+            mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+                mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
                 return null;
               }
             });
@@ -161,11 +171,95 @@ public class Worker extends CompactorThr
   public void init(BooleanPointer stop) throws MetaException {
     super.init(stop);
 
-    StringBuffer name = new StringBuffer(hostname());
+    StringBuilder name = new StringBuilder(hostname());
     name.append("-");
     name.append(getId());
     this.name = name.toString();
     setName(name.toString());
   }
 
+  static final class StatsUpdater {
+    static final private Log LOG = LogFactory.getLog(StatsUpdater.class);
+
+    public static StatsUpdater init(CompactionInfo ci, List<String> columnListForStats,
+                                     HiveConf conf, String userName) {
+      return new StatsUpdater(ci, columnListForStats, conf, userName);
+    }
+    /**
+     * List of columns for which to compute stats.  This may be empty, which means no stats
+     * gathering is needed.
+     */
+    private final List<String> columnList;
+    private final HiveConf conf;
+    private final String userName;
+    private final CompactionInfo ci;
+      
+    private StatsUpdater(CompactionInfo ci, List<String> columnListForStats,
+                         HiveConf conf, String userName) {
+      this.conf = conf;
+      this.userName = userName;
+      this.ci = ci;
+      if(!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) {
+        columnList = Collections.emptyList();
+        return;
+      }
+      columnList = columnListForStats;
+    }
+
+    /**
+     * todo: what should this do on failure?  Should it rethrow? Invalidate stats?
+     */
+    void gatherStats() throws IOException {
+      if(!ci.isMajorCompaction()) {
+        return;
+      }
+      if(columnList.isEmpty()) {
+        LOG.debug("No existing stats for " + ci.dbname + "." + ci.tableName + " found.  Will not run analyze.");
+        return;//nothing to do
+      }
+      //e.g. analyze table page_view partition(dt='10/15/2014',country='US')
+      // compute statistics for columns viewtime
+      StringBuilder sb = new StringBuilder("analyze table ").append(ci.dbname).append(".").append(ci.tableName);
+      if(ci.partName != null) {
+        try {
+          sb.append(" partition(");
+          Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
+          for(Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
+            sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
+          }
+          sb.setLength(sb.length() - 1);//remove trailing , (handles multi-column partitions)
+          sb.append(")");
+        }
+        catch(MetaException ex) {
+          throw new IOException(ex);
+        }
+      }
+      sb.append(" compute statistics for columns ");
+      for(String colName : columnList) {
+        sb.append(colName).append(",");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing ,
+      LOG.debug("running '" + sb.toString() + "'");
+      Driver d = new Driver(conf, userName);
+      SessionState localSession = null;
+      if(SessionState.get() == null) {
+        localSession = SessionState.start(new SessionState(conf));
+      }
+      try {
+        CommandProcessorResponse cpr = d.run(sb.toString());
+        if (cpr.getResponseCode() != 0) {
+          throw new IOException("Could not update stats for table " + ci.getFullTableName() +
+            (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cpr);
+        }
+      }
+      catch(CommandNeedRetryException cnre) {
+        throw new IOException("Could not update stats for table " + ci.getFullTableName() +
+          (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cnre.getMessage());
+      }
+      finally {
+        if(localSession != null) {
+          localSession.close();
+        }
+      }
+    }
+  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Fri Sep  5 18:12:51 2014
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -280,7 +279,7 @@ public class TestWorker extends Compacto
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString());
+    for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString());
     Assert.assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
@@ -507,7 +506,7 @@ for (int i = 0; i < stat.length; i++) Sy
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should now be 3 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
     Assert.assertEquals(3, stat.length);