You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/04/20 23:13:54 UTC

[phoenix] branch master updated: PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new dccc260  PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted
dccc260 is described below

commit dccc260413591a7ab3133f8040b8547b8e993750
Author: Christine Feng <ch...@gmail.com>
AuthorDate: Mon Mar 16 19:32:27 2020 -0700

    PHOENIX-4521: Allow Pherf scenario to define per query max allowed query execution duration after which thread is interrupted
    
    Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
 phoenix-pherf/pom.xml                              |   5 +
 .../java/org/apache/phoenix/pherf/PherfMainIT.java |  88 +++++++++++++
 .../apache/phoenix/pherf/configuration/Query.java  |  11 ++
 .../apache/phoenix/pherf/result/QueryResult.java   |   2 +
 .../org/apache/phoenix/pherf/result/RunTime.java   |  34 +++--
 .../apache/phoenix/pherf/result/ThreadTime.java    |   5 +-
 .../apache/phoenix/pherf/result/file/Header.java   |   4 +-
 .../pherf/workload/MultiThreadedRunner.java        |  85 +++++++++----
 .../pherf/workload/MultithreadedDiffer.java        |   4 +-
 .../java/org/apache/phoenix/pherf/ResultTest.java  |  10 +-
 .../pherf/workload/MultiThreadedRunnerTest.java    | 121 ++++++++++++++++++
 .../resources/datamodel/timeout_test_schema.sql    |  22 ++++
 .../resources/scenario/timeout_test_scenario.xml   | 138 +++++++++++++++++++++
 13 files changed, 483 insertions(+), 46 deletions(-)

diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 9b5914e..a156318 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -178,6 +178,11 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!-- Java 11 -->
     <dependency>
       <groupId>javax.activation</groupId>
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 3ee9327..be9b27a 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -18,14 +18,35 @@
 
 package org.apache.phoenix.pherf;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.phoenix.pherf.result.Result;
+import org.apache.phoenix.pherf.result.ResultValue;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.ExpectedSystemExit;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.Future;
 
+import static org.junit.Assert.assertEquals;
+
 public class PherfMainIT extends ResultBaseTestIT {
 
+    public HashMap<String, String> mapResults(Result r) throws IOException {
+        HashMap<String, String> map = new HashMap<>();
+        List<ResultValue> resultValues = r.getResultValues();
+        String[] headerValues = r.getHeader().split(PherfConstants.RESULT_FILE_DELIMETER);
+        for (int i = 0; i < headerValues.length; i++) {
+            map.put(StringUtils.strip(headerValues[i],"[] "),
+                    StringUtils.strip(resultValues.get(i).toString(), "[] "));
+        }
+        return map;
+    }
+
     @Rule
     public final ExpectedSystemExit exit = ExpectedSystemExit.none();
 
@@ -43,4 +64,71 @@ public class PherfMainIT extends ResultBaseTestIT {
             future.get();
         }
     }
+
+    @Test
+    public void testQueryTimeout() throws Exception {
+        // Timeout of 0 ms means every query execution should time out
+        String[] args = {"-q", "-l",
+                "-drop", "all",
+                "-schemaFile", ".*timeout_test_schema.sql",
+                "-scenarioFile", ".*timeout_test_scenario.xml" };
+        Pherf p = new Pherf(args);
+        p.run();
+
+        CSVFileResultHandler rh = new CSVFileResultHandler();
+        rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+        rh.setResultFileName("COMBINED");
+        List<Result> resultList = rh.read();
+        for (Result r : resultList) {
+            HashMap<String, String> resultsMap = mapResults(r);
+            if (resultsMap.get("QUERY_ID").equals("q1")) {
+                assertEquals(resultsMap.get("TIMED_OUT"), "true");
+            }
+        }
+    }
+
+    @Test
+    public void testLargeQueryTimeout() throws Exception {
+        // Timeout of max_long ms means every query execution should finish without timing out
+        String[] args = {"-q", "-l",
+                "-drop", "all",
+                "-schemaFile", ".*timeout_test_schema.sql",
+                "-scenarioFile", ".*timeout_test_scenario.xml" };
+        Pherf p = new Pherf(args);
+        p.run();
+
+        CSVFileResultHandler rh = new CSVFileResultHandler();
+        rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+        rh.setResultFileName("COMBINED");
+        List<Result> resultList = rh.read();
+        for (Result r : resultList) {
+            HashMap<String, String> resultsMap = mapResults(r);
+            if (resultsMap.get("QUERY_ID").equals("q2")) {
+                assertEquals(resultsMap.get("TIMED_OUT"), "false");
+            }
+        }
+    }
+
+    @Test
+    public void testNoQueryTimeout() throws Exception {
+        // Missing timeout attribute means every query execution should finish without timing out
+        String[] args = {"-q", "-l",
+                "-drop", "all",
+                "-schemaFile", ".*timeout_test_schema.sql",
+                "-scenarioFile", ".*timeout_test_scenario.xml" };
+        Pherf p = new Pherf(args);
+        p.run();
+
+        CSVFileResultHandler rh = new CSVFileResultHandler();
+        rh.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+        rh.setResultFileName("COMBINED");
+        List<Result> resultList = rh.read();
+        for (Result r : resultList) {
+            HashMap<String, String> resultsMap = mapResults(r);
+            if (resultsMap.get("QUERY_ID").equals("q3")) {
+                assertEquals(resultsMap.get("TIMED_OUT"), "false");
+            }
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index e283715..5f28134 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -36,6 +36,7 @@ public class Query {
     private String queryGroup;
     private String id;
     private Pattern pattern;
+    private long timeoutDuration = Long.MAX_VALUE;
 
     public Query() {
     	pattern = Pattern.compile("\\[.*?\\]");
@@ -158,4 +159,14 @@ public class Query {
     public void setId(String id) {
         this.id = id;
     }
+
+
+    @XmlAttribute
+    public long getTimeoutDuration() {
+        return this.timeoutDuration;
+    }
+
+    public void setTimeoutDuration(long timeoutDuration) {
+        this.timeoutDuration = timeoutDuration;
+    }
 }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
index cef24f4..228d003 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
@@ -54,6 +54,7 @@ public class QueryResult extends Query {
         this.setDdl(query.getDdl());
         this.setQueryGroup(query.getQueryGroup());
         this.setId(query.getId());
+        this.setTimeoutDuration(query.getTimeoutDuration());
     }
 
     public Date getStartTime() {
@@ -150,6 +151,7 @@ public class QueryResult extends Query {
                 List<ResultValue> rowValues = new ArrayList<>();
                 rowValues.add(new ResultValue(util.convertNull(getStartTimeText())));
                 rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup())));
+                rowValues.add(new ResultValue(util.convertNull(this.getId())));
                 rowValues.add(new ResultValue(util.convertNull(this.getStatement())));
                 rowValues.add(new ResultValue(util.convertNull(this.getTenantId())));
                 rowValues.addAll(runTime);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
index 3aa45fa..59bd265 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
@@ -24,38 +24,44 @@ import java.util.Date;
 
 public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
     private Date startTime;
-    private Integer elapsedDurationInMs;
+    private Long elapsedDurationInMs;
     private String message;
     private Long resultRowCount;
     private String explainPlan;
+    private boolean timedOut;
 
     @SuppressWarnings("unused") public RunTime() {
     }
 
-    @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) {
+    @SuppressWarnings("unused") public RunTime(Long elapsedDurationInMs) {
         this(null, elapsedDurationInMs);
     }
 
-    public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
+    public RunTime(Long resultRowCount, Long elapsedDurationInMs) {
         this(null, resultRowCount, elapsedDurationInMs);
     }
 
-    public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
-        this(null, null, startTime, resultRowCount, elapsedDurationInMs);
+    public RunTime(Date startTime, Long resultRowCount, Long elapsedDurationInMs) {
+        this(null, null, startTime, resultRowCount, elapsedDurationInMs, false);
+    }
+
+    public RunTime(Date startTime, Long elapsedDurationInMs, boolean timedOut) {
+        this(null, startTime, null, elapsedDurationInMs, timedOut);
     }
 
     public RunTime(String message, Date startTime, Long resultRowCount,
-            Integer elapsedDurationInMs) {
-        this(message, null, startTime, resultRowCount, elapsedDurationInMs);
+            Long elapsedDurationInMs, boolean timedOut) {
+        this(message, null, startTime, resultRowCount, elapsedDurationInMs, timedOut);
     }
 
     public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount,
-            Integer elapsedDurationInMs) {
+            Long elapsedDurationInMs, boolean timedOut) {
         this.elapsedDurationInMs = elapsedDurationInMs;
         this.startTime = startTime;
         this.resultRowCount = resultRowCount;
         this.message = message;
         this.explainPlan = explainPlan;
+        this.timedOut = timedOut;
     }
 
     @XmlAttribute() public Date getStartTime() {
@@ -66,11 +72,11 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
         this.startTime = startTime;
     }
 
-    @XmlAttribute() public Integer getElapsedDurationInMs() {
+    @XmlAttribute() public Long getElapsedDurationInMs() {
         return elapsedDurationInMs;
     }
 
-    @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
+    @SuppressWarnings("unused") public void setElapsedDurationInMs(Long elapsedDurationInMs) {
         this.elapsedDurationInMs = elapsedDurationInMs;
     }
 
@@ -105,4 +111,12 @@ public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
     @SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) {
         this.resultRowCount = resultRowCount;
     }
+
+    @SuppressWarnings("unused") public void setTimedOut(boolean timedOut) {
+        this.timedOut = timedOut;
+    }
+
+    @XmlAttribute() public boolean getTimedOut() {
+        return this.timedOut;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
index 03b5664..e1e7652 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
@@ -69,13 +69,13 @@ public class ThreadTime {
     public Integer getAvgTimeInMs() {
         if (getRunTimesInMs().isEmpty()) return null;
 
-        Integer totalTimeInMs = new Integer(0);
+        Long totalTimeInMs = new Long(0);
         for (RunTime runTime : getRunTimesInMs()) {
             if (null != runTime.getElapsedDurationInMs()) {
                 totalTimeInMs += runTime.getElapsedDurationInMs();
             }
         }
-        return totalTimeInMs / getRunTimesInMs().size();
+        return (int) (totalTimeInMs / getRunTimesInMs().size());
     }
 
     public RunTime getMaxTimeInMs() {
@@ -117,6 +117,7 @@ public class ThreadTime {
                 rowValues.add(new ResultValue(
                         util.convertNull(getRunTimesInMs().get(i).getMessage())));
             }
+            rowValues.add(new ResultValue(getRunTimesInMs().get(i).getTimedOut()));
             rows.add(rowValues);
         }
         return rows;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 7d09f68..c888199 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -24,8 +24,8 @@ public enum Header {
             "START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT,EXPLAIN_PLAN,RESULT_ROW_COUNT"),
     DETAILED_BASE(
             "BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
-                    + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
-    DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
+                    + ",START_TIME,QUERY_GROUP,QUERY_ID,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
+    DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS,TIMED_OUT"),
     DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
     AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
     THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 9fcc38e..41e6045 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,6 +25,8 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.Callable;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.pherf.result.DataModelResult;
 import org.apache.phoenix.pherf.result.ResultManager;
 import org.apache.phoenix.pherf.result.RunTime;
@@ -88,13 +90,21 @@ class MultiThreadedRunner implements Callable<Void> {
      */
     @Override
     public Void call() throws Exception {
-        LOGGER.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
-                + numberOfExecutions + "times\n\n");
-        Long start = EnvironmentEdgeManager.currentTimeMillis();
-        for (long i = numberOfExecutions; (i > 0 && ((EnvironmentEdgeManager.currentTimeMillis() - start)
-                < executionDurationInMs)); i--) {
+        LOGGER.info("\n\nThread Starting " + threadName + " ; '" + query.getStatement() + "' for "
+                + numberOfExecutions + " times\n\n");
+        long threadStartTime = EnvironmentEdgeManager.currentTimeMillis();
+        for (long i = 0; i < numberOfExecutions; i++) {
+            long threadElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - threadStartTime;
+            if (threadElapsedTime >= executionDurationInMs) {
+                LOGGER.info("Queryset timeout of " + executionDurationInMs + " ms reached; current time is " + threadElapsedTime + " ms."
+                        + "\nStopping queryset execution for query " + query.getId() + " on thread " + threadName + "...");
+                break;
+            }
+
             synchronized (workloadExecutor) {
-                timedQuery();
+                if (!timedQuery(i+1)) {
+                    break;
+                }
                 if ((EnvironmentEdgeManager.currentTimeMillis() - lastResultWritten) > 1000) {
                     resultManager.write(dataModelResult, ruleApplier);
                     lastResultWritten = EnvironmentEdgeManager.currentTimeMillis();
@@ -119,8 +129,9 @@ class MultiThreadedRunner implements Callable<Void> {
      * Timed query execution
      *
      * @throws Exception
+     * @returns boolean true if query finished without timing out; false otherwise
      */
-    private void timedQuery() throws Exception {
+    private boolean timedQuery(long iterationNumber) throws Exception {
         boolean
                 isSelectCountStatement =
                 query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false;
@@ -128,17 +139,19 @@ class MultiThreadedRunner implements Callable<Void> {
         Connection conn = null;
         PreparedStatement statement = null;
         ResultSet rs = null;
-        Long start = EnvironmentEdgeManager.currentTimeMillis();
+        Long queryStartTime = EnvironmentEdgeManager.currentTimeMillis();
         Date startDate = Calendar.getInstance().getTime();
         String exception = null;
-        long resultRowCount = 0;
+        Long resultRowCount = 0L;
+        String queryIteration = threadName + ":" + iterationNumber;
+        Long queryElapsedTime = 0L;
 
         try {
             conn = pUtil.getConnection(query.getTenantId(), scenario.getPhoenixProperties());
             conn.setAutoCommit(true);
             final String statementString = query.getDynamicStatement(ruleApplier, scenario);
             statement = conn.prepareStatement(statementString);
-            LOGGER.info("Executing: " + statementString);
+            LOGGER.info("Executing iteration: " + queryIteration + ": " + statementString);
             
             if (scenario.getWriteParams() != null) {
             	Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
@@ -148,34 +161,54 @@ class MultiThreadedRunner implements Callable<Void> {
             boolean isQuery = statement.execute();
             if (isQuery) {
                 rs = statement.getResultSet();
-                while (rs.next()) {
-                    if (null != query.getExpectedAggregateRowCount()) {
-                        if (rs.getLong(1) != query.getExpectedAggregateRowCount())
-                            throw new RuntimeException(
-                                    "Aggregate count " + rs.getLong(1) + " does not match expected "
-                                            + query.getExpectedAggregateRowCount());
-                    }
-
-                    if (isSelectCountStatement) {
-                        resultRowCount = rs.getLong(1);
-                    } else {
-                        resultRowCount++;
-                    }
-                }
+                Pair<Long, Long> r = getResults(rs, queryIteration, isSelectCountStatement, queryStartTime);
+                resultRowCount = r.getFirst();
+                queryElapsedTime = r.getSecond();
             } else {
                 conn.commit();
             }
         } catch (Exception e) {
-            LOGGER.error("Exception while executing query", e);
+            LOGGER.error("Exception while executing query iteration " + queryIteration, e);
             exception = e.getMessage();
             throw e;
         } finally {
             getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
-                    (int) (EnvironmentEdgeManager.currentTimeMillis() - start)));
+                    queryElapsedTime, queryElapsedTime > query.getTimeoutDuration()));
 
             if (rs != null) rs.close();
             if (statement != null) statement.close();
             if (conn != null) conn.close();
         }
+        return true;
     }
+
+    @VisibleForTesting
+    /**
+     * @return a Pair whose first value is the resultRowCount, and whose second value is whether the query timed out.
+     */
+    Pair<Long, Long> getResults(ResultSet rs, String queryIteration, boolean isSelectCountStatement, Long queryStartTime) throws Exception {
+        Long resultRowCount = 0L;
+        while (rs.next()) {
+            if (null != query.getExpectedAggregateRowCount()) {
+                if (rs.getLong(1) != query.getExpectedAggregateRowCount())
+                    throw new RuntimeException(
+                            "Aggregate count " + rs.getLong(1) + " does not match expected "
+                                    + query.getExpectedAggregateRowCount());
+            }
+
+            if (isSelectCountStatement) {
+                resultRowCount = rs.getLong(1);
+            } else {
+                resultRowCount++;
+            }
+            long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime;
+            if (queryElapsedTime >= query.getTimeoutDuration()) {
+                LOGGER.error("Query " + queryIteration + " exceeded timeout of "
+                        +  query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms.");
+                return new Pair(resultRowCount, queryElapsedTime);
+            }
+        }
+        return new Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime);
+    }
+
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index e3480dd..6f43ee4 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -22,6 +22,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.Callable;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.configuration.Query;
 import org.apache.phoenix.pherf.result.RunTime;
@@ -56,10 +57,11 @@ class MultithreadedDiffer implements Callable<Void> {
         String newCSV = queryVerifier.exportCSV(query);
         boolean verifyResult = queryVerifier.doDiff(query, newCSV);
         String explainPlan = pUtil.getExplainPlan(query);
+        long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - start;
         getThreadTime().getRunTimesInMs().add(new RunTime(
                         verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
                         explainPlan, startDate, -1L,
-            (int) (EnvironmentEdgeManager.currentTimeMillis() - start)));
+            elapsedTime, !(elapsedTime >= executionDurationInMs)));
     }
 
     /**
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index 28e8c62..9cad1f1 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -137,8 +137,8 @@ public class ResultTest extends ResultBaseTest {
         ThreadTime ttFromFile = queryResultFromFile.getThreadTimes().get(0);
 
         // thread level verification
-        assertEquals(10, (int) ttFromFile.getMinTimeInMs().getElapsedDurationInMs());
-        assertEquals(30, (int) ttFromFile.getMaxTimeInMs().getElapsedDurationInMs());
+        assertEquals(new Long(10), ttFromFile.getMinTimeInMs().getElapsedDurationInMs());
+        assertEquals(new Long(30), ttFromFile.getMaxTimeInMs().getElapsedDurationInMs());
         assertEquals(20, (int) ttFromFile.getAvgTimeInMs());
 
         // 3rd runtime has the earliest start time, therefore that's what's expected.
@@ -190,13 +190,13 @@ public class ResultTest extends ResultBaseTest {
         tt.setThreadName("thread1");
         Calendar calendar = Calendar.getInstance();
         Date startTime1 = calendar.getTime();
-        RunTime runtime1 = new RunTime(startTime1, 1000L, 10);
+        RunTime runtime1 = new RunTime(startTime1, 1000L, new Long(10));
         tt.getRunTimesInMs().add(runtime1);
         calendar.add(Calendar.MINUTE, -1);
-        RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, 20);
+        RunTime runtime2 = new RunTime(calendar.getTime(), 2000L, new Long(20));
         tt.getRunTimesInMs().add(runtime2);
         calendar.add(Calendar.MINUTE, -1);
-        RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, 30);
+        RunTime runtime3 = new RunTime(calendar.getTime(), 3000L, new Long(30));
         tt.getRunTimesInMs().add(runtime3);
         queryResult.getThreadTimes().add(tt);
         queryResult2.getThreadTimes().add(tt);
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java
new file mode 100644
index 0000000..d9c7ca3
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/MultiThreadedRunnerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.result.DataModelResult;
+import org.apache.phoenix.pherf.result.ThreadTime;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.util.DefaultEnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.sql.ResultSet;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MultiThreadedRunnerTest {
+    @Mock
+    private static XMLConfigParser mockParser;
+    @Mock
+    private static DataModelResult mockDMR;
+    @Mock
+    private static RulesApplier mockRA;
+    @Mock
+    private static ThreadTime mockTT;
+    @Mock
+    private static Scenario mockScenario;
+    @Mock
+    private static WorkloadExecutor mockWE;
+    @Mock
+    private static Query mockQuery;
+    @Mock
+    private static ResultSet mockRS;
+
+    @Before
+    public void init() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testExpectedRowsMismatch() throws Exception {
+        Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+        MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+                mockQuery, mockDMR, mockTT,
+                10L, 1000L,
+                true, mockRA,
+                mockScenario, mockWE, mockParser);
+        Mockito.when(mockRS.next()).thenReturn(true);
+        Mockito.when(mockRS.getLong(1)).thenReturn(2L);
+        try {
+            mtr.getResults(mockRS, "test_iteration", false,0L);
+            fail();
+        } catch (RuntimeException e) {
+            //pass;
+        }
+
+    }
+
+    @Test
+    public void testTimeout() throws Exception {
+        Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L);
+        Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+        MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+                mockQuery, mockDMR, mockTT,
+                10L, 1000L,
+                true, mockRA,
+                mockScenario, mockWE, mockParser);
+        DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class);
+        Mockito.when(myClock.currentTime()).thenReturn(0L, 5000L);
+        EnvironmentEdgeManager.injectEdge(myClock);
+        Mockito.when(mockRS.next()).thenReturn(true);
+        Mockito.when(mockRS.getLong(1)).thenReturn(1L);
+        Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false,0L);
+        assertTrue(results.getSecond()>mockQuery.getTimeoutDuration());
+    }
+
+    @Test
+    public void testFinishWithoutTimeout() throws Exception {
+        DefaultEnvironmentEdge myClock = Mockito.mock(DefaultEnvironmentEdge.class);
+        Mockito.when(myClock.currentTime()).thenReturn(0L);
+        EnvironmentEdgeManager.injectEdge(myClock);
+        Mockito.when(mockQuery.getTimeoutDuration()).thenReturn(1000L);
+        Mockito.when(mockQuery.getExpectedAggregateRowCount()).thenReturn(1L);
+        MultiThreadedRunner mtr = new MultiThreadedRunner("test",
+                mockQuery, mockDMR, mockTT,
+                10L, 1000L,
+                true, mockRA,
+                mockScenario, mockWE, mockParser);
+        Mockito.when(mockRS.next()).thenReturn(true, false);
+        Mockito.when(mockRS.getLong(1)).thenReturn(1L);
+        Pair<Long, Long> results = mtr.getResults(mockRS, "test_iteration", false, 0L);
+        assertFalse(results.getSecond() > mockQuery.getTimeoutDuration());
+    }
+
+}
diff --git a/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql
new file mode 100644
index 0000000..e753ddf
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/datamodel/timeout_test_schema.sql
@@ -0,0 +1,22 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+ CREATE TABLE IF NOT EXISTS PHERF.USER_DEFINED_TEST (
+    TENANT_ID VARCHAR NOT NULL PRIMARY KEY,
+    CREATED_DATE DATE,
+    VAL_STRING VARCHAR
+)
\ No newline at end of file
diff --git a/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml
new file mode 100644
index 0000000..0fe2463
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/timeout_test_scenario.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<datamodel release="192" name="test_scenario">
+    <datamapping>
+        <column>
+            <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+            <type>VARCHAR</type>
+            <dataSequence>SEQUENTIAL</dataSequence>
+            <length>15</length>
+            <name>GENERAL_VARCHAR</name>
+        </column>
+        <column>
+            <type>CHAR</type>
+            <dataSequence>SEQUENTIAL</dataSequence>
+            <length>15</length>
+            <name>GENERAL_CHAR</name>
+        </column>
+        <column>
+            <type>DATE</type>
+            <!--SEQUENTIAL is unsupported for DATE -->
+            <dataSequence>RANDOM</dataSequence>
+            <!-- Number [0-100] that represents the probability of creating a null value -->
+            <!-- The higher the number, the more like the value will returned will be null -->
+            <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+            <nullChance>0</nullChance>
+            <minValue>1975</minValue>
+            <maxValue>2025</maxValue>
+            <name>GENERAL_DATE</name>
+        </column>
+        <column>
+            <type>DECIMAL</type>
+            <dataSequence>RANDOM</dataSequence>
+            <minValue>0</minValue>
+            <maxValue>1</maxValue>
+
+            <!-- Precision is limited to 18 -->
+            <precision>18</precision>
+            <!-- Number [0-100] that represents the probability of creating a null value -->
+            <!-- The higher the number, the more like the value will returned will be null -->
+            <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+            <nullChance>10</nullChance>
+            <name>GENERAL_DECIMAL</name>
+        </column>
+        <column>
+            <type>INTEGER</type>
+            <dataSequence>RANDOM</dataSequence>
+            <minValue>1</minValue>
+            <maxValue>50000000</maxValue>
+            <!-- Number [0-100] that represents the probability of creating a null value -->
+            <!-- The higher the number, the more like the value will returned will be null -->
+            <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+            <name>GENERAL_INTEGER</name>
+        </column>
+        <column>
+            <type>DATE</type>
+            <name>CREATED_DATE</name>
+            <minValue>1975</minValue>
+            <maxValue>2025</maxValue>
+            <valuelist>
+                <!-- Distributes randomly with equal chance of being picked -->
+                <datavalue distribution="80">
+                    <!-- Joda time format: yyyy-MM-dd HH:mm:ss.SSS ZZZ -->
+                    <minValue>2019-09-15 00:01:00.000</minValue>
+                    <maxValue>2019-09-15 11:00:00.000</maxValue>
+                </datavalue>
+                <datavalue distribution="10">
+                    <value>2019-09-19 00:01:00.000</value>
+                </datavalue>
+                <datavalue distribution="10">
+                    <minValue>2019-09-22 00:01:00.000</minValue>
+                    <maxValue>2019-09-22 00:01:00.300</maxValue>
+                </datavalue>
+            </valuelist>
+        </column>
+        <column>
+            <type>CHAR</type>
+            <userDefined>true</userDefined>
+            <dataSequence>LIST</dataSequence>
+            <length>15</length>
+            <name>VAL_STRING</name>
+            <valuelist>
+                <!-- Distributes randomly with equal chance of being picked -->
+                <datavalue distribution="50">
+                    <value>KjhoOmnNbBs9kWs</value>
+                </datavalue>
+                <datavalue distribution="50">
+                    <value>VAL123</value>
+                </datavalue>
+            </valuelist>
+        </column>
+    </datamapping>
+    <scenarios>
+        <scenario tableName="PHERF.USER_DEFINED_TEST" rowCount="100000" name="myscenario">
+            <!-- Scenario level rule overrides will be unsupported in V1.
+                    You can use the general datamappings in the mean time-->
+            <dataOverride>
+                <column>
+                    <type>VARCHAR</type>
+                    <userDefined>true</userDefined>
+                    <dataSequence>RANDOM</dataSequence>
+                    <length>10</length>
+                    <name>DO_NOT_USE</name>
+                </column>
+            </dataOverride>
+            <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first
+                      2. DDL included in query are executed only once on start of querySet execution.
+            -->
+            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="10">
+                <!-- queryGroup is a way to organize queries across tables or scenario files.
+                    The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
+                <query id="q1" expectedAggregateRowCount="100000" timeoutDuration="0"
+                       statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+                <query id="q2" expectedAggregateRowCount="100000" timeoutDuration="9223372036854775807"
+                       statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+                <query id="q3" expectedAggregateRowCount="100000"
+                       statement="SELECT COUNT(*) FROM PHERF.USER_DEFINED_TEST"/>
+
+            </querySet>
+        </scenario>
+    </scenarios>
+</datamodel>