You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:45 UTC

[36/50] [abbrv] hive git commit: HIVE-20029 : add parallel insert, analyze, iow tests (Sergey Shelukhin)

HIVE-20029 : add parallel insert, analyze, iow tests (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2d5ac22
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2d5ac22
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2d5ac22

Branch: refs/heads/master
Commit: f2d5ac221da8dc81292831db06a92453a5c60ff1
Parents: bdd3cec
Author: sergey <se...@apache.org>
Authored: Fri Jul 20 15:54:01 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Jul 20 15:54:01 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/TestTxnCommands.java  | 189 +++++++++++++++++++
 1 file changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f2d5ac22/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 8c33f6a..3d4cb83 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -27,6 +27,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -41,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -60,6 +66,7 @@ import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -77,6 +84,7 @@ import org.slf4j.LoggerFactory;
  * Mostly uses bucketed tables
  */
 public class TestTxnCommands extends TxnCommandsBaseForTests {
+
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands.class.getCanonicalName()
@@ -108,6 +116,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     Assert.assertEquals("1", rs.get(0));
     Assert.assertEquals("5", rs.get(1));
   }
+
   @Ignore("not needed but useful for testing")
   @Test
   public void testNonAcidInsert() throws Exception {
@@ -230,6 +239,186 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     msClient.close();
   }
 
+  /**
+   * Runs a single query with its own session and driver. Runnables that share the same pair of
+   * latches start their queries together: each one counts down {@code cdlIn} once its driver is
+   * set up and then waits on {@code cdlOut} before running the query.
+   */
+  private static final class QueryRunnable implements Runnable {
+    private final CountDownLatch cdlIn, cdlOut;
+    private final String query;
+    private final HiveConf hiveConf;
+
+    QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) {
+      // Each runnable gets its own HiveConf built from the one passed in.
+      this.hiveConf = new HiveConf(hiveConf);
+      this.query = query;
+      this.cdlIn = cdlIn;
+      this.cdlOut = cdlOut;
+    }
+
+    @Override
+    public void run() {
+      SessionState session = SessionState.start(hiveConf);
+      try {
+        session.applyAuthorizationPolicy();
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+      QueryState queryState = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build();
+      Driver driver = new Driver(queryState, null);
+      try {
+        LOG.info("Ready to run the query: " + query);
+        // Rendezvous with the other runnables before running the query.
+        syncThreadStart(cdlIn, cdlOut);
+        execute(driver);
+      } finally {
+        driver.close();
+      }
+    }
+
+    /**
+     * Runs the query on the given driver and fetches its results; a non-zero response code or
+     * any exception is rethrown as a {@link RuntimeException}.
+     */
+    private void execute(Driver driver) {
+      try {
+        CommandProcessorResponse response = driver.run(query);
+        if (response.getResponseCode() != 0) {
+          throw new RuntimeException(query + " failed: " + response);
+        }
+        driver.getResults(new ArrayList<String>());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  /**
+   * Start barrier for a worker thread: counts down {@code cdlIn} to report that the caller is
+   * ready, then blocks until {@code cdlOut} reaches zero.
+   *
+   * @param cdlIn latch counted down once by every participating thread
+   * @param cdlOut latch the caller waits on until it is released
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
+   */
+  private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      // await() clears the interrupt flag when it throws; restore it so code further up the
+      // stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Runs {@code TASK_COUNT} single-row inserts in parallel and checks that the column stats are
+   * afterwards either absent or correct, that absent stats stay absent after one more serial
+   * insert, and that an explicit analyze produces correct stats.
+   */
+  @Test
+  public void testParallelInsertStats() throws Exception {
+    final int TASK_COUNT = 4;
+    String tableName = "mm_table";
+    List<ColumnStatisticsObj> stats;
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String[] queries = new String[TASK_COUNT];
+      for (int i = 0; i < queries.length; ++i) {
+        queries[i] = "insert into " + tableName + " (a) values (" + i + ")";
+      }
+
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats);
+      }
+
+      runStatementOnDriver("insert into " + tableName + " (a) values (" + TASK_COUNT + ")");
+      if (!hasStats) {
+        // Stats should still be invalid if they were invalid.
+        stats = getTxnTableStats(msClient, tableName);
+        Assert.assertEquals(0, stats.size());
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(
+          String.format("analyze table %s compute statistics for columns", tableName));
+      verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName));
+    } finally {
+      // Release the metastore client even when an assertion above fails.
+      msClient.close();
+    }
+  }
+
+  /**
+   * Asserts that {@code stats} holds exactly one column entry and that its long statistics
+   * report the expected low value, high value and number of distinct values.
+   */
+  private void verifyLongStats(int dvCount, int min, int max, List<ColumnStatisticsObj> stats) {
+    Assert.assertEquals(1, stats.size());
+    ColumnStatisticsObj onlyColumn = stats.get(0);
+    LongColumnStatsData longStats = onlyColumn.getStatsData().getLongStats();
+    Assert.assertEquals(min, longStats.getLowValue());
+    Assert.assertEquals(max, longStats.getHighValue());
+    Assert.assertEquals(dvCount, longStats.getNumDVs());
+  }
+
+  /**
+   * Runs every query on its own thread, releasing all of them at the same moment, and waits for
+   * all of them to finish.
+   *
+   * @param queries the statements to run concurrently; one pool thread is created per query
+   * @throws InterruptedException if this thread is interrupted while waiting
+   * @throws ExecutionException if any of the queries failed
+   */
+  private void runParallelQueries(String[] queries)
+      throws InterruptedException, ExecutionException {
+    ExecutorService executor = Executors.newFixedThreadPool(queries.length);
+    try {
+      final CountDownLatch cdlIn = new CountDownLatch(queries.length);
+      final CountDownLatch cdlOut = new CountDownLatch(1);
+      Future<?>[] tasks = new Future<?>[queries.length];
+      for (int i = 0; i < tasks.length; ++i) {
+        tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut));
+      }
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
+      for (Future<?> task : tasks) {
+        task.get();
+      }
+    } finally {
+      // Without this the pool's worker threads stay alive after the test returns.
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Turns on the two stats autogather settings, recreates {@code tableName} as an insert-only
+   * transactional ORC table with one int column, inserts {@code val} twice serially and checks
+   * that column stats are present.
+   *
+   * @param tableName table to drop and recreate
+   * @param val value inserted by both serial inserts
+   * @return a newly created metastore client; the caller is responsible for closing it
+   * @throws Exception if any statement or metastore call fails
+   */
+  private IMetaStoreClient prepareParallelTest(String tableName, int val)
+      throws Exception {
+    hiveConf.setBoolean("hive.stats.autogather", true);
+    hiveConf.setBoolean("hive.stats.column.autogather", true);
+    runStatementOnDriver("drop table if exists " + tableName);
+    runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
+        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        tableName));
+    String insert = "insert into " + tableName + " (a) values (" + val + ")";
+    runStatementOnDriver(insert);
+    runStatementOnDriver(insert);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    boolean verified = false;
+    try {
+      // Stats should be valid after serial inserts.
+      List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
+      Assert.assertEquals(1, stats.size());
+      verified = true;
+      return msClient;
+    } finally {
+      if (!verified) {
+        // The caller never receives the client on failure, so close it here.
+        msClient.close();
+      }
+    }
+  }
+
+
+  /**
+   * Runs an insert and a column-stats analyze in parallel and checks that the column stats are
+   * afterwards either absent or correct, that absent stats stay absent after one more serial
+   * insert, and that an explicit analyze produces correct stats.
+   */
+  @Test
+  public void testParallelInsertAnalyzeStats() throws Exception {
+    String tableName = "mm_table";
+    List<ColumnStatisticsObj> stats;
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String analyze = String.format("analyze table %s compute statistics for columns", tableName);
+      String[] queries = {
+          String.format("insert into %s (a) values (999)", tableName),
+          analyze
+      };
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(2, 0, 999, stats);
+      }
+
+      runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName));
+      if (!hasStats) {
+        // Stats should still be invalid if they were invalid.
+        stats = getTxnTableStats(msClient, tableName);
+        Assert.assertEquals(0, stats.size());
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(analyze);
+      verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
+    } finally {
+      // Release the metastore client even when an assertion above fails.
+      msClient.close();
+    }
+  }
+
+  // TODO## this test is broken; would probably be fixed by HIVE-20046
+  /**
+   * Runs a truncate and a column-stats analyze in parallel and checks that the column stats are
+   * afterwards either absent or match the expected values, and that an explicit analyze then
+   * produces the expected stats.
+   */
+  @Test
+  public void testParallelTruncateAnalyzeStats() throws Exception {
+    String tableName = "mm_table";
+    List<ColumnStatisticsObj> stats;
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String analyze = String.format("analyze table %s compute statistics for columns", tableName);
+      String[] queries = {
+          String.format("truncate table %s", tableName),
+          analyze
+      };
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(0, 0, 0, stats);
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(analyze);
+      verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName));
+    } finally {
+      // Release the metastore client even when an assertion above fails.
+      msClient.close();
+    }
+  }
+
 
   @Test
   public void testTxnStatsOnOff() throws Exception {