You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ve...@apache.org on 2023/03/08 08:30:58 UTC
[hive] branch master updated: HIVE-27117: Fix compaction related flaky tests (Laszlo Vegh, reviewed by Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e1bf208b309 HIVE-27117: Fix compaction related flaky tests (Laszlo Vegh, reviewed by Laszlo Bodor)
e1bf208b309 is described below
commit e1bf208b309e8057bea93eaec53add2c7e3ca906
Author: veghlaci05 <ve...@gmail.com>
AuthorDate: Wed Mar 8 09:30:50 2023 +0100
HIVE-27117: Fix compaction related flaky tests (Laszlo Vegh, reviewed by Laszlo Bodor)
---
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 43 ++++++++++++++--------
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 17 +++++++--
.../apache/hadoop/hive/metastore/HMSHandler.java | 2 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 8 +++-
4 files changed, 48 insertions(+), 22 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 37f06371ba4..4c6209d55c7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -44,7 +44,9 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
+import java.io.EOFException;
import java.io.File;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -185,23 +187,32 @@ public abstract class CompactorOnTezTest {
}
protected HiveHookEvents.HiveHookEventProto getRelatedTezEvent(String dbTableName) throws Exception {
- List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>> readers = TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
- for (ProtoMessageReader<HiveHookEvents.HiveHookEventProto> reader : readers) {
- HiveHookEvents.HiveHookEventProto event = reader.readEvent();
- boolean getRelatedEvent = false;
- while (!getRelatedEvent) {
- while (event != null && ExecutionMode.TEZ != ExecutionMode.valueOf(event.getExecutionMode())) {
- event = reader.readEvent();
+ int retryCount = 3;
+ while (retryCount-- > 0) {
+ try {
+ List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>> readers = TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
+ for (ProtoMessageReader<HiveHookEvents.HiveHookEventProto> reader : readers) {
+ HiveHookEvents.HiveHookEventProto event = reader.readEvent();
+ boolean getRelatedEvent = false;
+ while (!getRelatedEvent) {
+ while (event != null && ExecutionMode.TEZ != ExecutionMode.valueOf(event.getExecutionMode())) {
+ event = reader.readEvent();
+ }
+ // Tables read is the table picked for compaction.
+ if (event != null && event.getTablesReadCount() > 0 && dbTableName.equalsIgnoreCase(event.getTablesRead(0))) {
+ getRelatedEvent = true;
+ } else {
+ event = reader.readEvent();
+ }
+ }
+ if (getRelatedEvent) {
+ return event;
+ }
}
- // Tables read is the table picked for compaction.
- if (event.getTablesReadCount() > 0 && dbTableName.equalsIgnoreCase(event.getTablesRead(0))) {
- getRelatedEvent = true;
- } else {
- event = reader.readEvent();
- }
- }
- if (getRelatedEvent) {
- return event;
+ } catch (EOFException e) {
+ //Since Event writing is async it may happen that the event we are looking for is not yet written out.
+ //Let's retry it after waiting a bit
+ Thread.sleep(2000);
}
}
return null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 57e9d1da5d9..2a9ae73ca26 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -29,11 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -1434,6 +1434,10 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
execDDLOpAndCompactionConcurrently("TRUNCATE_PARTITION", true);
}
private void execDDLOpAndCompactionConcurrently(String opType, boolean isPartioned) throws Exception {
+ // Stats gathering needs to be disabled as it runs in a separate transaction, and it cannot be synchronized using
+ // a CountDownLatch, because the Statsupdater instance is private and static.
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+
String tblName = "hive12352";
String partName = "test";
@@ -1456,21 +1460,28 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
txnHandler.compact(req);
MRCompactor mrCompactor = Mockito.spy(new MRCompactor(HiveMetaStoreUtils.getHiveMetastoreClient(hiveConf)));
+ CountDownLatch ddlStart = new CountDownLatch(1);
Mockito.doAnswer((Answer<JobConf>) invocationOnMock -> {
JobConf job = (JobConf) invocationOnMock.callRealMethod();
job.setMapperClass(SlowCompactorMap.class);
+ //let concurrent DDL operations start in the middle of the Compaction Txn, right before SlowCompactorMap will
+ //mimic a long-running compaction
+ ddlStart.countDown();
return job;
}).when(mrCompactor).createBaseJobConf(any(), any(), any(), any(), any(), any());
CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
when(mockedFactory.getCompactorPipeline(any(), any(), any(), any())).thenReturn(new CompactorPipeline(mrCompactor));
- Worker worker = Mockito.spy(new Worker(mockedFactory));
+ Worker worker = new Worker(mockedFactory);
worker.setConf(hiveConf);
worker.init(new AtomicBoolean(true));
CompletableFuture<Void> compactionJob = CompletableFuture.runAsync(worker);
- Thread.sleep(1000);
+
+ if (!ddlStart.await(5000, TimeUnit.MILLISECONDS)) {
+ Assert.fail("Waiting too long for compaction to start!");
+ }
int compHistory = 0;
switch (opType) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 4857634d1e4..ffc6a162796 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -577,7 +577,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
return getMSForConf(conf);
}
- public static RawStore getMSForConf(Configuration conf) throws MetaException {
+ public static synchronized RawStore getMSForConf(Configuration conf) throws MetaException {
RawStore ms = getRawStore();
if (ms == null) {
ms = newRawStoreForConf(conf);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0d22f5fd9af..ab1db0f8201 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3647,9 +3647,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+ "no record found in next_compaction_queue_id");
}
long id = rs.getLong(1);
- s = "UPDATE \"NEXT_COMPACTION_QUEUE_ID\" SET \"NCQ_NEXT\" = " + (id + 1);
+ s = "UPDATE \"NEXT_COMPACTION_QUEUE_ID\" SET \"NCQ_NEXT\" = " + (id + 1) + " WHERE \"NCQ_NEXT\" = " + id;
LOG.debug("Going to execute update <{}>", s);
- stmt.executeUpdate(s);
+ if (stmt.executeUpdate(s) != 1) {
+ //TODO: Eliminate this id generation by implementing: https://issues.apache.org/jira/browse/HIVE-27121
+ LOG.info("The returned compaction ID ({}) already taken, obtaining new", id);
+ return generateCompactionQueueId(stmt);
+ }
return id;
}
}