You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/07/21 11:55:28 UTC
[hive] branch master updated: HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a0880b90ad1 HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)
a0880b90ad1 is described below
commit a0880b90ad1866c9ac3616177f0ac98b8eb0f08a
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Thu Jul 21 17:25:18 2022 +0530
HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)
Closes #3454
---
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 3 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 ++++++++++++++++++++++
.../hive/ql/schq/MockScheduledQueryService.java | 69 ++++++++++++++++++
.../hive/ql/schq/TestScheduledQueryService.java | 46 ------------
4 files changed, 156 insertions(+), 47 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7c23e348c23..40ff171db22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -450,9 +450,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
compBuilder.setExclusive();
- compBuilder.setOperationType(DataOperationType.UPDATE);
+ compBuilder.setOperationType(DataOperationType.NO_TXN);
compBuilder.setDbName(GLOBAL_LOCKS);
compBuilder.setTableName(lockName);
+
globalLocks.add(compBuilder.build());
LOG.debug("Adding global lock: " + lockName);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 692e76f5b66..71697d2e30c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -29,9 +29,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -63,6 +66,11 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionContext;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
+import org.apache.hadoop.hive.ql.schq.MockScheduledQueryService;
+import org.apache.hadoop.hive.ql.schq.TestScheduledQueryService;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorMR;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.io.Writable;
@@ -70,11 +78,15 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -3105,6 +3117,79 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
}
}
+ @Test
+ public void testNoTxnComponentsForScheduledQueries() throws Exception {
+ String tableName = "scheduledquerytable";
+ int[][] tableData = {{1, 2},{3, 4}};
+ runStatementOnDriver("create table " + tableName + " (a int, b int) stored as orc tblproperties ('transactional'='true')");
+
+ int noOfTimesScheduledQueryExecuted = 4;
+
+ // Logic for executing scheduled queries multiple times.
+ for (int index = 0;index < noOfTimesScheduledQueryExecuted;index++) {
+ ExecutorService executor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Scheduled queries for transactional tables").build());
+
+ // Mock service which initialises the query for execution.
+ MockScheduledQueryService qService = new
+ MockScheduledQueryService("insert into " + tableName + " (a,b) " + makeValuesClause(tableData));
+ ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, hiveConf, qService);
+
+ // Start the scheduled query execution.
+ try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
+ // Wait for the scheduled query to finish; 30 seconds should be more than enough.
+ SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
+ synchronized (qService.notifier) {
+ qService.notifier.wait(30000);
+ }
+ SessionState.getConsole().logInfo("Done waiting for query execution!");
+ }
+
+ assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
+ assertThat(qService.lastProgressInfo.getExecutorQueryId(),
+ Matchers.containsString(ctx.executorHostName + "/"));
+ }
+
+ // Check whether the table has delta files corresponding to the number of scheduled executions.
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+ Assert.assertEquals(fileStatuses.length, noOfTimesScheduledQueryExecuted);
+ for(FileStatus fileStatus : fileStatuses) {
+ Assert.assertTrue(fileStatus.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+ }
+
+ // Check that the COMPLETED_TXN_COMPONENTS table has no records with
+ // the '__global_locks' database, regardless of the number of
+ // scheduled executions (the expected count is 0).
+ Assert.assertEquals(TestTxnDbUtil.countQueryAgent(hiveConf,
+ "select count(*) from completed_txn_components" +
+ " where ctc_database='__global_locks'"),
+ 0);
+
+ // Compact the table which has inserts from the scheduled query.
+ runStatementOnDriver("alter table " + tableName + " compact 'major'");
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+
+ // Run AcidHouseKeeperService to clean up the COMPLETED_TXN_COMPONENTS table.
+ MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+ houseKeeper.setConf(hiveConf);
+ houseKeeper.run();
+
+ // Check whether the table is compacted.
+ fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+ Assert.assertEquals(fileStatuses.length, 1);
+ for(FileStatus fileStatus : fileStatuses) {
+ Assert.assertTrue(fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX));
+ }
+
+ // Check whether the data in the table is correct.
+ int[][] actualData = {{1,2}, {1,2}, {1,2}, {1,2}, {3,4}, {3,4}, {3,4}, {3,4}};
+ List<String> resData = runStatementOnDriver("select a,b from " + tableName + " order by a");
+ Assert.assertEquals(resData, stringifyValues(actualData));
+ }
+
/**
* takes raw data and turns it into a string as if from Driver.getResults()
* sorts rows in dictionary order
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java
new file mode 100644
index 00000000000..3987ae3d724
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.schq;
+
+import org.apache.hadoop.hive.metastore.api.QueryState;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.ql.scheduled.IScheduledQueryMaintenanceService;
+
+public class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
+ // Use notify/wait on this object to indicate when the scheduled query has finished executing.
+ public final Object notifier = new Object();
+
+ int id = 0;
+ private String stmt;
+ public ScheduledQueryProgressInfo lastProgressInfo;
+
+ public MockScheduledQueryService(String string) {
+ stmt = string;
+ }
+
+ @Override
+ public ScheduledQueryPollResponse scheduledQueryPoll() {
+ ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
+ r.setQuery(stmt);
+ r.setScheduleKey(new ScheduledQueryKey("sch1", getClusterNamespace()));
+ r.setUser("nobody");
+ if (id == 0) {
+ r.setExecutionId(id++);
+ return r;
+ } else {
+ return r;
+ }
+ }
+
+ @Override
+ public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
+ System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(),
+ info.getErrorMessage());
+ lastProgressInfo = info;
+ if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
+ // Query is done, notify any waiters
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public String getClusterNamespace() {
+ return "default";
+ }
+}
\ No newline at end of file
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index 03a0cac5acf..b47f629d49b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -98,52 +98,6 @@ public class TestScheduledQueryService {
return res.size();
}
-
- public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
- // Use notify/wait on this object to indicate when the scheduled query has finished executing.
- Object notifier = new Object();
-
- int id = 0;
- private String stmt;
- ScheduledQueryProgressInfo lastProgressInfo;
-
- public MockScheduledQueryService(String string) {
- stmt = string;
- }
-
- @Override
- public ScheduledQueryPollResponse scheduledQueryPoll() {
- ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
- r.setExecutionId(id++);
- r.setQuery(stmt);
- r.setScheduleKey(new ScheduledQueryKey("sch1", getClusterNamespace()));
- r.setUser("nobody");
- if (id == 1) {
- return r;
- } else {
- return null;
- }
- }
-
- @Override
- public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
- System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(),
- info.getErrorMessage());
- lastProgressInfo = info;
- if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
- // Query is done, notify any waiters
- synchronized (notifier) {
- notifier.notifyAll();
- }
- }
- }
-
- @Override
- public String getClusterNamespace() {
- return "default";
- }
- }
-
@Test
public void testScheduledQueryExecution() throws ParseException, Exception {
IDriver driver = createDriver();