You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:08 UTC
[kylin] 16/22: KYLIN-5320 check and update dataflow lastQueryTime
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 294895e4eec2a99dd22635bf41e51fac7d509465
Author: Pengfei Zhan <pe...@kyligence.io>
AuthorDate: Tue Sep 27 21:29:12 2022 +0800
KYLIN-5320 check and update dataflow lastQueryTime
---
.../service/task/QueryHistoryTaskScheduler.java | 19 +++++----
.../kylin/rest/service/UserAclServiceTest.java | 7 ++--
.../task/QueryHistoryTaskSchedulerRunnerTest.java | 34 +++++++--------
.../task/QueryHistoryTaskSchedulerTest.java | 48 ++++++++++++++++++----
4 files changed, 71 insertions(+), 37 deletions(-)
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
index 21764634a9..1d3664e4ec 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
@@ -36,6 +36,13 @@ import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
+import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.favorite.AbstractAsyncTask;
+import org.apache.kylin.metadata.favorite.AccelerateRuleUtil;
+import org.apache.kylin.metadata.favorite.AsyncAccelerationTask;
+import org.apache.kylin.metadata.favorite.AsyncTaskManager;
+import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset;
+import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
@@ -53,13 +60,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.kylin.metadata.epoch.EpochManager;
-import org.apache.kylin.metadata.favorite.AbstractAsyncTask;
-import org.apache.kylin.metadata.favorite.AccelerateRuleUtil;
-import org.apache.kylin.metadata.favorite.AsyncAccelerationTask;
-import org.apache.kylin.metadata.favorite.AsyncTaskManager;
-import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset;
-import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
import lombok.Data;
import lombok.Getter;
import lombok.val;
@@ -254,7 +254,7 @@ public class QueryHistoryTaskScheduler {
}
val snapshotsInRealization = queryHistory.getQueryHistoryInfo().getQuerySnapshots();
for (val snapshots : snapshotsInRealization) {
- snapshots.stream().forEach(tableIdentify -> {
+ snapshots.forEach(tableIdentify -> {
results.merge(tableManager.getOrCreateTableExt(tableIdentify), 1, Integer::sum);
});
}
@@ -305,6 +305,9 @@ public class QueryHistoryTaskScheduler {
for (Map.Entry<String, Long> entry : modelsLastQueryTime.entrySet()) {
String dataflowId = entry.getKey();
Long lastQueryTime = entry.getValue();
+ if (dfManager.getDataflow(dataflowId) == null) {
+ continue;
+ }
dfManager.updateDataflow(dataflowId, copyForWrite -> copyForWrite.setLastQueryTime(lastQueryTime));
}
}
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java
index 9ac59fb40c..35c7e70657 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java
@@ -29,6 +29,8 @@ import java.util.Locale;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.user.ManagedUser;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.GlobalAccessRequest;
import org.apache.kylin.rest.request.GlobalBatchAccessRequest;
@@ -38,6 +40,7 @@ import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.SpringContext;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -52,9 +55,6 @@ import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.test.util.ReflectionTestUtils;
-import org.apache.kylin.metadata.epoch.EpochManager;
-import org.apache.kylin.metadata.user.ManagedUser;
-
public class UserAclServiceTest extends ServiceTestBase {
@Mock
@@ -120,6 +120,7 @@ public class UserAclServiceTest extends ServiceTestBase {
userAclService.grantUserAclPermission("admin", "DATA_QUERY");
}
+ @Ignore("very unstable")
@Test
public void testGetAllUsersHasGlobalPermission() {
KylinUserService kylinUserService = new KylinUserService() {
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java
similarity index 82%
rename from src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java
rename to src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java
index b6fa055350..64a6aa44b9 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java
@@ -18,13 +18,15 @@
package org.apache.kylin.rest.service.task;
+import static org.awaitility.Awaitility.await;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.rest.util.SpringContext;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -69,29 +71,20 @@ public class QueryHistoryTaskSchedulerRunnerTest extends NLocalFileMetadataTestC
val queryHistoryAccelerateRunnerMock = qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false) {
@Override
public void work() {
- try {
- TimeUnit.SECONDS.sleep(mockSleepTimeSecs);
+ await().pollDelay(mockSleepTimeSecs, TimeUnit.SECONDS).until(() -> {
internalExecute.add((System.currentTimeMillis() - startTime) / 1000);
//mock exception
throw new RuntimeException("test for exception");
- } catch (InterruptedException e) {
- log.error("queryHistoryAccelerateRunnerMock is interrupted", e);
- }
+ });
}
-
};
val queryHistoryMetaUpdateRunnerMock = qhAccelerateScheduler.new QueryHistoryMetaUpdateRunner() {
@Override
public void work() {
- try {
- TimeUnit.SECONDS.sleep(mockSleepTimeSecs);
- } catch (InterruptedException e) {
- log.error("queryHistoryMetaUpdateRunner is interrupted", e);
- }
+ await().pollDelay(mockSleepTimeSecs, TimeUnit.SECONDS);
}
-
};
ReflectionTestUtils.setField(qhAccelerateScheduler, "taskScheduler", Executors.newScheduledThreadPool(1,
@@ -101,20 +94,23 @@ public class QueryHistoryTaskSchedulerRunnerTest extends NLocalFileMetadataTestC
val schedulerService = (ScheduledExecutorService) ReflectionTestUtils.getField(qhAccelerateScheduler,
"taskScheduler");
+ Assert.assertNotNull(schedulerService);
schedulerService.scheduleWithFixedDelay(queryHistoryAccelerateRunnerMock, 0, mockSchedulerDelay,
TimeUnit.SECONDS);
schedulerService.scheduleWithFixedDelay(queryHistoryMetaUpdateRunnerMock, 0, mockSchedulerDelay,
TimeUnit.SECONDS);
val schedulerNum = 10;
+ await().pollDelay(schedulerNum, TimeUnit.SECONDS).until(() -> {
+ Assert.assertEquals(internalExecute.size(), schedulerNum / (mockSchedulerDelay + mockSleepTimeSecs));
- TimeUnit.SECONDS.sleep(schedulerNum);
-
- Assert.assertEquals(internalExecute.size(), schedulerNum / (mockSchedulerDelay + mockSleepTimeSecs));
+ for (int i = 0; i < internalExecute.size(); i++) {
+ Assert.assertEquals(internalExecute.get(i), i * mockSchedulerDelay + mockSleepTimeSecs * (i + 1),
+ 1);
+ }
+ return null;
+ });
- for (int i = 0; i < internalExecute.size(); i++) {
- Assert.assertEquals(internalExecute.get(i), i * mockSchedulerDelay + mockSleepTimeSecs * (i + 1), 1);
- }
} catch (Exception e) {
log.error("test qhAccelerateScheduler error :", e);
} finally {
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
similarity index 93%
rename from src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
rename to src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
index c026e85c34..dc4dba454e 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
@@ -18,17 +18,15 @@
package org.apache.kylin.rest.service.task;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
-
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.rest.service.IUserGroupService;
-import org.apache.kylin.rest.util.SpringContext;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.junit.TimeZoneTestRunner;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
@@ -38,11 +36,15 @@ import org.apache.kylin.metadata.favorite.AsyncTaskManager;
import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset;
import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.query.QueryHistory;
import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.QueryMetrics;
import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
+import org.apache.kylin.rest.service.IUserGroupService;
import org.apache.kylin.rest.service.NUserGroupService;
+import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler.QueryHistoryMetaUpdateRunner;
+import org.apache.kylin.rest.util.SpringContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -60,9 +62,12 @@ import org.springframework.security.acls.domain.PermissionFactory;
import org.springframework.security.acls.model.PermissionGrantingStrategy;
import org.springframework.test.util.ReflectionTestUtils;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import lombok.val;
+
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TimeZoneTestRunner.class)
@PrepareForTest({ SpringContext.class, UserGroupInformation.class })
@@ -73,10 +78,11 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase {
private static final String LAYOUT1 = "20000000001";
private static final String LAYOUT2 = "1000001";
private static final Long QUERY_TIME = 1586760398338L;
+
+ private QueryHistoryTaskScheduler qhAccelerateScheduler;
+
@Mock
private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class);
- int startOffset = 0;
- private QueryHistoryTaskScheduler qhAccelerateScheduler;
@Before
public void setUp() throws Exception {
@@ -214,6 +220,32 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase {
Assert.assertEquals(8, idOffsetManager.get().getStatMetaUpdateOffset());
}
+ @Test
+ public void testUpdateLastQueryTime()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+ // before update dataflow usage, layout usage and last query time
+ NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT)
+ .getDataflow(DATAFLOW);
+ Assert.assertEquals(3, dataflow.getQueryHitCount());
+ Assert.assertNull(dataflow.getLayoutHitCount().get(20000000001L));
+ Assert.assertNull(dataflow.getLayoutHitCount().get(1000001L));
+ Assert.assertEquals(0L, dataflow.getLastQueryTime());
+
+ val queryHistoryAccelerateRunner = qhAccelerateScheduler.new QueryHistoryMetaUpdateRunner();
+ Class<? extends QueryHistoryMetaUpdateRunner> clazz = queryHistoryAccelerateRunner.getClass();
+ Method method = clazz.getDeclaredMethod("updateLastQueryTime", Map.class, String.class);
+ method.setAccessible(true);
+ method.invoke(queryHistoryAccelerateRunner, ImmutableMap.of("aaa", 100L), PROJECT);
+ method.invoke(queryHistoryAccelerateRunner, ImmutableMap.of(DATAFLOW, 100L), PROJECT);
+ method.setAccessible(false);
+
+ NDataflow dataflow1 = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT)
+ .getDataflow(DATAFLOW);
+ long lastQueryTime = dataflow1.getLastQueryTime();
+ Assert.assertEquals(100L, lastQueryTime);
+ }
+
@Test
public void testUpdateMetadataWithStringRealization() {
qhAccelerateScheduler.queryHistoryDAO = Mockito.mock(RDBMSQueryHistoryDAO.class);
@@ -485,4 +517,6 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase {
return histories;
}
+ int startOffset = 0;
+
}