You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/04/20 23:13:54 UTC
[phoenix] branch master updated: PHOENIX-4521: Allow Pherf scenario
to define per query max allowed query execution duration after which thread
is interrupted
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new dccc260 PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted
dccc260 is described below
commit dccc260413591a7ab3133f8040b8547b8e993750
Author: Christine Feng <ch...@gmail.com>
AuthorDate: Mon Mar 16 19:32:27 2020 -0700
PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted
Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
phoenix-pherf/pom.xml | 5 +
.../java/org/apache/phoenix/pherf/PherfMainIT.java | 88 +++++++++++++
.../apache/phoenix/pherf/configuration/Query.java | 11 ++
.../apache/phoenix/pherf/result/QueryResult.java | 2 +
.../org/apache/phoenix/pherf/result/RunTime.java | 34 +++--
.../apache/phoenix/pherf/result/ThreadTime.java | 5 +-
.../apache/phoenix/pherf/result/file/Header.java | 4 +-
.../pherf/workload/MultiThreadedRunner.java | 85 +++++++++----
.../pherf/workload/MultithreadedDiffer.java | 4 +-
.../java/org/apache/phoenix/pherf/ResultTest.java | 10 +-
.../pherf/workload/MultiThreadedRunnerTest.java | 121 ++++++++++++++++++
.../resources/datamodel/timeout_test_schema.sql | 22 ++++
.../resources/scenario/timeout_test_scenario.xml | 138 +++++++++++++++++++++
13 files changed, 483 insertions(+), 46 deletions(-)
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 9b5914e..a156318 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -178,6 +178,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- Java 11 -->
<dependency>
<groupId>javax.activation</groupId>
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 3ee9327..be9b27a 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -18,14 +18,35 @@
package org.apache.phoenix.pherf;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.phoenix.pherf.result.Result;
+import org.apache.phoenix.pherf.result.ResultValue;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.Future;
+import static org.junit.Assert.assertEquals;
+
public class PherfMainIT extends ResultBaseTestIT {
+ public HashMap<String, String> mapResults(Result r) throws IOException {
+ HashMap<String, String> map = new HashMap<>();
+ List<ResultValue> resultValues = r.getResultValues();
+ String[] headerValues = r.getHeader().split(PherfConstants.RESULT_FILE_DELIMETER);
+ for (int i = 0; i < headerValues.length; i++) {
+ map.put(StringUtils.strip(headerValues[i],"[] "),
+ StringUtils.strip(resultValues.get(i).toString(), "[] "));
+ }
+ return map;
+ }
+
@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
@@ -43,4 +64,71 @@ public class PherfMainIT extends ResultBaseTestIT {
future.get();
}
}
+
+ @Test
+ public void testQueryTimeout() throws Exception {
+ // Timeout of 0 ms means every query execution should time out
+ String[] args = {"-q", "-l",
+ "-drop", "all",
+ "-schemaFile", ".*timeout_test_schema.sql",
+ "-scenarioFile", ".*timeout_test_scenario.xml" };
+ Pherf p = new Pherf(args);
+ p.run();
+
+ CSVFileResultHandler rh = new CSVFileResultHandler();
+ rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+ rh.setResultFileName("COMBINED");
+ List<Result> resultList = rh.read();
+ for (Result r : resultList) {
+ HashMap<String, String> resultsMap = mapResults(r);
+ if (resultsMap.get("QUERY_ID").equals("q1")) {
+ assertEquals(resultsMap.get("TIMED_OUT"), "true");
+ }
+ }
+ }
+
+ @Test
+ public void testLargeQueryTimeout() throws Exception {
+ // Timeout of max_long ms means every query execution should finish without timing out
+ String[] args = {"-q", "-l",
+ "-drop", "all",
+ "-schemaFile", ".*timeout_test_schema.sql",
+ "-scenarioFile", ".*timeout_test_scenario.xml" };
+ Pherf p = new Pherf(args);
+ p.run();
+
+ CSVFileResultHandler rh = new CSVFileResultHandler();
+ rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+ rh.setResultFileName("COMBINED");
+ List<Result> resultList = rh.read();
+ for (Result r : resultList) {
+ HashMap<String, String> resultsMap = mapResults(r);
+ if (resultsMap.get("QUERY_ID").equals("q2")) {
+ assertEquals(resultsMap.get("TIMED_OUT"), "false");
+ }
+ }
+ }
+
+ @Test
+ public void testNoQueryTimeout() throws Exception {
+ // Missing timeout attribute means every query execution should finish without timing out
+ String[] args = {"-q", "-l",
+ "-drop", "all",
+ "-schemaFile", ".*timeout_test_schema.sql",
+ "-scenarioFile", ".*timeout_test_scenario.xml" };
+ Pherf p = new Pherf(args);
+ p.run();
+
+ CSVFileResultHandler rh = new CSVFileResultHandler();
+ rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+ rh.setResultFileName("COMBINED");
+ List<Result> resultList = rh.read();
+ for (Result r : resultList) {
+ HashMap<String, String> resultsMap = mapResults(r);
+ if (resultsMap.get("QUERY_ID").equals("q3")) {
+ assertEquals(resultsMap.get("TIMED_OUT"), "false");
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index e283715..5f28134 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -36,6 +36,7 @@ public class Query {
private String queryGroup;
private String id;
private Pattern pattern;
+ private long timeoutDuration = Long.MAX_VALUE;
public Query() {
pattern = Pattern.compile("\\[.*?\\]");
@@ -158,4 +159,14 @@ public class Query {
public void setId(String id) {
this.id = id;
}
+
+
+ @XmlAttribute
+ public long getTimeoutDuration() {
+ return this.timeoutDuration;
+ }
+
+ public void setTimeoutDuration(long timeoutDuration) {
+ this.timeoutDuration = timeoutDuration;
+ }
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
index cef24f4..228d003 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
@@ -54,6 +54,7 @@ public class QueryResult extends Query {
this.setDdl(query.getDdl());
this.setQueryGroup(query.getQueryGroup());
this.setId(query.getId());
+ this.setTimeoutDuration(query.getTimeoutDuration());
}
public Date getStartTime() {
@@ -150,6 +151,7 @@ public class QueryResult extends Query {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(util.convertNull(getStartTimeText())));
rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup())));
+ rowValues.add(new ResultValue(util.convertNull(this.getId())));
rowValues.add(new ResultValue(util.convertNull(this.getStatement())));
rowValues.add(new ResultValue(util.convertNull(this.getTenantId())));
rowValues.addAll(runTime);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
index 3aa45fa..59bd265 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
@@ -24,38 +24,44 @@ import java.util.Date;
public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
private Date startTime;
- private Integer elapsedDurationInMs;
+ private Long elapsedDurationInMs;
private String message;
private Long resultRowCount;
private String explainPlan;
+ private boolean timedOut;
@SuppressWarnings("unused") public RunTime() {
}
- @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) {
+ @SuppressWarnings("unused") public RunTime(Long elapsedDurationInMs) {
this(null, elapsedDurationInMs);
}
- public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
+ public RunTime(Long resultRowCount, Long elapsedDurationInMs) {
this(null, resultRowCount, elapsedDurationInMs);
}
- public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this(null, null, startTime, resultRowCount, elapsedDurationInMs);
+ public RunTime(Date startTime, Long resultRowCount, Long elapsedDurationInMs) {
+ this(null, null, startTime, resultRowCount, elapsedDurationInMs, false);
+ }
+
+ public RunTime(Date startTime, Long elapsedDurationInMs, boolean timedOut) {
+ this(null, startTime, null, elapsedDurationInMs, timedOut);
}
public RunTime(String message, Date startTime, Long resultRowCount,
- Integer elapsedDurationInMs) {
- this(message, null, startTime, resultRowCount, elapsedDurationInMs);
+ Long elapsedDurationInMs, boolean timedOut) {
+ this(message, null, startTime, resultRowCount, elapsedDurationInMs, timedOut);
}
public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount,
- Integer elapsedDurationInMs) {
+ Long elapsedDurationInMs, boolean timedOut) {
this.elapsedDurationInMs = elapsedDurationInMs;
this.startTime = startTime;
this.resultRowCount = resultRowCount;
this.message = message;
this.explainPlan = explainPlan;
+ this.timedOut = timedOut;
}
@XmlAttribute() public Date getStartTime() {
@@ -66,11 +72,11 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
this.startTime = startTime;
}
- @XmlAttribute() public Integer getElapsedDurationInMs() {
+ @XmlAttribute() public Long getElapsedDurationInMs() {
return elapsedDurationInMs;
}
- @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
+ @SuppressWarnings("unused") public void setElapsedDurationInMs(Long elapsedDurationInMs) {
this.elapsedDurationInMs = elapsedDurationInMs;
}
@@ -105,4 +111,12 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
@SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) {
this.resultRowCount = resultRowCount;
}
+
+ @SuppressWarnings("unused") public void setTimedOut(boolean timedOut) {
+ this.timedOut = timedOut;
+ }
+
+ @XmlAttribute() public boolean getTimedOut() {
+ return this.timedOut;
+ }
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
index 03b5664..e1e7652 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
@@ -69,13 +69,13 @@ public class ThreadTime {
public Integer getAvgTimeInMs() {
if (getRunTimesInMs().isEmpty()) return null;
- Integer totalTimeInMs = new Integer(0);
+ Long totalTimeInMs = new Long(0);
for (RunTime runTime : getRunTimesInMs()) {
if (null != runTime.getElapsedDurationInMs()) {
totalTimeInMs += runTime.getElapsedDurationInMs();
}
}
- return totalTimeInMs / getRunTimesInMs().size();
+ return (int) (totalTimeInMs / getRunTimesInMs().size());
}
public RunTime getMaxTimeInMs() {
@@ -117,6 +117,7 @@ public class ThreadTime {
rowValues.add(new ResultValue(
util.convertNull(getRunTimesInMs().get(i).getMessage())));
}
+ rowValues.add(new ResultValue(getRunTimesInMs().get(i).getTimedOut()));
rows.add(rowValues);
}
return rows;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 7d09f68..c888199 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -24,8 +24,8 @@ public enum Header {
"START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT,EXPLAIN_PLAN,RESULT_ROW_COUNT"),
DETAILED_BASE(
"BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
- + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
- DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
+ + ",START_TIME,QUERY_GROUP,QUERY_ID,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
+ DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS,TIMED_OUT"),
DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 9fcc38e..41e6045 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,6 +25,8 @@ import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.pherf.result.DataModelResult;
import org.apache.phoenix.pherf.result.ResultManager;
import org.apache.phoenix.pherf.result.RunTime;
@@ -88,13 +90,21 @@ class MultiThreadedRunner implements Callable<Void> {
*/
@Override
public Void call() throws Exception {
- LOGGER.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
- + numberOfExecutions + "times\n\n");
- Long start = EnvironmentEdgeManager.currentTimeMillis();
- for (long i = numberOfExecutions; (i > 0 && ((EnvironmentEdgeManager.currentTimeMillis() - start)
- < executionDurationInMs)); i--) {
+ LOGGER.info("\n\nThread Starting " + threadName + " ; '" + query.getStatement() + "' for "
+ + numberOfExecutions + " times\n\n");
+ long threadStartTime = EnvironmentEdgeManager.currentTimeMillis();
+ for (long i = 0; i < numberOfExecutions; i++) {
+ long threadElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - threadStartTime;
+ if (threadElapsedTime >= executionDurationInMs) {
+ LOGGER.info("Queryset timeout of " + executionDurationInMs + " ms reached; current time is " + threadElapsedTime + " ms."
+ + "\nStopping queryset execution for query " + query.getId() + " on thread " + threadName + "...");
+ break;
+ }
+
synchronized (workloadExecutor) {
- timedQuery();
+ if (!timedQuery(i+1)) {
+ break;
+ }
if ((EnvironmentEdgeManager.currentTimeMillis() - lastResultWritten) > 1000) {
resultManager.write(dataModelResult, ruleApplier);
lastResultWritten = EnvironmentEdgeManager.currentTimeMillis();
@@ -119,8 +129,9 @@ class MultiThreadedRunner implements Callable<Void> {
* Timed query execution
*
* @throws Exception
+ * @return boolean true if query finished without timing out; false otherwise
*/
- private void timedQuery() throws Exception {
+ private boolean timedQuery(long iterationNumber) throws Exception {
boolean
isSelectCountStatement =
query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false;
@@ -128,17 +139,19 @@ class MultiThreadedRunner implements Callable<Void> {
Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;
- Long start = EnvironmentEdgeManager.currentTimeMillis();
+ Long queryStartTime = EnvironmentEdgeManager.currentTimeMillis();
Date startDate = Calendar.getInstance().getTime();
String exception = null;
- long resultRowCount = 0;
+ Long resultRowCount = 0L;
+ String queryIteration = threadName + ":" + iterationNumber;
+ Long queryElapsedTime = 0L;
try {
conn = pUtil.getConnection(query.getTenantId(), scenario.getPhoenixProperties());
conn.setAutoCommit(true);
final String statementString = query.getDynamicStatement(ruleApplier, scenario);
statement = conn.prepareStatement(statementString);
- LOGGER.info("Executing: " + statementString);
+ LOGGER.info("Executing iteration: " + queryIteration + ": " + statementString);
if (scenario.getWriteParams() != null) {
Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
@@ -148,34 +161,54 @@ class MultiThreadedRunner implements Callable<Void> {
boolean isQuery = statement.execute();
if (isQuery) {
rs = statement.getResultSet();
- while (rs.next()) {
- if (null != query.getExpectedAggregateRowCount()) {
- if (rs.getLong(1) != query.getExpectedAggregateRowCount())
- throw new RuntimeException(
- "Aggregate count " + rs.getLong(1) + " does not match expected "
- + query.getExpectedAggregateRowCount());
- }
-
- if (isSelectCountStatement) {
- resultRowCount = rs.getLong(1);
- } else {
- resultRowCount++;
- }
- }
+ Pair<Long, Long> r = getResults(rs, queryIteration, isSelectCountStatement, queryStartTime);
+ resultRowCount = r.getFirst();
+ queryElapsedTime = r.getSecond();
} else {
conn.commit();
}
} catch (Exception e) {
- LOGGER.error("Exception while executing query", e);
+ LOGGER.error("Exception while executing query iteration " + queryIteration, e);
exception = e.getMessage();
throw e;
} finally {
getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
- (int) (EnvironmentEdgeManager.currentTimeMillis() - start)));
+ queryElapsedTime, queryElapsedTime > query.getTimeoutDuration()));
if (rs != null) rs.close();
if (statement != null) statement.close();
if (conn != null) conn.close();
}
+ return true;
}
+
+ @VisibleForTesting
+ /**
+ * @return a Pair whose first value is the resultRowCount, and whose second value is the elapsed query time in ms.
+ */
+ Pair<Long, Long> getResults(ResultSet rs, String queryIteration, boolean isSelectCountStatement, Long queryStartTime) throws Exception {
+ Long resultRowCount = 0L;
+ while (rs.next()) {
+ if (null != query.getExpectedAggregateRowCount()) {
+ if (rs.getLong(1) != query.getExpectedAggregateRowCount())
+ throw new RuntimeException(
+ "Aggregate count " + rs.getLong(1) + " does not match expected "
+ + query.getExpectedAggregateRowCount());
+ }
+
+ if (isSelectCountStatement) {
+ resultRowCount = rs.getLong(1);
+ } else {
+ resultRowCount++;
+ }
+ long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime;
+ if (queryElapsedTime >= query.getTimeoutDuration()) {
+ LOGGER.error("Query " + queryIteration + " exceeded timeout of "
+ + query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms.");
+ return new Pair(resultRowCount, queryElapsedTime);
+ }
+ }
+ return new Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime);
+ }
+
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index e3480dd..6f43ee4 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -22,6 +22,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Query;
import org.apache.phoenix.pherf.result.RunTime;
@@ -56,10 +57,11 @@ class MultithreadedDiffer implements Callable<Void> {
String newCSV = queryVerifier.exportCSV(query);
boolean verifyResult = queryVerifier.doDiff(query, newCSV);
String explainPlan = pUtil.getExplainPlan(query);
+ long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - start;
getThreadTime().getRunTimesInMs().add(new RunTime(
verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
explainPlan, startDate, -1L,
- (int) (EnvironmentEdgeManager.currentTimeMillis() - start)));
+ elapsedTime, !(elapsedTime >= executionDurationInMs)));
}
/**
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index 28e8c62..9cad1f1 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -137,8 +137,8 @@ public class ResultTest extends ResultBaseTest {
ThreadTime ttFromFile = queryResultFromFile.getThreadTimes().get(0);
// thread level verification
- assertEquals(10, (int) ttFromFile.getMinTimeInMs().getElapsedDurationInMs());
- assertEquals(30, (int) ttFromFile.getMaxTimeInMs().getElapsedDurationInMs());
+ assertEquals(new Long(10), ttFromFile.getMinTimeInMs().getElapsedDurationInMs());
+ assertEquals(new Long(30), ttFromFile.getMaxTimeInMs().getElapsedDurationInMs());
assertEquals(20, (int) ttFromFile.getAvgTimeInMs());
// 3rd runtime has the earliest start time, therefore that's what's expected.
@@ -190,13 +190,13 @@ public class ResultTest extends ResultBaseTest {
tt.setThreadName("thread1");
Calendar calendar = Calendar.getInstance();
Date startTime1 = calendar.getTime();
- RunTime runtime1 = new RunTime(startTime1, 1000L, 10);
+ RunTime runtime1 = new RunTime(startTime1, 1000L, new Long(10));
tt.getRunTimesInMs().add(runtime1);
calendar.add(Calendar.MINUTE, -1);
- RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, 20);
+ RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, new Long(20));
tt.getRunTimesInMs().add(runtime2);
calendar.add(Calendar.MINUTE, -1);
- RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, 30);
+ RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, new Long(30));
tt.getRunTimesInMs().add(runtime3);
queryResult.getThreadTimes().add(tt);
queryResult2.getThreadTimes().add(tt);
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java
new file mode 100644
index 0000000..d9c7ca3
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.result.DataModelResult;
+import org.apache.phoenix.pherf.result.ThreadTime;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.util.DefaultEnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MultiThreadedRunnerTest {
+ @Mock
+ private static XMLConfigParser mockParser;
+ @Mock
+ private static DataModelResult mockDMR;
+ @Mock
+ private static RulesApplier mockRA;
+ @Mock
+ private static ThreadTime mockTT;
+ @Mock
+ private static Scenario mockScenario;
+ @Mock
+ private static WorkloadExecutor mockWE;
+ @Mock
+ private static Query mockQuery;
+ @Mock
+ private static ResultSet mockRS;
+
+ @Before
+ public void init() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testExpectedRowsMismatch() throws Exception {
+ Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+ MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+ mockQuery, mockDMR, mockTT,
+ 10L, 1000L,
+ true, mockRA,
+ mockScenario, mockWE, mockParser);
+ Mockito.when(mockRS.next()).thenReturn(true);
+ Mockito.when(mockRS.getLong(1)).thenReturn(2L);
+ try {
+ mtr.getResults(mockRS, "test_iteration", false,0L);
+ fail();
+ } catch (RuntimeException e) {
+ //pass;
+ }
+
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+ Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L);
+ Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+ MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+ mockQuery, mockDMR, mockTT,
+ 10L, 1000L,
+ true, mockRA,
+ mockScenario, mockWE, mockParser);
+ DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class);
+ Mockito.when(myClock.currentTime()).thenReturn(0L, 5000L);
+ EnvironmentEdgeManager.injectEdge(myClock);
+ Mockito.when(mockRS.next()).thenReturn(true);
+ Mockito.when(mockRS.getLong(1)).thenReturn(1L);
+ Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false,0L);
+ assertTrue(results.getSecond()>mockQuery.getTimeoutDuration());
+ }
+
+ @Test
+ public void testFinishWithoutTimeout() throws Exception {
+ DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class);
+ Mockito.when(myClock.currentTime()).thenReturn(0L);
+ EnvironmentEdgeManager.injectEdge(myClock);
+ Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L);
+ Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+ MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+ mockQuery, mockDMR, mockTT,
+ 10L, 1000L,
+ true, mockRA,
+ mockScenario, mockWE, mockParser);
+ Mockito.when(mockRS.next()).thenReturn(true, false);
+ Mockito.when(mockRS.getLong(1)).thenReturn(1L);
+ Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false, 0L);
+ assertFalse(results.getSecond() > mockQuery.getTimeoutDuration());
+ }
+
+}
diff --git a/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql
new file mode 100644
index 0000000..e753ddf
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql
@@ -0,0 +1,22 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+ CREATE TABLE IF NOT EXISTS PHERF.USER_DEFINED_TEST (
+ TENANT_ID VARCHAR NOT NULL PRIMARY KEY,
+ CREATED_DATE DATE,
+ VAL_STRING VARCHAR
+)
\ No newline at end of file
diff --git a/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml
new file mode 100644
index 0000000..0fe2463
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<datamodel release="192" name="test_scenario">
+ <datamapping>
+ <column>
+ <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+ <type>VARCHAR</type>
+ <dataSequence>SEQUENTIAL</dataSequence>
+ <length>15</length>
+ <name>GENERAL_VARCHAR</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <dataSequence>SEQUENTIAL</dataSequence>
+ <length>15</length>
+ <name>GENERAL_CHAR</name>
+ </column>
+ <column>
+ <type>DATE</type>
+ <!--SEQUENTIAL is unsupported for DATE -->
+ <dataSequence>RANDOM</dataSequence>
+ <!-- Number [0-100] that represents the probability of creating a null value -->
+ <!-- The higher the number, the more likely the returned value will be null -->
+ <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+ <nullChance>0</nullChance>
+ <minValue>1975</minValue>
+ <maxValue>2025</maxValue>
+ <name>GENERAL_DATE</name>
+ </column>
+ <column>
+ <type>DECIMAL</type>
+ <dataSequence>RANDOM</dataSequence>
+ <minValue>0</minValue>
+ <maxValue>1</maxValue>
+
+ <!-- Precision is limited to 18 -->
+ <precision>18</precision>
+ <!-- Number [0-100] that represents the probability of creating a null value -->
+ <!-- The higher the number, the more likely the returned value will be null -->
+ <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+ <nullChance>10</nullChance>
+ <name>GENERAL_DECIMAL</name>
+ </column>
+ <column>
+ <type>INTEGER</type>
+ <dataSequence>RANDOM</dataSequence>
+ <minValue>1</minValue>
+ <maxValue>50000000</maxValue>
+ <!-- Number [0-100] that represents the probability of creating a null value -->
+ <!-- The higher the number, the more likely the returned value will be null -->
+ <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+ <name>GENERAL_INTEGER</name>
+ </column>
+ <column>
+ <type>DATE</type>
+ <name>CREATED_DATE</name>
+ <minValue>1975</minValue>
+ <maxValue>2025</maxValue>
+ <valuelist>
+ <!-- Distributes randomly with equal chance of being picked -->
+ <datavalue distribution="80">
+ <!-- Joda time format: yyyy-MM-dd HH:mm:ss.SSS ZZZ -->
+ <minValue>2019-09-15 00:01:00.000</minValue>
+ <maxValue>2019-09-15 11:00:00.000</maxValue>
+ </datavalue>
+ <datavalue distribution="10">
+ <value>2019-09-19 00:01:00.000</value>
+ </datavalue>
+ <datavalue distribution="10">
+ <minValue>2019-09-22 00:01:00.000</minValue>
+ <maxValue>2019-09-22 00:01:00.300</maxValue>
+ </datavalue>
+ </valuelist>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>LIST</dataSequence>
+ <length>15</length>
+ <name>VAL_STRING</name>
+ <valuelist>
+ <!-- Distributes randomly with equal chance of being picked -->
+ <datavalue distribution="50">
+ <value>KjhoOmnNbBs9kWs</value>
+ </datavalue>
+ <datavalue distribution="50">
+ <value>VAL123</value>
+ </datavalue>
+ </valuelist>
+ </column>
+ </datamapping>
+ <scenarios>
+ <scenario tableName="PHERF.USER_DEFINED_TEST" rowCount="100000" name="myscenario">
+ <!-- Scenario level rule overrides will be unsupported in V1.
+ You can use the general datamappings in the meantime-->
+ <dataOverride>
+ <column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>RANDOM</dataSequence>
+ <length>10</length>
+ <name>DO_NOT_USE</name>
+ </column>
+ </dataOverride>
+ <!--Note: 1. Execution stops at executionDurationInMs or numberOfExecutions, whichever is reached first.
+ 2. DDL included in query are executed only once on start of querySet execution.
+ -->
+ <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="10">
+ <!-- queryGroup is a way to organize queries across tables or scenario files.
+ The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
+ <query id="q1" expectedAggregateRowCount="100000" timeoutDuration="0"
+ statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+ <query id="q2" expectedAggregateRowCount="100000" timeoutDuration="9223372036854775807"
+ statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+ <query id="q3" expectedAggregateRowCount="100000"
+ statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+
+ </querySet>
+ </scenario>
+ </scenarios>
+</datamodel>