You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:45 UTC
[36/50] [abbrv] hive git commit: HIVE-20029 : add parallel insert,
analyze, iow tests (Sergey Shelukhin)
HIVE-20029 : add parallel insert, analyze, iow tests (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2d5ac22
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2d5ac22
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2d5ac22
Branch: refs/heads/master
Commit: f2d5ac221da8dc81292831db06a92453a5c60ff1
Parents: bdd3cec
Author: sergey <se...@apache.org>
Authored: Fri Jul 20 15:54:01 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Jul 20 15:54:01 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/TestTxnCommands.java | 189 +++++++++++++++++++
1 file changed, 189 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f2d5ac22/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 8c33f6a..3d4cb83 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -27,6 +27,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -41,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -60,6 +66,7 @@ import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Ignore;
@@ -77,6 +84,7 @@ import org.slf4j.LoggerFactory;
* Mostly uses bucketed tables
*/
public class TestTxnCommands extends TxnCommandsBaseForTests {
+
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands.class.getCanonicalName()
@@ -108,6 +116,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals("1", rs.get(0));
Assert.assertEquals("5", rs.get(1));
}
+
@Ignore("not needed but useful for testing")
@Test
public void testNonAcidInsert() throws Exception {
@@ -230,6 +239,186 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
msClient.close();
}
+ private static final class QueryRunnable implements Runnable {
+ private final CountDownLatch cdlIn, cdlOut;
+ private final String query;
+ private final HiveConf hiveConf;
+
+ QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) {
+ this.query = query;
+ this.cdlIn = cdlIn;
+ this.cdlOut = cdlOut;
+ this.hiveConf = new HiveConf(hiveConf);
+ }
+
+ @Override
+ public void run() {
+ SessionState ss = SessionState.start(hiveConf);
+ try {
+ ss.applyAuthorizationPolicy();
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build();
+ Driver d = new Driver(qs, null);
+ try {
+ LOG.info("Ready to run the query: " + query);
+ syncThreadStart(cdlIn, cdlOut);
+ try {
+ CommandProcessorResponse cpr = d.run(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(query + " failed: " + cpr);
+ }
+ d.getResults(new ArrayList<String>());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } finally {
+ d.close();
+ }
+ }
+ }
+
+
+  /**
+   * Signals that the calling thread is ready, then blocks until all threads are released.
+   *
+   * @param cdlIn latch counted down to report readiness (always done first)
+   * @param cdlOut latch awaited as the common start signal
+   * @throws RuntimeException if interrupted while waiting; the interrupt status is restored
+   */
+  private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the owning executor (e.g. shutdownNow) can observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Runs several inserts concurrently into an insert-only table and checks that column stats
+   * are either absent or correct afterwards, then that analyze restores correct stats.
+   */
+  @Test
+  public void testParallelInsertStats() throws Exception {
+    final int TASK_COUNT = 4;
+    String tableName = "mm_table";
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String[] queries = new String[TASK_COUNT];
+      for (int i = 0; i < queries.length; ++i) {
+        // %s (not %d) keeps the number locale-independent, like the previous concatenation.
+        queries[i] = String.format("insert into %s (a) values (%s)", tableName, i);
+      }
+
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats);
+      }
+
+      runStatementOnDriver(
+          String.format("insert into %s (a) values (%s)", tableName, TASK_COUNT));
+      if (!hasStats) {
+        // Stats should still be invalid if they were invalid.
+        stats = getTxnTableStats(msClient, tableName);
+        Assert.assertEquals(0, stats.size());
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(
+          String.format("analyze table %s compute statistics for columns", tableName));
+      verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName));
+    } finally {
+      msClient.close();
+    }
+  }
+
+  /**
+   * Asserts that stats exist for exactly one column and that its long statistics match.
+   *
+   * @param dvCount expected number of distinct values
+   * @param min expected low value
+   * @param max expected high value
+   * @param stats column statistics fetched from the metastore
+   */
+  private void verifyLongStats(int dvCount, int min, int max, List<ColumnStatisticsObj> stats) {
+    Assert.assertEquals(1, stats.size());
+    ColumnStatisticsObj onlyColumn = stats.get(0);
+    LongColumnStatsData longStats = onlyColumn.getStatsData().getLongStats();
+    long actualLow = longStats.getLowValue();
+    long actualHigh = longStats.getHighValue();
+    long actualDvs = longStats.getNumDVs();
+    Assert.assertEquals(min, actualLow);
+    Assert.assertEquals(max, actualHigh);
+    Assert.assertEquals(dvCount, actualDvs);
+  }
+
+  /**
+   * Runs the given queries concurrently, one worker thread per query, releasing them together.
+   *
+   * @param queries statements to run; each runs in its own session
+   * @throws InterruptedException if interrupted while waiting for the workers
+   * @throws ExecutionException if a query failed (the first failure found, in array order)
+   * @throws AssertionError if not every worker reached the start barrier in time
+   */
+  private void runParallelQueries(String[] queries)
+      throws InterruptedException, ExecutionException {
+    // Upper bound on how long to wait for every worker to reach the start barrier.
+    final long readyTimeoutMinutes = 5;
+    ExecutorService executor = Executors.newFixedThreadPool(queries.length);
+    try {
+      final CountDownLatch cdlIn = new CountDownLatch(queries.length);
+      final CountDownLatch cdlOut = new CountDownLatch(1);
+      Future<?>[] tasks = new Future<?>[queries.length];
+      for (int i = 0; i < tasks.length; ++i) {
+        tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut));
+      }
+      // Wait for all threads to be ready. A worker that dies during setup may never reach the
+      // barrier, so bound the wait instead of hanging the test forever.
+      boolean allReady = cdlIn.await(readyTimeoutMinutes, TimeUnit.MINUTES);
+      cdlOut.countDown(); // Release them at the same time.
+      for (Future<?> task : tasks) {
+        task.get(); // Rethrows a worker's failure wrapped in ExecutionException.
+      }
+      if (!allReady) {
+        throw new AssertionError("Not all " + queries.length
+            + " parallel queries became ready within " + readyTimeoutMinutes + " minutes");
+      }
+    } finally {
+      // The default pool threads are non-daemon; always release them, interrupting stragglers.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Recreates tableName as an insert-only transactional ORC table with one int column a, enables
+   * basic and column stats autogathering on hiveConf, and inserts val twice serially. Checks that
+   * stats for exactly one column are present afterwards.
+   *
+   * @param tableName table to drop and recreate
+   * @param val value inserted into column a by each of the two serial inserts
+   * @return an open metastore client; the caller must close it
+   * @throws Exception if a statement fails or the metastore cannot be reached
+   */
+  private IMetaStoreClient prepareParallelTest(String tableName, int val) throws Exception {
+    hiveConf.setBoolean("hive.stats.autogather", true);
+    hiveConf.setBoolean("hive.stats.column.autogather", true);
+    runStatementOnDriver("drop table if exists " + tableName);
+    runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
+        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        tableName));
+    String insert = String.format("insert into %s (a) values (%s)", tableName, val);
+    runStatementOnDriver(insert);
+    runStatementOnDriver(insert);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    boolean handedOff = false;
+    try {
+      // Stats should be valid after serial inserts.
+      List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
+      Assert.assertEquals(1, stats.size());
+      handedOff = true;
+      return msClient;
+    } finally {
+      if (!handedOff) {
+        // Don't leak the client when the precondition check fails.
+        msClient.close();
+      }
+    }
+  }
+
+
+  /**
+   * Runs an insert concurrently with a column-stats analyze and checks that stats are either
+   * absent or correct afterwards, then that a final analyze restores correct stats.
+   */
+  @Test
+  public void testParallelInsertAnalyzeStats() throws Exception {
+    String tableName = "mm_table";
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String[] queries = {
+          String.format("insert into %s (a) values (999)", tableName),
+          String.format("analyze table %s compute statistics for columns", tableName)
+      };
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(2, 0, 999, stats);
+      }
+
+      runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName));
+      if (!hasStats) {
+        // Stats should still be invalid if they were invalid.
+        stats = getTxnTableStats(msClient, tableName);
+        Assert.assertEquals(0, stats.size());
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(
+          String.format("analyze table %s compute statistics for columns", tableName));
+      verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
+    } finally {
+      msClient.close();
+    }
+  }
+
+  /**
+   * Runs a truncate concurrently with a column-stats analyze and checks that stats are either
+   * absent or match an empty table, then that a final analyze produces empty-table stats.
+   */
+  // TODO## this test is broken; would probably be fixed by HIVE-20046
+  @Ignore("Known broken; would probably be fixed by HIVE-20046")
+  @Test
+  public void testParallelTruncateAnalyzeStats() throws Exception {
+    String tableName = "mm_table";
+    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
+    try {
+      String[] queries = {
+          String.format("truncate table %s", tableName),
+          String.format("analyze table %s compute statistics for columns", tableName)
+      };
+      runParallelQueries(queries);
+
+      // Verify stats are either invalid, or valid and correct.
+      List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
+      boolean hasStats = 0 != stats.size();
+      if (hasStats) {
+        verifyLongStats(0, 0, 0, stats);
+      }
+
+      // Stats should be valid after analyze.
+      runStatementOnDriver(
+          String.format("analyze table %s compute statistics for columns", tableName));
+      verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName));
+    } finally {
+      msClient.close();
+    }
+  }
+
@Test
public void testTxnStatsOnOff() throws Exception {