You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by co...@apache.org on 2015/06/26 19:15:54 UTC
[2/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for
mixed r/w workloads - port from master
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index fd960d1..07dfa86 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -22,15 +22,16 @@ import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.PherfConstants.RunMode;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
-import org.apache.phoenix.pherf.result.impl.ImageResultHandler;
-import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
import org.apache.phoenix.pherf.util.PhoenixUtil;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
import java.text.Format;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+import java.util.Map;
public class ResultUtil {
@@ -54,7 +55,10 @@ public class ResultUtil {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
rowValues.addAll(writeThreadTime.getCsvRepresentation(this));
- Result result = new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE, "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
+ Result
+ result =
+ new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE,
+ "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
writer.write(result);
}
}
@@ -83,7 +87,10 @@ public class ResultUtil {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
rowValues.addAll(loadTime.getCsvRepresentation(this));
- Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), rowValues);
+ Result
+ result =
+ new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+ rowValues);
writer.write(result);
}
} finally {
@@ -94,23 +101,29 @@ public class ResultUtil {
}
}
- public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult, RunMode runMode) throws Exception {
+ public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult,
+ RunMode runMode) throws Exception {
ResultFileDetails resultFileDetails = resultHandler.getResultFileDetails();
switch (resultFileDetails) {
- case CSV_AGGREGATE_PERFORMANCE:
- case CSV_DETAILED_PERFORMANCE:
- case CSV_DETAILED_FUNCTIONAL:
- List<List<ResultValue>> rowDetails = getCSVResults(dataModelResult, resultFileDetails, runMode);
- for (List<ResultValue> row : rowDetails) {
- Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), row);
- resultHandler.write(result);
- }
- break;
- default:
- List<ResultValue> resultValue = new ArrayList();
- resultValue.add(new ResultValue<>(dataModelResult));
- resultHandler.write(new Result(resultFileDetails, null, resultValue));
- break;
+ case CSV_AGGREGATE_PERFORMANCE:
+ case CSV_DETAILED_PERFORMANCE:
+ case CSV_DETAILED_FUNCTIONAL:
+ List<List<ResultValue>>
+ rowDetails =
+ getCSVResults(dataModelResult, resultFileDetails, runMode);
+ for (List<ResultValue> row : rowDetails) {
+ Result
+ result =
+ new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+ row);
+ resultHandler.write(result);
+ }
+ break;
+ default:
+ List<ResultValue> resultValue = new ArrayList();
+ resultValue.add(new ResultValue<>(dataModelResult));
+ resultHandler.write(new Result(resultFileDetails, null, resultValue));
+ break;
}
}
@@ -146,40 +159,47 @@ public class ResultUtil {
return str;
}
- private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult, ResultFileDetails resultFileDetails, RunMode runMode) {
+ private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult,
+ ResultFileDetails resultFileDetails, RunMode runMode) {
List<List<ResultValue>> rowList = new ArrayList<>();
for (ScenarioResult result : dataModelResult.getScenarioResult()) {
for (QuerySetResult querySetResult : result.getQuerySetResult()) {
for (QueryResult queryResult : querySetResult.getQueryResults()) {
switch (resultFileDetails) {
- case CSV_AGGREGATE_PERFORMANCE:
- List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
- rowList.add(csvResult);
- break;
- case CSV_DETAILED_PERFORMANCE:
- case CSV_DETAILED_FUNCTIONAL:
- List<List<ResultValue>> detailedRows = queryResult.getCsvDetailedRepresentation(this, runMode);
- for (List<ResultValue> detailedRowList : detailedRows) {
- List<ResultValue> valueList = new ArrayList<>();
- valueList.add(new ResultValue(convertNull(result.getTableName())));
- valueList.add(new ResultValue(convertNull(result.getName())));
- valueList.add(new ResultValue(convertNull(dataModelResult.getZookeeper())));
- valueList.add(new ResultValue(convertNull(String.valueOf(result.getRowCount()))));
- valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getNumberOfExecutions()))));
- valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getExecutionType()))));
- if (result.getPhoenixProperties() != null) {
- String props = buildProperty(result);
- valueList.add(new ResultValue(convertNull(props)));
- } else {
- valueList.add(new ResultValue("null"));
- }
- valueList.addAll(detailedRowList);
- rowList.add(valueList);
+ case CSV_AGGREGATE_PERFORMANCE:
+ List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
+ rowList.add(csvResult);
+ break;
+ case CSV_DETAILED_PERFORMANCE:
+ case CSV_DETAILED_FUNCTIONAL:
+ List<List<ResultValue>>
+ detailedRows =
+ queryResult.getCsvDetailedRepresentation(this, runMode);
+ for (List<ResultValue> detailedRowList : detailedRows) {
+ List<ResultValue> valueList = new ArrayList<>();
+ valueList.add(new ResultValue(convertNull(result.getTableName())));
+ valueList.add(new ResultValue(convertNull(result.getName())));
+ valueList.add(new ResultValue(
+ convertNull(dataModelResult.getZookeeper())));
+ valueList.add(new ResultValue(
+ convertNull(String.valueOf(result.getRowCount()))));
+ valueList.add(new ResultValue(convertNull(
+ String.valueOf(querySetResult.getNumberOfExecutions()))));
+ valueList.add(new ResultValue(convertNull(
+ String.valueOf(querySetResult.getExecutionType()))));
+ if (result.getPhoenixProperties() != null) {
+ String props = buildProperty(result);
+ valueList.add(new ResultValue(convertNull(props)));
+ } else {
+ valueList.add(new ResultValue("null"));
}
- break;
- default:
- break;
+ valueList.addAll(detailedRowList);
+ rowList.add(valueList);
+ }
+ break;
+ default:
+ break;
}
}
}
@@ -192,8 +212,7 @@ public class ResultUtil {
boolean firstPartialSeparator = true;
for (Map.Entry<String, String> entry : result.getPhoenixProperties().entrySet()) {
- if (!firstPartialSeparator)
- sb.append("|");
+ if (!firstPartialSeparator) sb.append("|");
firstPartialSeparator = false;
sb.append(entry.getKey() + "=" + entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
index 38abd65..78364d9 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.pherf.result;
/**
* Generic box container for a result value. This class allows for writing results of any type easily
+ *
* @param <T>
*/
public class ResultValue<T> {
@@ -33,8 +34,7 @@ public class ResultValue<T> {
return resultValue;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return resultValue.toString();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
index 690f7e6..3aa45fa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
@@ -18,104 +18,91 @@
package org.apache.phoenix.pherf.result;
+import javax.xml.bind.annotation.XmlAttribute;
import java.util.Comparator;
import java.util.Date;
-import javax.xml.bind.annotation.XmlAttribute;
-
public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
- private Date startTime;
- private Integer elapsedDurationInMs;
- private String message;
- private Long resultRowCount;
- private String explainPlan;
-
- @SuppressWarnings("unused")
- public RunTime() {
- }
-
- @SuppressWarnings("unused")
- public RunTime(Integer elapsedDurationInMs) {
- this(null, elapsedDurationInMs);
- }
-
- public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
- this(null, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this(null, null, startTime, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(String message, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this(message, null, startTime, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this.elapsedDurationInMs = elapsedDurationInMs;
- this.startTime = startTime;
- this.resultRowCount = resultRowCount;
- this.message = message;
- this.explainPlan = explainPlan;
- }
-
- @XmlAttribute()
- public Date getStartTime() {
- return startTime;
- }
-
- @SuppressWarnings("unused")
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- @XmlAttribute()
- public Integer getElapsedDurationInMs() {
- return elapsedDurationInMs;
- }
-
- @SuppressWarnings("unused")
- public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
- this.elapsedDurationInMs = elapsedDurationInMs;
- }
-
- @Override
- public int compare(RunTime r1, RunTime r2) {
- return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
- }
-
- @Override
- public int compareTo(RunTime o) {
- return compare(this, o);
- }
-
- @XmlAttribute()
- public String getMessage() {
- return message;
- }
-
- @SuppressWarnings("unused")
- public void setMessage(String message) {
- this.message = message;
- }
-
- @XmlAttribute()
- public String getExplainPlan() {
- return explainPlan;
- }
-
- @SuppressWarnings("unused")
- public void setExplainPlan(String explainPlan) {
- this.explainPlan = explainPlan;
- }
-
- @XmlAttribute()
- public Long getResultRowCount() {
- return resultRowCount;
- }
-
- @SuppressWarnings("unused")
- public void setResultRowCount(Long resultRowCount) {
- this.resultRowCount = resultRowCount;
- }
+ private Date startTime;
+ private Integer elapsedDurationInMs;
+ private String message;
+ private Long resultRowCount;
+ private String explainPlan;
+
+ @SuppressWarnings("unused") public RunTime() {
+ }
+
+ @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) {
+ this(null, elapsedDurationInMs);
+ }
+
+ public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
+ this(null, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
+ this(null, null, startTime, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(String message, Date startTime, Long resultRowCount,
+ Integer elapsedDurationInMs) {
+ this(message, null, startTime, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount,
+ Integer elapsedDurationInMs) {
+ this.elapsedDurationInMs = elapsedDurationInMs;
+ this.startTime = startTime;
+ this.resultRowCount = resultRowCount;
+ this.message = message;
+ this.explainPlan = explainPlan;
+ }
+
+ @XmlAttribute() public Date getStartTime() {
+ return startTime;
+ }
+
+ @SuppressWarnings("unused") public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ @XmlAttribute() public Integer getElapsedDurationInMs() {
+ return elapsedDurationInMs;
+ }
+
+ @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
+ this.elapsedDurationInMs = elapsedDurationInMs;
+ }
+
+ @Override public int compare(RunTime r1, RunTime r2) {
+ return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
+ }
+
+ @Override public int compareTo(RunTime o) {
+ return compare(this, o);
+ }
+
+ @XmlAttribute() public String getMessage() {
+ return message;
+ }
+
+ @SuppressWarnings("unused") public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @XmlAttribute() public String getExplainPlan() {
+ return explainPlan;
+ }
+
+ @SuppressWarnings("unused") public void setExplainPlan(String explainPlan) {
+ this.explainPlan = explainPlan;
+ }
+
+ @XmlAttribute() public Long getResultRowCount() {
+ return resultRowCount;
+ }
+
+ @SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) {
+ this.resultRowCount = resultRowCount;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
index b57e424..9cac1c7 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
@@ -18,31 +18,31 @@
package org.apache.phoenix.pherf.result;
+import org.apache.phoenix.pherf.configuration.Scenario;
+
import java.util.ArrayList;
import java.util.List;
-import org.apache.phoenix.pherf.configuration.Scenario;
public class ScenarioResult extends Scenario {
- private List<QuerySetResult> querySetResult = new ArrayList<QuerySetResult>();
-
- public List<QuerySetResult> getQuerySetResult() {
- return querySetResult;
- }
-
- @SuppressWarnings("unused")
- public void setQuerySetResult(List<QuerySetResult> querySetResult) {
- this.querySetResult = querySetResult;
- }
-
- public ScenarioResult() {
- }
-
- public ScenarioResult(Scenario scenario) {
- this.setDataOverride(scenario.getDataOverride());
- this.setPhoenixProperties(scenario.getPhoenixProperties());
- this.setRowCount(scenario.getRowCount());
- this.setTableName(scenario.getTableName());
- this.setName(scenario.getName());
- }
+ private List<QuerySetResult> querySetResult = new ArrayList<>();
+
+ public List<QuerySetResult> getQuerySetResult() {
+ return querySetResult;
+ }
+
+ @SuppressWarnings("unused") public void setQuerySetResult(List<QuerySetResult> querySetResult) {
+ this.querySetResult = querySetResult;
+ }
+
+ public ScenarioResult() {
+ }
+
+ public ScenarioResult(Scenario scenario) {
+ this.setDataOverride(scenario.getDataOverride());
+ this.setPhoenixProperties(scenario.getPhoenixProperties());
+ this.setRowCount(scenario.getRowCount());
+ this.setTableName(scenario.getTableName());
+ this.setName(scenario.getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
index f043bec..03b5664 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
@@ -18,13 +18,12 @@
package org.apache.phoenix.pherf.result;
+import javax.xml.bind.annotation.XmlAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import javax.xml.bind.annotation.XmlAttribute;
-
public class ThreadTime {
private List<RunTime> runTimesInMs = Collections.synchronizedList(new ArrayList<RunTime>());
private String threadName;
@@ -84,23 +83,22 @@ public class ThreadTime {
return Collections.max(getRunTimesInMs());
}
- @XmlAttribute()
- public String getThreadName() {
+ @XmlAttribute() public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
-
+
private String parseThreadName(boolean getConcurrency) {
- if (getThreadName() == null || !getThreadName().contains(",")) return null;
- String[] threadNameSet = getThreadName().split(",");
- if (getConcurrency) {
- return threadNameSet[1];}
- else {
- return threadNameSet[0];
- }
+ if (getThreadName() == null || !getThreadName().contains(",")) return null;
+ String[] threadNameSet = getThreadName().split(",");
+ if (getConcurrency) {
+ return threadNameSet[1];
+ } else {
+ return threadNameSet[0];
+ }
}
public List<List<ResultValue>> getCsvPerformanceRepresentation(ResultUtil util) {
@@ -110,11 +108,14 @@ public class ThreadTime {
List<ResultValue> rowValues = new ArrayList(getRunTimesInMs().size());
rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
- rowValues.add(new ResultValue(String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
+ rowValues.add(new ResultValue(
+ String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
if (getRunTimesInMs().get(i).getMessage() == null) {
- rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
+ rowValues.add(new ResultValue(util.convertNull(
+ String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
} else {
- rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
+ rowValues.add(new ResultValue(
+ util.convertNull(getRunTimesInMs().get(i).getMessage())));
}
rows.add(rowValues);
}
@@ -129,7 +130,8 @@ public class ThreadTime {
rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
- rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
+ rowValues.add(new ResultValue(
+ util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
rows.add(rowValues);
}
return rows;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
index 0df383c..e6a7308 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
@@ -31,8 +31,7 @@ public enum Extension {
this.extension = extension;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return extension;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 98e7b30..15e2b9a 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -20,9 +20,11 @@ package org.apache.phoenix.pherf.result.file;
public enum Header {
EMPTY(""),
- AGGREGATE_PERFORMANCE("START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
- DETAILED_BASE("BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
- + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
+ AGGREGATE_PERFORMANCE(
+ "START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
+ DETAILED_BASE(
+ "BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
+ + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
@@ -34,8 +36,7 @@ public enum Header {
this.header = header;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return header;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
index e7fbb48..e69f600 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
@@ -18,13 +18,6 @@
package org.apache.phoenix.pherf.result.impl;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
@@ -36,6 +29,13 @@ import org.apache.phoenix.pherf.result.ResultUtil;
import org.apache.phoenix.pherf.result.ResultValue;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* TODO Doc this class. Note that each instance that has a non unique file name will overwrite the last
*/
@@ -51,22 +51,22 @@ public class CSVResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
this.util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws IOException {
+ @Override public synchronized void write(Result result) throws IOException {
util.ensureBaseResultDirExists();
open(result);
@@ -74,15 +74,13 @@ public class CSVResultHandler implements ResultHandler {
flush();
}
- @Override
- public synchronized void flush() throws IOException {
+ @Override public synchronized void flush() throws IOException {
if (csvPrinter != null) {
csvPrinter.flush();
}
}
- @Override
- public synchronized void close() throws IOException {
+ @Override public synchronized void close() throws IOException {
if (csvPrinter != null) {
csvPrinter.flush();
csvPrinter.close();
@@ -90,8 +88,7 @@ public class CSVResultHandler implements ResultHandler {
}
}
- @Override
- public synchronized List<Result> read() throws IOException {
+ @Override public synchronized List<Result> read() throws IOException {
CSVParser parser = null;
util.ensureBaseResultDirExists();
try {
@@ -131,13 +128,11 @@ public class CSVResultHandler implements ResultHandler {
isClosed = false;
}
- @Override
- public synchronized boolean isClosed() {
+ @Override public synchronized boolean isClosed() {
return isClosed;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
index ad3c8fb..5c3eac1 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
@@ -19,8 +19,8 @@
package org.apache.phoenix.pherf.result.impl;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
@@ -42,22 +42,22 @@ public class ImageResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
ResultUtil util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws Exception {
+ @Override public synchronized void write(Result result) throws Exception {
TimeSeriesCollection timeSeriesCollection = new TimeSeriesCollection();
int rowCount = 0;
int maxLegendCount = 20;
@@ -70,12 +70,16 @@ public class ImageResultHandler implements ResultHandler {
for (QuerySetResult querySetResult : scenarioResult.getQuerySetResult()) {
for (QueryResult queryResult : querySetResult.getQueryResults()) {
for (ThreadTime tt : queryResult.getThreadTimes()) {
- TimeSeries timeSeries = new TimeSeries(queryResult.getStatement() + " :: " + tt.getThreadName());
+ TimeSeries
+ timeSeries =
+ new TimeSeries(
+ queryResult.getStatement() + " :: " + tt.getThreadName());
rowCount++;
synchronized (tt.getRunTimesInMs()) {
for (RunTime rt : tt.getRunTimesInMs()) {
if (rt.getStartTime() != null) {
- timeSeries.add(new Millisecond(rt.getStartTime()), rt.getElapsedDurationInMs());
+ timeSeries.add(new Millisecond(rt.getStartTime()),
+ rt.getElapsedDurationInMs());
}
}
}
@@ -85,10 +89,14 @@ public class ImageResultHandler implements ResultHandler {
}
}
boolean legend = rowCount > maxLegendCount ? false : true;
- JFreeChart chart = ChartFactory.createTimeSeriesChart(dataModelResult.getName()
- , "Time", "Query Time (ms)", timeSeriesCollection,
- legend, true, false);
- StandardXYItemRenderer renderer = new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
+ JFreeChart
+ chart =
+ ChartFactory
+ .createTimeSeriesChart(dataModelResult.getName(), "Time", "Query Time (ms)",
+ timeSeriesCollection, legend, true, false);
+ StandardXYItemRenderer
+ renderer =
+ new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
chart.getXYPlot().setRenderer(renderer);
chart.getXYPlot().setBackgroundPaint(Color.WHITE);
chart.getXYPlot().setRangeGridlinePaint(Color.BLACK);
@@ -96,35 +104,31 @@ public class ImageResultHandler implements ResultHandler {
chart.getXYPlot().getRenderer().setSeriesStroke(i, new BasicStroke(3f));
}
try {
- ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension, chartDimension);
+ ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension,
+ chartDimension);
} catch (IOException e) {
e.printStackTrace();
}
}
- @Override
- public synchronized void flush() throws Exception {
+ @Override public synchronized void flush() throws Exception {
}
- @Override
- public synchronized void close() throws Exception {
+ @Override public synchronized void close() throws Exception {
}
- @Override
- public List<Result> read() throws Exception {
+ @Override public List<Result> read() throws Exception {
return null;
}
- @Override
- public boolean isClosed() {
+ @Override public boolean isClosed() {
return false;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
index 8a913ed..009ae21 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
@@ -19,8 +19,8 @@
package org.apache.phoenix.pherf.result.impl;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
@@ -30,7 +30,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
public class XMLResultHandler implements ResultHandler {
private final String resultFileName;
@@ -40,22 +39,22 @@ public class XMLResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
ResultUtil util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws Exception {
+ @Override public synchronized void write(Result result) throws Exception {
FileOutputStream os = null;
JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
@@ -72,18 +71,15 @@ public class XMLResultHandler implements ResultHandler {
}
}
- @Override
- public synchronized void flush() throws IOException {
+ @Override public synchronized void flush() throws IOException {
return;
}
- @Override
- public synchronized void close() throws IOException {
+ @Override public synchronized void close() throws IOException {
return;
}
- @Override
- public synchronized List<Result> read() throws Exception {
+ @Override public synchronized List<Result> read() throws Exception {
JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
@@ -95,13 +91,11 @@ public class XMLResultHandler implements ResultHandler {
return results;
}
- @Override
- public boolean isClosed() {
+ @Override public boolean isClosed() {
return true;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
index 4761211..439f87e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
@@ -45,7 +45,7 @@ public class SchemaReader {
* @throws Exception
*/
public SchemaReader(final String searchPattern) throws Exception {
- this(new PhoenixUtil(), searchPattern);
+ this(PhoenixUtil.create(), searchPattern);
}
public SchemaReader(PhoenixUtil util, final String searchPattern) throws Exception {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 83e324d..0156149 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -30,6 +30,8 @@ import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,15 +41,25 @@ public class PhoenixUtil {
private static String zookeeper;
private static int rowCountOverride = 0;
private boolean testEnabled;
+ private static PhoenixUtil instance;
- public PhoenixUtil() {
+ private PhoenixUtil() {
this(false);
}
- public PhoenixUtil(final boolean testEnabled) {
+ private PhoenixUtil(final boolean testEnabled) {
this.testEnabled = testEnabled;
}
+    /**
+     * Returns the shared {@code PhoenixUtil} instance, creating it with test mode
+     * disabled if no instance exists yet.
+     *
+     * @return the shared instance
+     */
+    public static PhoenixUtil create() {
+        return create(false);
+    }
+
+    /**
+     * Returns the shared {@code PhoenixUtil} instance, creating it on first use.
+     * The method is synchronized so that concurrent first calls cannot each build
+     * and publish their own instance.
+     *
+     * @param testEnabled flag handed to the constructor; it only takes effect on the
+     *                    call that actually creates the instance and is ignored once
+     *                    an instance exists
+     * @return the shared instance
+     */
+    public static synchronized PhoenixUtil create(final boolean testEnabled) {
+        if (instance == null) {
+            instance = new PhoenixUtil(testEnabled);
+        }
+        return instance;
+    }
+
public Connection getConnection() throws Exception{
return getConnection(null);
}
@@ -56,7 +68,7 @@ public class PhoenixUtil {
return getConnection(tenantId, testEnabled);
}
- public Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
+ private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
if (null == zookeeper) {
throw new IllegalArgumentException(
"Zookeeper must be set before initializing connection!");
@@ -115,17 +127,6 @@ public class PhoenixUtil {
return result;
}
- @SuppressWarnings("unused")
- public ResultSet executeQuery(PreparedStatement preparedStatement, Connection connection) {
- ResultSet resultSet = null;
- try {
- resultSet = preparedStatement.executeQuery();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- return resultSet;
- }
-
/**
* Delete existing tables with schema name set as {@link PherfConstants#PHERF_SCHEMA_NAME} with regex comparison
*
@@ -133,14 +134,14 @@ public class PhoenixUtil {
* @throws SQLException
* @throws Exception
*/
- public void deleteTables(String regexMatch) throws SQLException, Exception {
+ public void deleteTables(String regexMatch) throws Exception {
regexMatch = regexMatch.toUpperCase().replace("ALL", ".*");
Connection conn = getConnection();
try {
ResultSet resultSet = getTableMetaData(PherfConstants.PHERF_SCHEMA_NAME, null, conn);
while (resultSet.next()) {
- String tableName = resultSet.getString("TABLE_SCHEM") == null ? resultSet.getString("TABLE_NAME") :
- resultSet.getString("TABLE_SCHEM") + "." + resultSet.getString("TABLE_NAME");
+ String tableName = resultSet.getString("TABLE_SCHEMA") == null ? resultSet.getString("TABLE_NAME") :
+ resultSet.getString("TABLE_SCHEMA") + "." + resultSet.getString("TABLE_NAME");
if (tableName.matches(regexMatch)) {
logger.info("\nDropping " + tableName);
executeStatement("DROP TABLE " + tableName + " CASCADE", conn);
@@ -183,8 +184,33 @@ public class PhoenixUtil {
return Collections.unmodifiableList(columnList);
}
-
- public static String getZookeeper() {
+
+    /**
+     * Runs the DDL of every query in the given set that defines one. Each DDL gets its
+     * own connection, obtained for that query's tenant id and closed afterwards. DDLs
+     * are run up front because they must not execute in parallel with the queries.
+     *
+     * @param querySet set whose queries are scanned for DDL statements
+     * @throws Exception if obtaining a connection, executing a DDL or closing fails
+     */
+    public void executeQuerySetDdls(QuerySet querySet) throws Exception {
+        for (Query candidate : querySet.getQuery()) {
+            if (candidate.getDdl() == null) {
+                continue;
+            }
+            Connection ddlConnection = null;
+            try {
+                logger.info("\nExecuting DDL:" + candidate.getDdl() + " on tenantId:"
+                        + candidate.getTenantId());
+                ddlConnection = getConnection(candidate.getTenantId());
+                executeStatement(candidate.getDdl(), ddlConnection);
+            } finally {
+                if (ddlConnection != null) {
+                    ddlConnection.close();
+                }
+            }
+        }
+    }
+
+ public static String getZookeeper() {
return zookeeper;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
new file mode 100644
index 0000000..efb3da9
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+
+import org.apache.phoenix.pherf.result.DataModelResult;
+import org.apache.phoenix.pherf.result.ResultManager;
+import org.apache.phoenix.pherf.result.RunTime;
+import org.apache.phoenix.pherf.result.ThreadTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+/**
+ * Runnable that executes a single {@link Query} repeatedly and records one
+ * {@link RunTime} per execution in the supplied {@link ThreadTime}.
+ */
+class MultiThreadedRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
+    // Shared by all runners: time of the most recent periodic result write.
+    // NOTE(review): this static is only guarded by a per-instance lock in run(), so
+    // runners on different threads do not exclude each other -- confirm this is intended.
+    private static long lastResultWritten = System.currentTimeMillis() - 1000;
+
+    private final Query query;
+    private final ThreadTime threadTime;
+    private final PhoenixUtil pUtil = PhoenixUtil.create();
+    private final String threadName;
+    private final DataModelResult dataModelResult;
+    private final long numberOfExecutions;
+    private final long executionDurationInMs;
+    private final ResultManager resultManager;
+
+    /**
+     * MultiThreadedRunner
+     *
+     * @param threadName            name used in this runner's log messages
+     * @param query                 query to execute; supplies statement and tenant id
+     * @param dataModelResult       result object written out periodically while running
+     * @param threadTime            collector that receives one {@link RunTime} per execution
+     * @param numberOfExecutions    upper bound on the number of executions
+     * @param executionDurationInMs upper bound on the total run time in milliseconds
+     */
+    MultiThreadedRunner(String threadName, Query query, DataModelResult dataModelResult,
+            ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs) {
+        this.query = query;
+        this.threadName = threadName;
+        this.threadTime = threadTime;
+        this.dataModelResult = dataModelResult;
+        this.numberOfExecutions = numberOfExecutions;
+        this.executionDurationInMs = executionDurationInMs;
+        this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
+    }
+
+    /**
+     * Executes the query until either the execution count is used up or the execution
+     * duration has elapsed, whichever happens first. After an execution, the data model
+     * result is written out if more than a second has passed since the last write.
+     * A failure in one iteration is logged and does not stop the loop.
+     */
+    @Override
+    public void run() {
+        logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+                + numberOfExecutions + "times\n\n");
+        final long start = System.currentTimeMillis();
+        for (long i = numberOfExecutions;
+                i > 0 && (System.currentTimeMillis() - start) < executionDurationInMs; i--) {
+            try {
+                synchronized (resultManager) {
+                    timedQuery();
+                    if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
+                        resultManager.write(dataModelResult);
+                        lastResultWritten = System.currentTimeMillis();
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("Iteration failed in thread " + threadName, e);
+            }
+        }
+        logger.info("\n\nThread exiting." + threadName + "\n\n");
+    }
+
+    private synchronized ThreadTime getThreadTime() {
+        return threadTime;
+    }
+
+    /**
+     * Timed query execution. Runs the statement once on a fresh connection and records
+     * start date, row count, elapsed milliseconds and any failure message as a
+     * {@link RunTime}. Query failures are logged and recorded, not rethrown.
+     *
+     * @throws Exception if recording the run time or closing a JDBC resource fails
+     */
+    private void timedQuery() throws Exception {
+        final boolean isSelectCountStatement =
+                query.getStatement().toUpperCase().trim().contains("COUNT(*)");
+
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+        final long start = System.currentTimeMillis();
+        final Date startDate = Calendar.getInstance().getTime();
+        String exception = null;
+        long resultRowCount = 0;
+
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement(query.getStatement());
+            boolean isQuery = statement.execute();
+            if (isQuery) {
+                rs = statement.getResultSet();
+                while (rs.next()) {
+                    if (null != query.getExpectedAggregateRowCount()) {
+                        if (rs.getLong(1) != query.getExpectedAggregateRowCount()) {
+                            throw new RuntimeException(
+                                    "Aggregate count " + rs.getLong(1)
+                                            + " does not match expected "
+                                            + query.getExpectedAggregateRowCount());
+                        }
+                    }
+
+                    // A COUNT(*) statement reports its count in column 1; any other
+                    // statement is measured by the number of rows iterated.
+                    if (isSelectCountStatement) {
+                        resultRowCount = rs.getLong(1);
+                    } else {
+                        resultRowCount++;
+                    }
+                }
+            } else {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            logger.error("Query execution failed in thread " + threadName, e);
+            exception = e.getMessage();
+        } finally {
+            try {
+                // Elapsed time is taken before the resources are closed.
+                getThreadTime().getRunTimesInMs().add(
+                        new RunTime(exception, startDate, resultRowCount,
+                                (int) (System.currentTimeMillis() - start)));
+            } finally {
+                closeAll(rs, statement, conn);
+            }
+        }
+    }
+
+    /**
+     * Closes every non-null resource in the given order. A failure while closing one
+     * resource does not prevent the remaining ones from being closed; the first failure
+     * is rethrown afterwards with any later failures attached as suppressed.
+     *
+     * @param resources resources to close, may contain nulls
+     * @throws Exception the first failure raised by a {@code close()} call
+     */
+    private static void closeAll(AutoCloseable... resources) throws Exception {
+        Exception failure = null;
+        for (AutoCloseable resource : resources) {
+            if (resource == null) {
+                continue;
+            }
+            try {
+                resource.close();
+            } catch (Exception e) {
+                if (failure == null) {
+                    failure = e;
+                } else {
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index c78db90..1735754 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -30,84 +30,69 @@ import org.apache.phoenix.pherf.result.RunTime;
import org.apache.phoenix.pherf.result.ThreadTime;
class MultithreadedDiffer implements Runnable {
- private static final Logger logger = LoggerFactory
- .getLogger(MultithreadedRunner.class);
- private Thread t;
- private Query query;
- private ThreadTime threadTime;
- private String threadName;
- private long numberOfExecutions;
- private long executionDurationInMs;
- private QueryVerifier queryVerifier = new QueryVerifier(true);
+    // Logger is named after this class so differ output is attributed to it.
+    private static final Logger logger = LoggerFactory.getLogger(MultithreadedDiffer.class);
+    // NOTE(review): no visible code assigns this field -- confirm whether it is still needed.
+    private Thread t;
+    private Query query;
+    private ThreadTime threadTime;
+    private String threadName;
+    private long numberOfExecutions;
+    private long executionDurationInMs;
+    private QueryVerifier queryVerifier = new QueryVerifier(true);
- private synchronized ThreadTime getThreadTime() {
+ private synchronized ThreadTime getThreadTime() {
return threadTime;
}
/**
- * Query Verification
- * @throws Exception
- */
- private void diffQuery() throws Exception {
- Long start = System.currentTimeMillis();
- Date startDate = Calendar.getInstance().getTime();
- String newCSV = queryVerifier.exportCSV(query);
- boolean verifyResult = queryVerifier.doDiff(query, newCSV);
- String explainPlan = queryVerifier.getExplainPlan(query);
- getThreadTime().getRunTimesInMs().add(
- new RunTime(verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
- explainPlan, startDate, -1L,
- (int)(System.currentTimeMillis() - start)));
- }
-
- /**
- * Multithreaded Differ
- * @param threadName
- * @param query
- * @param threadName
- * @param threadTime
- * @param numberOfExecutions
- * @param executionDurationInMs
- */
- MultithreadedDiffer(String threadName,
- Query query,
- ThreadTime threadTime,
- long numberOfExecutions,
- long executionDurationInMs) {
- this.query = query;
- this.threadName = threadName;
- this.threadTime = threadTime;
- this.numberOfExecutions = numberOfExecutions;
- this.executionDurationInMs = executionDurationInMs;
- }
+ * Query Verification
+ *
+ * @throws Exception
+ */
+    private void diffQuery() throws Exception {
+        // Both start markers are captured before any verification work begins.
+        final long startedAtMs = System.currentTimeMillis();
+        final Date startedOn = Calendar.getInstance().getTime();
+
+        // Export the current result, diff it, then fetch the explain plan.
+        final String exportedCsv = queryVerifier.exportCSV(query);
+        final boolean verified = queryVerifier.doDiff(query, exportedCsv);
+        final String plan = queryVerifier.getExplainPlan(query);
+
+        // The row count is recorded as -1 for a verification run.
+        getThreadTime().getRunTimesInMs().add(
+                new RunTime(verified ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
+                        plan, startedOn, -1L,
+                        (int) (System.currentTimeMillis() - startedAtMs)));
+    }
- /**
- * Executes verification runs for a minimum of number of execution or execution duration
- */
- public void run() {
- logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
- + numberOfExecutions + "times\n\n");
- Long start = System.currentTimeMillis();
- for (long i = numberOfExecutions; (i > 0 && ((System
- .currentTimeMillis() - start) < executionDurationInMs)); i--) {
- try {
- diffQuery();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- logger.info("\n\nThread exiting." + t.getName() + "\n\n");
- }
+    /**
+     * Creates a differ for one query.
+     *
+     * @param threadName            name stored for this differ
+     * @param query                 query whose results are verified
+     * @param threadTime            collector that receives one run time per verification
+     * @param numberOfExecutions    upper bound on the number of verification runs
+     * @param executionDurationInMs upper bound on the total run time in milliseconds
+     */
+    MultithreadedDiffer(String threadName, Query query, ThreadTime threadTime,
+            long numberOfExecutions, long executionDurationInMs) {
+        this.threadName = threadName;
+        this.query = query;
+        this.threadTime = threadTime;
+        this.executionDurationInMs = executionDurationInMs;
+        this.numberOfExecutions = numberOfExecutions;
+    }
- /**
- * Thread start
- * @return
- */
- public Thread start() {
- if (t == null) {
- t = new Thread(this, threadName);
- t.start();
- }
- return t;
- }
+    /**
+     * Runs the verification until either the execution count is used up or the
+     * execution duration has elapsed, whichever happens first. A failure in one
+     * iteration is logged and does not stop the loop.
+     */
+    @Override
+    public void run() {
+        // Log with the name given to the constructor. The Thread field "t" is not
+        // assigned by any visible code, so calling t.getName() here would throw.
+        logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+                + numberOfExecutions + "times\n\n");
+        final long start = System.currentTimeMillis();
+        for (long i = numberOfExecutions;
+                i > 0 && (System.currentTimeMillis() - start) < executionDurationInMs; i--) {
+            try {
+                diffQuery();
+            } catch (Exception e) {
+                logger.error("Verification failed in thread " + threadName, e);
+            }
+        }
+        logger.info("\n\nThread exiting." + threadName + "\n\n");
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
deleted file mode 100644
index 237fc17..0000000
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pherf.workload;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-
-import org.apache.phoenix.pherf.result.DataModelResult;
-import org.apache.phoenix.pherf.result.ResultManager;
-import org.apache.phoenix.pherf.result.RunTime;
-import org.apache.phoenix.pherf.result.ThreadTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
-
-class MultithreadedRunner implements Runnable {
- private static final Logger logger = LoggerFactory
- .getLogger(MultithreadedRunner.class);
- private Thread t;
- private Query query;
- private ThreadTime threadTime;
- private PhoenixUtil pUtil = new PhoenixUtil();
- private String threadName;
- private DataModelResult dataModelResult;
- private long numberOfExecutions;
- private long executionDurationInMs;
- private static long lastResultWritten = System.currentTimeMillis() - 1000;
- private final ResultManager resultManager;
-
- /**
- * Multithreaded runner
- *
- * @param threadName
- * @param query
- * @param dataModelResult
- * @param threadTime
- * @param numberOfExecutions
- * @param executionDurationInMs
- */
- MultithreadedRunner(String threadName,
- Query query,
- DataModelResult dataModelResult,
- ThreadTime threadTime,
- long numberOfExecutions,
- long executionDurationInMs) {
- this.query = query;
- this.threadName = threadName;
- this.threadTime = threadTime;
- this.dataModelResult = dataModelResult;
- this.numberOfExecutions = numberOfExecutions;
- this.executionDurationInMs = executionDurationInMs;
- this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
- }
-
- /**
- * Executes run for a minimum of number of execution or execution duration
- */
- public void run() {
- logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
- + numberOfExecutions + "times\n\n");
- Long start = System.currentTimeMillis();
- for (long i = numberOfExecutions; (i > 0 && ((System
- .currentTimeMillis() - start) < executionDurationInMs)); i--) {
- try {
- synchronized (resultManager) {
- timedQuery();
- if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
- resultManager.write(dataModelResult);
- lastResultWritten = System.currentTimeMillis();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- logger.info("\n\nThread exiting." + t.getName() + "\n\n");
- }
-
- /**
- * Thread start
- * @return
- */
- public Thread start() {
- if (t == null) {
- t = new Thread(this, threadName);
- t.start();
- }
- return t;
- }
-
- private synchronized ThreadTime getThreadTime() {
- return threadTime;
- }
-
- /**
- * Timed query execution
- *
- * @throws Exception
- */
- private void timedQuery() throws Exception {
- boolean isSelectCountStatement = query.getStatement().toUpperCase().trim()
- .contains("COUNT(*)") ? true : false;
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Long start = System.currentTimeMillis();
- Date startDate = Calendar.getInstance().getTime();
- String exception = null;
- long resultRowCount = 0;
-
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement(query.getStatement());
- boolean isQuery = statement.execute();
- if (isQuery) {
- rs = statement.getResultSet();
- while (rs.next()) {
- if (null != query.getExpectedAggregateRowCount()) {
- if (rs.getLong(1) != query.getExpectedAggregateRowCount())
- throw new RuntimeException("Aggregate count "
- + rs.getLong(1) + " does not match expected "
- + query.getExpectedAggregateRowCount());
- }
-
- if (isSelectCountStatement) {
- resultRowCount = rs.getLong(1);
- } else {
- resultRowCount++;
- }
- }
- } else {
- conn.commit();
- }
- } catch (Exception e) {
- e.printStackTrace();
- exception = e.getMessage();
- } finally {
- getThreadTime().getRunTimesInMs().add(
- new RunTime(exception, startDate, resultRowCount, (int) (System.currentTimeMillis() - start)));
-
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f76b92fd/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 6f6e000..624188c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -18,227 +18,256 @@
package org.apache.phoenix.pherf.workload;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.phoenix.pherf.configuration.DataModel;
-import org.apache.phoenix.pherf.configuration.ExecutionType;
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.configuration.QuerySet;
-import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class QueryExecutor implements Workload {
+ private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
+ private List<DataModel> dataModels;
+ private String queryHint;
+ private final RunMode runMode;
+ private final boolean exportCSV;
+ private final ExecutorService pool;
+ private final XMLConfigParser parser;
+ private final PhoenixUtil util;
+
+    /**
+     * Creates an executor over all data models known to the parser, with no query
+     * hint, CSV export turned off and {@link RunMode#PERFORMANCE} as run mode.
+     *
+     * @param parser configuration parser that supplies the data models
+     * @param util   Phoenix helper used to run query-set DDLs
+     * @param pool   executor service that runs the submitted work
+     */
+    public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool) {
+        this(parser, util, pool, parser.getDataModels(), null, false, RunMode.PERFORMANCE);
+    }
+
+    /**
+     * Creates an executor with every setting given explicitly.
+     *
+     * @param parser     configuration parser handed on to write workloads
+     * @param util       Phoenix helper used to run query-set DDLs
+     * @param pool       executor service that runs the submitted work
+     * @param dataModels data models to process
+     * @param queryHint  hint set on every query result, may be null
+     * @param exportCSV  true to export query results instead of executing scenarios
+     * @param runMode    mode that selects the differ or the timed runner
+     */
+    public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool,
+            List<DataModel> dataModels, String queryHint, boolean exportCSV, RunMode runMode) {
+        this.parser = parser;
+        this.util = util;
+        this.pool = pool;
+        this.dataModels = dataModels;
+        this.queryHint = queryHint;
+        this.exportCSV = exportCSV;
+        this.runMode = runMode;
+    }
+
+    /** No completion work is performed by this workload. */
+    @Override
+    public void complete() {
+        // intentionally empty
+    }
+
+    /**
+     * Builds the work for all data models: a CSV export per model when export is
+     * enabled, otherwise a scenario run per model.
+     *
+     * <p>With exactly one data model, that model's Runnable is returned directly. With
+     * several, the returned Runnable runs each model's work in list order, so no data
+     * model is dropped. With none, the returned Runnable does nothing.
+     *
+     * @return Runnable covering every configured data model, never null
+     * @throws Exception if building the work for a data model fails
+     */
+    public Runnable execute() throws Exception {
+        final List<Runnable> perModel = new ArrayList<>();
+        for (DataModel dataModel : dataModels) {
+            if (exportCSV) {
+                perModel.add(exportAllScenarios(dataModel));
+            } else {
+                perModel.add(executeAllScenarios(dataModel));
+            }
+        }
+        if (perModel.size() == 1) {
+            return perModel.get(0);
+        }
+        return new Runnable() {
+            @Override
+            public void run() {
+                for (Runnable work : perModel) {
+                    work.run();
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds a Runnable that exports the result of every query of the data model to CSV.
+     * For each query set the DDLs are executed first, then each query is exported. Any
+     * exception raised while running is logged as a warning and ends the export.
+     *
+     * @param dataModel data model whose scenarios are exported
+     * @return Runnable performing the export
+     * @throws Exception declared for subclasses; the visible code does not throw here
+     */
+    protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    List<Scenario> toExport = dataModel.getScenarios();
+                    QueryVerifier csvExporter = new QueryVerifier(false);
+                    for (Scenario scenario : toExport) {
+                        exportScenario(csvExporter, scenario);
+                    }
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
+
+    /** Runs the DDLs of each query set of the scenario, then exports each of its queries. */
+    private void exportScenario(QueryVerifier csvExporter, Scenario scenario) throws Exception {
+        for (QuerySet querySet : scenario.getQuerySet()) {
+            util.executeQuerySetDdls(querySet);
+            for (Query query : querySet.getQuery()) {
+                csvExporter.exportCSV(query);
+            }
+        }
+    }
+
+    /**
+     * Builds a Runnable that executes every scenario of the data model. Per scenario it
+     * submits the configured write workloads, then runs each query set (DDLs first,
+     * then serial or parallel execution) and writes the data model result. After the
+     * last scenario the collected results are written once more as a list. Any
+     * exception raised inside the scenario loop is logged as a warning and ends the run.
+     *
+     * @param dataModel data model whose scenarios are executed
+     * @return Runnable performing the run
+     * @throws Exception declared for subclasses; the visible code does not throw here
+     */
+    protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception {
+        return new Runnable() {
+            @Override
+            public void run() {
+                List<DataModelResult> allResults = new ArrayList<>();
+                DataModelResult modelResult =
+                        new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
+                ResultManager resultWriter =
+                        new ResultManager(modelResult.getName(), QueryExecutor.this.runMode);
+
+                allResults.add(modelResult);
+                List<Scenario> scenarios = dataModel.getScenarios();
+                Configuration hbaseConf = HBaseConfiguration.create();
+                Map<String, String> phoenixProps = hbaseConf.getValByRegex("phoenix");
+                try {
+                    for (Scenario scenario : scenarios) {
+                        ScenarioResult scenarioResult = new ScenarioResult(scenario);
+                        scenarioResult.setPhoenixProperties(phoenixProps);
+                        modelResult.getScenarioResult().add(scenarioResult);
+
+                        submitWriteWorkloads(scenario.getWriteParams());
+
+                        for (QuerySet querySet : scenario.getQuerySet()) {
+                            QuerySetResult querySetResult = new QuerySetResult(querySet);
+                            scenarioResult.getQuerySetResult().add(querySetResult);
+
+                            util.executeQuerySetDdls(querySet);
+                            boolean serial = querySet.getExecutionType() == ExecutionType.SERIAL;
+                            if (serial) {
+                                executeQuerySetSerial(modelResult, querySet, querySetResult);
+                            } else {
+                                executeQuerySetParallel(modelResult, querySet, querySetResult);
+                            }
+                        }
+                        resultWriter.write(modelResult);
+                    }
+                    resultWriter.write(allResults);
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
+
+    /** Submits one write workload per configured writer thread; does nothing for null. */
+    private void submitWriteWorkloads(WriteParams writeParams) throws Exception {
+        if (writeParams == null) {
+            return;
+        }
+        int writerThreadCount = writeParams.getWriterThreadCount();
+        for (int i = 0; i < writerThreadCount; i++) {
+            logger.debug("Inserting write workload ( " + i + " ) of ( "
+                    + writerThreadCount + " )");
+            Workload writes = new WriteWorkload(PhoenixUtil.create(), parser);
+            pool.submit(writes.execute());
+        }
+    }
-public class QueryExecutor {
- private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
- private List<DataModel> dataModels;
- private String queryHint;
- private RunMode runMode;
+    /**
+     * Executes the queries of a query set one after another. For each query the
+     * concurrency level is stepped from the set's minimum to its maximum; at each
+     * level that many runners are submitted to the pool and all of them are awaited
+     * before the next level starts. A runner failure is logged and does not abort the
+     * remaining work.
+     *
+     * @param dataModelResult result object handed to every runner
+     * @param querySet        query set supplying the queries and concurrency bounds
+     * @param querySetResult  receives one query result per query; also passed to the
+     *                        runners as their query-set settings
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
+            QuerySetResult querySetResult) throws InterruptedException {
+        for (Query query : querySet.getQuery()) {
+            QueryResult queryResult = new QueryResult(query);
+            querySetResult.getQueryResults().add(queryResult);
+
+            for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+                // Future<?> is what ExecutorService.submit(Runnable) returns.
+                List<Future<?>> pending = new ArrayList<>();
+
+                for (int i = 0; i < cr; i++) {
+                    Runnable runner =
+                            executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+                                    querySetResult);
+                    pending.add(pool.submit(runner));
+                }
+
+                for (Future<?> future : pending) {
+                    try {
+                        future.get();
+                    } catch (ExecutionException e) {
+                        logger.error("", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Executes the queries of a query set together. For each concurrency level from
+     * the set's minimum to its maximum, and for each runner index below that level,
+     * one runner per query is submitted to the pool; every future submitted so far at
+     * that level is then awaited. A runner failure is logged and does not abort the
+     * remaining work.
+     *
+     * @param dataModelResult result object handed to every runner
+     * @param querySet        query set supplying the queries and concurrency bounds
+     * @param querySetResult  receives one query result per submitted runner; also
+     *                        passed to the runners as their query-set settings
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
+            QuerySetResult querySetResult) throws InterruptedException {
+        for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+            // Future<?> is what ExecutorService.submit(Runnable) returns.
+            List<Future<?>> pending = new ArrayList<>();
+            for (int i = 0; i < cr; i++) {
+                for (Query query : querySet.getQuery()) {
+                    QueryResult queryResult = new QueryResult(query);
+                    querySetResult.getQueryResults().add(queryResult);
+
+                    Runnable runner =
+                            executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+                                    querySetResult);
+                    pending.add(pool.submit(runner));
+                }
+
+                // NOTE(review): this wait sits inside the runner-index loop, so each
+                // index finishes before the next one is submitted and the level "cr"
+                // does not raise the number of runners in flight. The serial variant
+                // waits after its index loop -- confirm which placement is intended.
+                for (Future<?> future : pending) {
+                    try {
+                        future.get();
+                    } catch (ExecutionException e) {
+                        logger.error("", e);
+                    }
+                }
+            }
+        }
+    }
- public QueryExecutor(XMLConfigParser parser) {
- this.dataModels = parser.getDataModels();
+    /**
+     * Prepares the Runnable for one execution thread of a query. A new thread-time
+     * record with the given name is attached to the query result, the configured hint
+     * is set on the query result, and the runner is chosen by run mode: a differ in
+     * functional mode, a timed runner otherwise.
+     *
+     * @param name            thread name stored on the thread-time record
+     * @param dataModelResult result object handed to the timed runner
+     * @param queryResult     query to run; receives the thread-time record and the hint
+     * @param querySet        supplies the execution count and duration limits
+     * @return the Runnable to submit; it is not started here
+     */
+    protected Runnable executeRunner(String name, DataModelResult dataModelResult,
+            QueryResult queryResult, QuerySet querySet) {
+        ThreadTime timing = new ThreadTime();
+        queryResult.getThreadTimes().add(timing);
+        timing.setThreadName(name);
+        queryResult.setHint(this.queryHint);
+        logger.info("\nExecuting query " + queryResult.getStatement());
+
+        if (this.runMode == RunMode.FUNCTIONAL) {
+            return new MultithreadedDiffer(timing.getThreadName(), queryResult, timing,
+                    querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs());
+        }
+        return new MultiThreadedRunner(timing.getThreadName(), queryResult, dataModelResult,
+                timing, querySet.getNumberOfExecutions(),
+                querySet.getExecutionDurationInMs());
+     }
-
- /**
- * Calls in Multithreaded Query Executor for all datamodels
- * @throws Exception
- */
- public void execute(String queryHint, boolean exportCSV, RunMode runMode) throws Exception {
- this.queryHint = queryHint;
- this.runMode = runMode;
- for (DataModel dataModel: dataModels) {
- if (exportCSV) {
- exportAllScenarios(dataModel);
- } else {
- executeAllScenarios(dataModel);
- }
- }
- }
-
- /**
- * Export all queries results to CSV
- * @param dataModel
- * @throws Exception
- */
- protected void exportAllScenarios(DataModel dataModel) throws Exception {
- List<Scenario> scenarios = dataModel.getScenarios();
- QueryVerifier exportRunner = new QueryVerifier(false);
- for (Scenario scenario : scenarios) {
- for (QuerySet querySet : scenario.getQuerySet()) {
- executeQuerySetDdls(querySet);
- for (Query query : querySet.getQuery()) {
- exportRunner.exportCSV(query);
- }
- }
- }
- }
-
- /**
- * Execute all scenarios
- * @param dataModel
- * @throws Exception
- */
- protected void executeAllScenarios(DataModel dataModel) throws Exception {
- List<DataModelResult> dataModelResults = new ArrayList<DataModelResult>();
- DataModelResult dataModelResult = new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
- ResultManager resultManager = new ResultManager(dataModelResult.getName(), this.runMode);
-
-
- dataModelResults.add(dataModelResult);
- List<Scenario> scenarios = dataModel.getScenarios();
- Configuration conf = HBaseConfiguration.create();
- Map<String, String> phoenixProperty = conf.getValByRegex("phoenix");
- phoenixProperty.putAll(conf.getValByRegex("sfdc"));
-
- for (Scenario scenario : scenarios) {
- ScenarioResult scenarioResult = new ScenarioResult(scenario);
- scenarioResult.setPhoenixProperties(phoenixProperty);
- dataModelResult.getScenarioResult().add(scenarioResult);
-
- for (QuerySet querySet : scenario.getQuerySet()) {
- QuerySetResult querySetResult = new QuerySetResult(querySet);
- scenarioResult.getQuerySetResult().add(querySetResult);
-
- executeQuerySetDdls(querySet);
-
- if (querySet.getExecutionType() == ExecutionType.SERIAL) {
- execcuteQuerySetSerial(dataModelResult, querySet, querySetResult, scenarioResult);
- } else {
- execcuteQuerySetParallel(dataModelResult, querySet, querySetResult, scenarioResult);
- }
- }
- resultManager.write(dataModelResult);
- }
- resultManager.write(dataModelResults);
- }
-
- /**
- * Execute all querySet DDLs first based on tenantId if specified. This is executed
- * first since we don't want to run DDLs in parallel to executing queries.
- *
- * @param querySet
- * @throws Exception
- */
- protected void executeQuerySetDdls(QuerySet querySet) throws Exception {
- PhoenixUtil pUtil = new PhoenixUtil();
- for (Query query : querySet.getQuery()) {
- if (null != query.getDdl()) {
- Connection conn = null;
- try {
- logger.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query.getTenantId());
- pUtil.executeStatement(query.getDdl(), conn = pUtil.getConnection(query.getTenantId()));
- } finally {
- if (null != conn) {
- conn.close();
- }
- }
- }
- }
- }
-
- /**
- * Execute query set serially
- * @param dataModelResult
- * @param querySet
- * @param querySetResult
- * @param scenario
- * @throws InterruptedException
- */
- protected void execcuteQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
- for (Query query : querySet.getQuery()) {
- QueryResult queryResult = new QueryResult(query);
- querySetResult.getQueryResults().add(queryResult);
-
- for (int cr = querySet.getMinConcurrency(); cr <= querySet
- .getMaxConcurrency(); cr++) {
-
- List<Thread> threads = new ArrayList<Thread>();
-
- for (int i = 0; i < cr; i++) {
-
- Thread thread = executeRunner((i + 1) + ","
- + cr, dataModelResult, queryResult,
- querySetResult);
- threads.add(thread);
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
- }
- }
- }
-
- /**
- * Execute query set in parallel
- * @param dataModelResult
- * @param querySet
- * @param querySetResult
- * @param scenario
- * @throws InterruptedException
- */
- protected void execcuteQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario)
- throws InterruptedException {
- for (int cr = querySet.getMinConcurrency(); cr <= querySet
- .getMaxConcurrency(); cr++) {
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < cr; i++) {
- for (Query query : querySet.getQuery()) {
- QueryResult queryResult = new QueryResult(query);
- querySetResult.getQueryResults().add(queryResult);
-
- Thread thread = executeRunner((i + 1) + ","
- + cr, dataModelResult, queryResult,
- querySetResult);
- threads.add(thread);
- }
- }
- for (Thread thread : threads) {
- thread.join();
- }
- }
- }
-
- /**
- * Execute multi-thread runner
- * @param name
- * @param dataModelResult
- * @param queryResult
- * @param querySet
- * @return
- */
- protected Thread executeRunner(String name, DataModelResult dataModelResult, QueryResult queryResult, QuerySet querySet) {
- ThreadTime threadTime = new ThreadTime();
- queryResult.getThreadTimes().add(threadTime);
- threadTime.setThreadName(name);
- queryResult.setHint(this.queryHint);
- logger.info("\nExecuting query "
- + queryResult.getStatement());
- Thread thread;
- if (this.runMode == RunMode.FUNCTIONAL) {
- thread = new MultithreadedDiffer(
- threadTime.getThreadName(),
- queryResult,
- threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
- .start();
- } else {
- thread = new MultithreadedRunner(
- threadTime.getThreadName(),
- queryResult,
- dataModelResult,
- threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
- .start();
- }
- return thread;
- }
-}
+}
\ No newline at end of file