You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/07/21 11:55:28 UTC

[hive] branch master updated: HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a0880b90ad1 HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)
a0880b90ad1 is described below

commit a0880b90ad1866c9ac3616177f0ac98b8eb0f08a
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Thu Jul 21 17:25:18 2022 +0530

    HIVE-26409: Assign NO_TXN operation type to a table in global locks for scheduled queries (Sourabh Badhya, reviewed by Denys Kuzmenko)
    
    Closes #3454
---
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |  3 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    | 85 ++++++++++++++++++++++
 .../hive/ql/schq/MockScheduledQueryService.java    | 69 ++++++++++++++++++
 .../hive/ql/schq/TestScheduledQueryService.java    | 46 ------------
 4 files changed, 156 insertions(+), 47 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7c23e348c23..40ff171db22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -450,9 +450,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       compBuilder.setExclusive();
-      compBuilder.setOperationType(DataOperationType.UPDATE);
+      compBuilder.setOperationType(DataOperationType.NO_TXN);
       compBuilder.setDbName(GLOBAL_LOCKS);
       compBuilder.setTableName(lockName);
+
       globalLocks.add(compBuilder.build());
       LOG.debug("Adding global lock: " + lockName);
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 692e76f5b66..71697d2e30c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -29,9 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -63,6 +66,11 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionContext;
+import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
+import org.apache.hadoop.hive.ql.schq.MockScheduledQueryService;
+import org.apache.hadoop.hive.ql.schq.TestScheduledQueryService;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorMR;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.apache.hadoop.io.Writable;
@@ -70,11 +78,15 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
@@ -3105,6 +3117,79 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     }
   }
 
+  @Test
+  public void testNoTxnComponentsForScheduledQueries() throws Exception {
+    // Scheduled inserts must not leave '__global_locks' rows in COMPLETED_TXN_COMPONENTS.
+    String tableName = "scheduledquerytable";
+    int[][] tableData = {{1, 2}, {3, 4}};
+    runStatementOnDriver("create table " + tableName + " (a int, b int) stored as orc tblproperties ('transactional'='true')");
+    int noOfTimesScheduledQueryExecuted = 4;
+    String insertStmt = "insert into " + tableName + " (a,b) " + makeValuesClause(tableData);
+
+    // Run the same scheduled insert several times, each with a fresh executor and mock service.
+    for (int index = 0; index < noOfTimesScheduledQueryExecuted; index++) {
+      ExecutorService executor =
+              Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
+                      .setNameFormat("Scheduled queries for transactional tables").build());
+
+      // Mock service which hands the insert statement to the scheduled query executor.
+      MockScheduledQueryService qService = new MockScheduledQueryService(insertStmt);
+      ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, hiveConf, qService);
+
+      // Start the scheduled query execution.
+      try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
+        // Wait (at most 30 seconds) for the mock service to report FINISHED or FAILED.
+        SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
+        synchronized (qService.notifier) {
+          qService.notifier.wait(30000);
+        }
+        SessionState.getConsole().logInfo("Done waiting for query execution!");
+      }
+
+      Assert.assertNotNull("Scheduled query reported no progress", qService.lastProgressInfo);
+      assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
+      assertThat(qService.lastProgressInfo.getExecutorQueryId(), Matchers.containsString(ctx.executorHostName + "/"));
+    }
+
+    // Every scheduled execution should have produced exactly one delta.
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+    Assert.assertEquals(noOfTimesScheduledQueryExecuted, fileStatuses.length);
+    for (FileStatus fileStatus : fileStatuses) {
+      Assert.assertTrue(fileStatus.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+    }
+
+    // Global locks are requested with the NO_TXN operation type, so no COMPLETED_TXN_COMPONENTS
+    // records may exist for the '__global_locks' database, however many scheduled executions
+    // took place.
+    Assert.assertEquals(0,
+            TestTxnDbUtil.countQueryAgent(hiveConf,
+                    "select count(*) from completed_txn_components" +
+                    " where ctc_database='__global_locks'"));
+
+    // Compact the table which has inserts from the scheduled query.
+    runStatementOnDriver("alter table " + tableName + " compact 'major'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    // Run AcidHouseKeeperService to cleanup the COMPLETED_TXN_COMPONENTS.
+    MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+    houseKeeper.setConf(hiveConf);
+    houseKeeper.run();
+
+    // Major compaction should leave a single base and no remaining deltas.
+    fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName + "/*"));
+    Assert.assertEquals(1, fileStatuses.length);
+    for (FileStatus fileStatus : fileStatuses) {
+      Assert.assertTrue(fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX));
+    }
+
+    // Each scheduled execution inserted both rows once.
+    int[][] expectedData = {{1,2}, {1,2}, {1,2}, {1,2}, {3,4}, {3,4}, {3,4}, {3,4}};
+    List<String> resData = runStatementOnDriver("select a,b from " + tableName + " order by a");
+    Assert.assertEquals(stringifyValues(expectedData), resData);
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java
new file mode 100644
index 00000000000..3987ae3d724
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/MockScheduledQueryService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.schq;
+
+import org.apache.hadoop.hive.metastore.api.QueryState;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.ql.scheduled.IScheduledQueryMaintenanceService;
+
+/**
+ * Test double for {@link IScheduledQueryMaintenanceService} that hands a single statement to the
+ * scheduled query executor. Progress reports are kept in {@link #lastProgressInfo}, and threads
+ * waiting on {@link #notifier} are woken once the execution reports FINISHED or FAILED.
+ */
+public class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
+    // Use notify/wait on this object to indicate when the scheduled query has finished executing.
+    public final Object notifier = new Object();
+
+    int id = 0;
+    private final String stmt;
+    // Written by whichever thread reports progress and read by the waiting test thread, hence volatile.
+    public volatile ScheduledQueryProgressInfo lastProgressInfo;
+
+    public MockScheduledQueryService(String string) {
+        stmt = string;
+    }
+
+    @Override
+    public ScheduledQueryPollResponse scheduledQueryPoll() {
+        ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
+        r.setQuery(stmt);
+        r.setScheduleKey(new ScheduledQueryKey("sch1", getClusterNamespace()));
+        r.setUser("nobody");
+        // Only the first poll carries an execution id; later polls return the statement without one
+        // (NOTE(review): presumably the executor ignores responses lacking an execution id - confirm).
+        if (id == 0) {
+            r.setExecutionId(id++);
+        }
+        return r;
+    }
+
+    @Override
+    public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
+        // %n terminates each message so consecutive progress reports do not run together.
+        System.out.printf("%d, state: %s, error: %s%n", info.getScheduledExecutionId(), info.getState(),
+                info.getErrorMessage());
+        lastProgressInfo = info;
+        if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
+            // Query is done, notify any waiters
+            synchronized (notifier) {
+                notifier.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public String getClusterNamespace() {
+        return "default";
+    }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index 03a0cac5acf..b47f629d49b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -98,52 +98,6 @@ public class TestScheduledQueryService {
     return res.size();
   }
 
-
-  public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
-    // Use notify/wait on this object to indicate when the scheduled query has finished executing.
-    Object notifier = new Object();
-
-    int id = 0;
-    private String stmt;
-    ScheduledQueryProgressInfo lastProgressInfo;
-
-    public MockScheduledQueryService(String string) {
-      stmt = string;
-    }
-
-    @Override
-    public ScheduledQueryPollResponse scheduledQueryPoll() {
-      ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
-      r.setExecutionId(id++);
-      r.setQuery(stmt);
-      r.setScheduleKey(new ScheduledQueryKey("sch1", getClusterNamespace()));
-      r.setUser("nobody");
-      if (id == 1) {
-        return r;
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
-      System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(),
-          info.getErrorMessage());
-      lastProgressInfo = info;
-      if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
-        // Query is done, notify any waiters
-        synchronized (notifier) {
-          notifier.notifyAll();
-        }
-      }
-    }
-
-    @Override
-    public String getClusterNamespace() {
-      return "default";
-    }
-  }
-
   @Test
   public void testScheduledQueryExecution() throws ParseException, Exception {
     IDriver driver = createDriver();