You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/02 23:01:20 UTC
[17/47] phoenix git commit: PHOENIX-1920 - Pherf - Add support for
mixed r/w workloads
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
index 78f18ca..c9333a0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
@@ -43,153 +43,160 @@ import difflib.DiffUtils;
import difflib.Patch;
public class QueryVerifier {
- private PhoenixUtil pUtil = new PhoenixUtil();
- private static final Logger logger = LoggerFactory
- .getLogger(QueryVerifier.class);
- private boolean useTemporaryOutput;
- private String directoryLocation;
-
- public QueryVerifier(boolean useTemporaryOutput) {
- this.useTemporaryOutput = useTemporaryOutput;
- this.directoryLocation = this.useTemporaryOutput ?
- PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
-
- ensureBaseDirExists();
- }
-
- /***
- * Export query resultSet to CSV file
- * @param query
- * @throws Exception
- */
- public String exportCSV(Query query) throws Exception {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String fileName = getFileName(query);
- FileOutputStream fos = new FileOutputStream(fileName);
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement(query.getStatement());
- boolean isQuery = statement.execute();
- if (isQuery) {
- rs = statement.executeQuery();
- int columnCount = rs.getMetaData().getColumnCount();
- while (rs.next()) {
- for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
- fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER).getBytes());
- }
- fos.write(PherfConstants.NEW_LINE.getBytes());
- }
- } else {
- conn.commit();
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- fos.flush();
- fos.close();
- }
- return fileName;
- }
-
- /***
- * Do a diff between exported query results and temporary CSV file
- * @param query
- * @param newCSV
- * @return
- */
- public boolean doDiff(Query query, String newCSV) {
+ private PhoenixUtil pUtil = PhoenixUtil.create();
+ private static final Logger logger = LoggerFactory.getLogger(QueryVerifier.class);
+ private boolean useTemporaryOutput;
+ private String directoryLocation;
+
+ public QueryVerifier(boolean useTemporaryOutput) {
+ this.useTemporaryOutput = useTemporaryOutput;
+ this.directoryLocation =
+ this.useTemporaryOutput ? PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
+
+ ensureBaseDirExists();
+ }
+
+ /**
+ * Export query resultSet to CSV file
+ *
+ * @param query
+ * @throws Exception
+ */
+ public String exportCSV(Query query) throws Exception {
+ Connection conn = null;
+ PreparedStatement statement = null;
+ ResultSet rs = null;
+ String fileName = getFileName(query);
+ FileOutputStream fos = new FileOutputStream(fileName);
+ try {
+ conn = pUtil.getConnection(query.getTenantId());
+ statement = conn.prepareStatement(query.getStatement());
+ boolean isQuery = statement.execute();
+ if (isQuery) {
+ rs = statement.executeQuery();
+ int columnCount = rs.getMetaData().getColumnCount();
+ while (rs.next()) {
+ for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
+ fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER)
+ .getBytes());
+ }
+ fos.write(PherfConstants.NEW_LINE.getBytes());
+ }
+ } else {
+ conn.commit();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (rs != null) rs.close();
+ if (statement != null) statement.close();
+ if (conn != null) conn.close();
+ fos.flush();
+ fos.close();
+ }
+ return fileName;
+ }
+
+ /**
+ * Do a diff between exported query results and temporary CSV file
+ *
+ * @param query
+ * @param newCSV
+ * @return
+ */
+ public boolean doDiff(Query query, String newCSV) {
List<String> original = fileToLines(getCSVName(query, PherfConstants.EXPORT_DIR, ""));
- List<String> newLines = fileToLines(newCSV);
-
+ List<String> newLines = fileToLines(newCSV);
+
Patch patch = DiffUtils.diff(original, newLines);
if (patch.getDeltas().isEmpty()) {
- logger.info("Match: " + query.getId() + " with " + newCSV);
- return true;
+ logger.info("Match: " + query.getId() + " with " + newCSV);
+ return true;
} else {
- logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
- return false;
+ logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
+ return false;
}
- }
-
- /***
- * Helper method to load file
- * @param filename
- * @return
- */
+ }
+
+ /**
+ * Helper method to load file
+ *
+ * @param filename
+ * @return
+ */
private static List<String> fileToLines(String filename) {
- List<String> lines = new LinkedList<String>();
- String line = "";
- try {
- BufferedReader in = new BufferedReader(new FileReader(filename));
- while ((line = in.readLine()) != null) {
- lines.add(line);
- }
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
+ List<String> lines = new LinkedList<String>();
+ String line = "";
+ try {
+ BufferedReader in = new BufferedReader(new FileReader(filename));
+ while ((line = in.readLine()) != null) {
+ lines.add(line);
}
-
- return lines;
+ in.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return lines;
}
/**
* Get explain plan for a query
+ *
* @param query
* @return
* @throws SQLException
*/
- public String getExplainPlan(Query query) throws SQLException {
- Connection conn = null;
- ResultSet rs = null;
- PreparedStatement statement = null;
- StringBuilder buf = new StringBuilder();
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
- rs = statement.executeQuery();
- while (rs.next()) {
- buf.append(rs.getString(1).trim().replace(",", "-"));
- }
- statement.close();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- }
- return buf.toString();
- }
-
- /***
+ public String getExplainPlan(Query query) throws SQLException {
+ Connection conn = null;
+ ResultSet rs = null;
+ PreparedStatement statement = null;
+ StringBuilder buf = new StringBuilder();
+ try {
+ conn = pUtil.getConnection(query.getTenantId());
+ statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
+ rs = statement.executeQuery();
+ while (rs.next()) {
+ buf.append(rs.getString(1).trim().replace(",", "-"));
+ }
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (rs != null) rs.close();
+ if (statement != null) statement.close();
+ if (conn != null) conn.close();
+ }
+ return buf.toString();
+ }
+
+ /**
* Helper method to generate CSV file name
+ *
* @param query
* @return
* @throws FileNotFoundException
*/
- private String getFileName(Query query) throws FileNotFoundException {
- String tempExt = "";
- if (this.useTemporaryOutput) {
- tempExt = "_" + java.util.UUID.randomUUID().toString();
- }
- return getCSVName(query, this.directoryLocation, tempExt);
- }
-
- private String getCSVName(Query query, String directory, String tempExt) {
- String csvFile = directory + PherfConstants.PATH_SEPARATOR
- + query.getId() + tempExt + Extension.CSV.toString();
- return csvFile;
- }
-
- private void ensureBaseDirExists() {
- File baseDir = new File(this.directoryLocation);
- if (!baseDir.exists()) {
- baseDir.mkdir();
- }
- }
+ private String getFileName(Query query) throws FileNotFoundException {
+ String tempExt = "";
+ if (this.useTemporaryOutput) {
+ tempExt = "_" + java.util.UUID.randomUUID().toString();
+ }
+ return getCSVName(query, this.directoryLocation, tempExt);
+ }
+
+ private String getCSVName(Query query, String directory, String tempExt) {
+ String
+ csvFile =
+ directory + PherfConstants.PATH_SEPARATOR + query.getId() + tempExt + Extension.CSV
+ .toString();
+ return csvFile;
+ }
+
+ private void ensureBaseDirExists() {
+ File baseDir = new File(this.directoryLocation);
+ if (!baseDir.exists()) {
+ baseDir.mkdir();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
new file mode 100644
index 0000000..16a493e
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.pherf.workload;
+
+public interface Workload {
+ public Runnable execute() throws Exception;
+
+ /**
+ * Use this method to perform any cleanup or forced shutdown of the thread.
+ */
+ public void complete();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index cf2f038..a65b4aa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -19,95 +19,96 @@
package org.apache.phoenix.pherf.workload;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.jmx.MonitorManager;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
-
-import org.apache.phoenix.pherf.util.ResourceList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
public class WorkloadExecutor {
private static final Logger logger = LoggerFactory.getLogger(WorkloadExecutor.class);
- private final XMLConfigParser parser;
- private MonitorManager monitor;
- private Future monitorThread;
private final int poolSize;
- private final ExecutorService pool;
+ // Jobs can be accessed by multiple threads
+ private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
+ private final ExecutorService pool;
public WorkloadExecutor() throws Exception {
this(PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES));
}
- public WorkloadExecutor(Properties properties) throws Exception{
- this(properties,PherfConstants.DEFAULT_FILE_PATTERN);
+ public WorkloadExecutor(Properties properties) throws Exception {
+ this(properties, new ArrayList());
}
- public WorkloadExecutor(Properties properties, String filePattern) throws Exception {
- this(properties,
- new XMLConfigParser(filePattern),
- true);
+ public WorkloadExecutor(Properties properties, List<Workload> workloads) throws Exception {
+ this.poolSize =
+ (properties.getProperty("pherf.default.threadpool") == null) ?
+ PherfConstants.DEFAULT_THREAD_POOL_SIZE :
+ Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+
+ this.pool = Executors.newFixedThreadPool(this.poolSize);
+ init(workloads);
}
- public WorkloadExecutor(Properties properties, XMLConfigParser parser, boolean monitor) throws Exception {
- this.parser = parser;
- this.poolSize = (properties.getProperty("pherf.default.threadpool") == null)
- ? PherfConstants.DEFAULT_THREAD_POOL_SIZE
- : Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+ public void add(Workload workload) throws Exception {
+ this.jobs.put(workload, pool.submit(workload.execute()));
+ }
- this.pool = Executors.newFixedThreadPool(this.poolSize);
- if (monitor) {
- initMonitor(Integer.parseInt(properties.getProperty("pherf.default.monitorFrequency")));
+ /**
+ * Blocks on waiting for all workloads to finish. If a
+ * {@link org.apache.phoenix.pherf.workload.Workload} Requires complete() to be called, it must
+ * be called prior to using this method. Otherwise it will block infinitely.
+ */
+ public void get() {
+ for (Workload workload : jobs.keySet()) {
+ get(workload);
}
}
/**
- * Executes all scenarios dataload
+ * Calls the {@link java.util.concurrent.Future#get()} method pertaining to this workflow.
+ * Once the Future competes, the workflow is removed from the list.
*
- * @throws Exception
+ * @param workload Key entry in the HashMap
*/
- public void executeDataLoad() throws Exception {
- logger.info("\n\nStarting Data Loader...");
- DataLoader dataLoader = new DataLoader(parser);
- dataLoader.execute();
+ public void get(Workload workload) {
+ try {
+ Future future = jobs.get(workload);
+ future.get();
+ jobs.remove(workload);
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("", e);
+ }
}
/**
- * Executes all scenario multi-threaded query sets
- *
- * @param queryHint
- * @throws Exception
+ * Complete all workloads in the list.
+ * Entries in the job Map will persist until {#link WorkloadExecutorNew#get()} is called
*/
- public void executeMultithreadedQueryExecutor(String queryHint, boolean export, RunMode runMode) throws Exception {
- logger.info("\n\nStarting Query Executor...");
- QueryExecutor queryExecutor = new QueryExecutor(parser);
- queryExecutor.execute(queryHint, export, runMode);
+ public void complete() {
+ for (Workload workload : jobs.keySet()) {
+ workload.complete();
+ }
}
- public void shutdown() throws Exception {
- if (null != monitor && monitor.isRunning()) {
- this.monitor.stop();
- this.monitorThread.get(60, TimeUnit.SECONDS);
- this.pool.shutdown();
- }
+ public void shutdown() {
+ // Make sure any Workloads still on pool have been properly shutdown
+ complete();
+ pool.shutdownNow();
}
- // Just used for testing
- public XMLConfigParser getParser() {
- return parser;
+ public ExecutorService getPool() {
+ return pool;
}
- private void initMonitor(int monitorFrequency) throws Exception {
- this.monitor = new MonitorManager(monitorFrequency);
- monitorThread = pool.submit(this.monitor);
+ private void init(List<Workload> workloads) throws Exception {
+ for (Workload workload : workloads) {
+ this.jobs.put(workload, pool.submit(workload.execute()));
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
new file mode 100644
index 0000000..305521b
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.WriteParams;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.exception.PherfException;
+import org.apache.phoenix.pherf.result.DataLoadThreadTime;
+import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.rules.DataValue;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.util.RowCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class WriteWorkload implements Workload {
+ private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+ private final PhoenixUtil pUtil;
+ private final XMLConfigParser parser;
+ private final RulesApplier rulesApplier;
+ private final ResultUtil resultUtil;
+ private final ExecutorService pool;
+ private final WriteParams writeParams;
+ private final Scenario scenario;
+ private final long threadSleepDuration;
+
+ private final int threadPoolSize;
+ private final int batchSize;
+
+ public WriteWorkload(XMLConfigParser parser) throws Exception {
+ this(PhoenixUtil.create(), parser);
+ }
+
+ public WriteWorkload(PhoenixUtil util, XMLConfigParser parser) throws Exception {
+ this(util, parser, null);
+ }
+
+ public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario)
+ throws Exception {
+ this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES),
+ parser, scenario);
+ }
+
+ /**
+ * Default the writers to use up all available cores for threads. If writeParams are used in
+ * the config files, they will override the defaults. writeParams are used for read/write mixed
+ * workloads.
+ * TODO extract notion of the scenario list and have 1 write workload per scenario
+ *
+ * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
+ * @param properties {@link java.util.Properties} default properties to use
+ * @param parser {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+ * @param scenario {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
+ * it will run against all scenarios in the parsers list.
+ * @throws Exception
+ */
+ public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
+ Scenario scenario) throws Exception {
+ this.pUtil = phoenixUtil;
+ this.parser = parser;
+ this.rulesApplier = new RulesApplier(parser);
+ this.resultUtil = new ResultUtil();
+
+ // Overwrite defaults properties with those given in the configuration. This indicates the
+ // scenario is a R/W mixed workload.
+ if (scenario != null) {
+ this.scenario = scenario;
+ writeParams = scenario.getWriteParams();
+ threadSleepDuration = writeParams.getThreadSleepDuration();
+ } else {
+ writeParams = null;
+ this.scenario = null;
+ threadSleepDuration = 0;
+ }
+
+ int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+
+ this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
+
+ // TODO Move pool management up to WorkloadExecutor
+ this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
+
+ String
+ bSize =
+ (writeParams == null) || (writeParams.getBatchSize() == Long.MIN_VALUE) ?
+ properties.getProperty("pherf.default.dataloader.batchsize") :
+ String.valueOf(writeParams.getBatchSize());
+ this.batchSize =
+ (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
+ }
+
+ @Override public void complete() {
+ }
+
+ public Runnable execute() throws Exception {
+ return new Runnable() {
+ @Override public void run() {
+ try {
+ DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
+ DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
+
+ if (WriteWorkload.this.scenario == null) {
+ for (Scenario scenario : getParser().getScenarios()) {
+ exec(dataLoadTimeSummary, dataLoadThreadTime, scenario);
+ }
+ } else {
+ exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario);
+ }
+ resultUtil.write(dataLoadTimeSummary);
+ resultUtil.write(dataLoadThreadTime);
+
+ } catch (Exception e) {
+ logger.warn("", e);
+ }
+ }
+ };
+ }
+
+ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
+ DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
+ logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
+ long start = System.currentTimeMillis();
+
+ List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+
+ waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
+
+ // always update stats for Phoenix base tables
+ updatePhoenixStats(scenario.getTableName());
+ }
+
+ private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+ throws Exception {
+ RowCalculator
+ rowCalculator =
+ new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
+ List<Future> writeBatches = new ArrayList<>();
+
+ for (int i = 0; i < getThreadPoolSize(); i++) {
+ List<Column>
+ phxMetaCols =
+ pUtil.getColumnsFromPhoenix(scenario.getSchemaName(),
+ scenario.getTableNameWithoutSchemaName(), pUtil.getConnection());
+ int threadRowCount = rowCalculator.getNext();
+ logger.info(
+ "Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
+ Future<Info>
+ write =
+ upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
+ dataLoadThreadTime);
+ writeBatches.add(write);
+ }
+ if (writeBatches.isEmpty()) {
+ throw new PherfException(
+ "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
+ }
+
+ return writeBatches;
+ }
+
+ private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
+ long start, List<Future> writeBatches)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ int sumRows = 0, sumDuration = 0;
+ // Wait for all the batch threads to complete
+ for (Future<Info> write : writeBatches) {
+ Info writeInfo = write.get();
+ sumRows += writeInfo.getRowCount();
+ sumDuration += writeInfo.getDuration();
+ logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+ + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
+ }
+ logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
+ + sumDuration + ") Ms");
+ dataLoadTimeSummary
+ .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+ }
+
+ /**
+ * TODO Move this method to PhoenixUtil
+ * Update Phoenix table stats
+ *
+ * @param tableName
+ * @throws Exception
+ */
+ public void updatePhoenixStats(String tableName) throws Exception {
+ logger.info("Updating stats for " + tableName);
+ pUtil.executeStatement("UPDATE STATISTICS " + tableName);
+ }
+
+ public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
+ final String tableName, final int rowCount,
+ final DataLoadThreadTime dataLoadThreadTime) {
+ Future<Info> future = pool.submit(new Callable<Info>() {
+ @Override public Info call() throws Exception {
+ int rowsCreated = 0;
+ long start = 0, duration, totalDuration;
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Connection connection = null;
+ try {
+ connection = pUtil.getConnection();
+ long logStartTime = System.currentTimeMillis();
+ long
+ maxDuration =
+ (WriteWorkload.this.writeParams == null) ?
+ Long.MAX_VALUE :
+ WriteWorkload.this.writeParams.getExecutionDurationInMs();
+
+ for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
+ < maxDuration); i--) {
+ String sql = buildSql(columns, tableName);
+ PreparedStatement stmt = connection.prepareStatement(sql);
+ stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+ start = System.currentTimeMillis();
+ rowsCreated += stmt.executeUpdate();
+ stmt.close();
+ if ((i % getBatchSize()) == 0) {
+ connection.commit();
+ duration = System.currentTimeMillis() - start;
+ logger.info("Writer (" + Thread.currentThread().getName()
+ + ") committed Batch. Total " + getBatchSize()
+ + " rows for this thread (" + this.hashCode() + ") in ("
+ + duration + ") Ms");
+
+ if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+ dataLoadThreadTime
+ .add(tableName, Thread.currentThread().getName(), i,
+ System.currentTimeMillis() - logStartTime);
+ logStartTime = System.currentTimeMillis();
+ }
+
+ // Pause for throttling if configured to do so
+ Thread.sleep(threadSleepDuration);
+ }
+ }
+ } finally {
+ if (connection != null) {
+ try {
+ connection.commit();
+ duration = System.currentTimeMillis() - start;
+ logger.info("Writer ( " + Thread.currentThread().getName()
+ + ") committed Final Batch. Duration (" + duration + ") Ms");
+ connection.close();
+ } catch (SQLException e) {
+ // Swallow since we are closing anyway
+ e.printStackTrace();
+ }
+ }
+ }
+ totalDuration = System.currentTimeMillis() - start;
+ return new Info(totalDuration, rowsCreated);
+ }
+ });
+ return future;
+ }
+
+ private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
+ PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+ int count = 1;
+ for (Column column : columns) {
+
+ DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
+ switch (column.getType()) {
+ case VARCHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.VARCHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case CHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.CHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case DECIMAL:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DECIMAL);
+ } else {
+ statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+ }
+ break;
+ case INTEGER:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.INTEGER);
+ } else {
+ statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+ }
+ break;
+ case DATE:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DATE);
+ } else {
+ Date
+ date =
+ new java.sql.Date(
+ simpleDateFormat.parse(dataValue.getValue()).getTime());
+ statement.setDate(count, date);
+ }
+ break;
+ default:
+ break;
+ }
+ count++;
+ }
+ return statement;
+ }
+
+ private String buildSql(final List<Column> columns, final String tableName) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("upsert into ");
+ builder.append(tableName);
+ builder.append(" (");
+ int count = 1;
+ for (Column column : columns) {
+ builder.append(column.getName());
+ if (count < columns.size()) {
+ builder.append(",");
+ } else {
+ builder.append(")");
+ }
+ count++;
+ }
+ builder.append(" VALUES (");
+ for (int i = 0; i < columns.size(); i++) {
+ if (i < columns.size() - 1) {
+ builder.append("?,");
+ } else {
+ builder.append("?)");
+ }
+ }
+ return builder.toString();
+ }
+
+ public XMLConfigParser getParser() {
+ return parser;
+ }
+
+ public RulesApplier getRulesApplier() {
+ return rulesApplier;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ private class Info {
+
+ private final int rowCount;
+ private final long duration;
+
+ public Info(long duration, int rows) {
+ this.duration = duration;
+ this.rowCount = rows;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 9514089..8f93685 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -304,6 +304,41 @@
</column>
</datamapping>
<scenarios>
+ <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="100" name="readWriteScenario">
+ <!-- Scenario level rule overrides will be unsupported in V1.
+ You can use the general datamappings in the mean time-->
+ <dataOverride>
+ <column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>LIST</dataSequence>
+ <name>TENANT_ID</name>
+ </column>
+ </dataOverride>
+ <writeParams executionDurationInMs="10000">
+ <!--
+ Number of writer it insert into the threadpool
+ -->
+ <writerThreadCount>5</writerThreadCount>
+
+ <!--
+ Time in Ms that each thread will sleep between batch writes. This helps to
+ throttle writers.
+ -->
+ <threadSleepDuration>10</threadSleepDuration>
+
+ <batchSize>100</batchSize>
+ </writeParams>
+ <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
+ <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000"
+ numberOfExecutions="100">
+ <!-- Aggregate queries on a per tenant basis -->
+ <query tenantId="00Dxx0000001gER"
+ ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED"
+ statement="select count(*) from PHERF.PHERF_TEST_VIEW_UNSALTED"/>
+ </querySet>
+
+ </scenario>
<scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10">
<!-- Scenario level rule overrides will be unsupported in V1.
You can use the general datamappings in the mean time-->
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index f362842..6f25fbd 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.pherf;
+import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,7 +28,6 @@ import java.util.List;
import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.workload.WorkloadExecutor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,53 +38,55 @@ import javax.xml.bind.Marshaller;
import static org.junit.Assert.*;
-public class ConfigurationParserTest extends ResultBaseTest{
+public class ConfigurationParserTest extends ResultBaseTest {
private static final Logger logger = LoggerFactory.getLogger(ConfigurationParserTest.class);
@Test
- public void testConfigFilesParsing() {
- try {
- WorkloadExecutor workloadExec = new WorkloadExecutor();
- List<Scenario> scenarioList = workloadExec.getParser().getScenarios();
- assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
- logger.info("Number of scenarios loaded: " + scenarioList.size());
-
- } catch (Exception e) {
- e.printStackTrace();
- fail();
+ public void testReadWriteWorkloadReader() throws Exception {
+ String scenarioName = "testScenarioRW";
+ List<Scenario> scenarioList = getScenarios();
+ Scenario target = null;
+ for (Scenario scenario : scenarioList) {
+ if (scenarioName.equals(scenario.getName())) {
+ target = scenario;
+ }
}
+ assertNotNull("Could not find scenario: " + scenarioName, target);
+ WriteParams params = target.getWriteParams();
+
+ assertNotNull("Could not find writeParams in scenario: " + scenarioName, params);
+ assertNotNull("Could not find batch size: ", params.getBatchSize());
+ assertNotNull("Could not find execution duration: ", params.getExecutionDurationInMs());
+ assertNotNull("Could not find sleep duration: ", params.getThreadSleepDuration());
+ assertNotNull("Could not find writer count: ", params.getWriterThreadCount());
}
- @Test
+ @Test
// TODO Break this into multiple smaller tests.
- public void testConfigReader(){
- URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
- assertNotNull("Test data XML file is missing", resourceUrl);
-
- try {
+ public void testConfigReader() {
+ try {
logger.debug("DataModel: " + writeXML());
- Path resourcePath = Paths.get(resourceUrl.toURI());
- DataModel data = XMLConfigParser.readDataModel(resourcePath);
- List<Scenario> scenarioList = data.getScenarios();
- assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
- List<Column> dataMappingColumns = data.getDataMappingColumns();
- assertTrue("Could not load the data columns from xml.", (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
+ List<Scenario> scenarioList = getScenarios();
+ List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+ assertTrue("Could not load the data columns from xml.",
+ (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
assertTrue("Could not load the data DataValue list from xml.",
(dataMappingColumns.get(6).getDataValues() != null)
- && (dataMappingColumns.get(6).getDataValues().size() > 0));
+ && (dataMappingColumns.get(6).getDataValues().size() > 0));
assertDateValue(dataMappingColumns);
// Validate column mappings
for (Column column : dataMappingColumns) {
- assertNotNull("Column ("+ column.getName() + ") is missing its type",column.getType());
+ assertNotNull("Column (" + column.getName() + ") is missing its type",
+ column.getType());
}
- Scenario scenario = scenarioList.get(0);
+ Scenario scenario = scenarioList.get(1);
assertNotNull(scenario);
assertEquals("PHERF.TEST_TABLE", scenario.getTableName());
- assertEquals(10, scenario.getRowCount());
+ assertEquals(30, scenario.getRowCount());
assertEquals(1, scenario.getDataOverride().getColumn().size());
QuerySet qs = scenario.getQuerySet().get(0);
assertEquals(ExecutionType.SERIAL, qs.getExecutionType());
@@ -99,27 +101,50 @@ public class ConfigurationParserTest extends ResultBaseTest{
assertEquals("select count(*) from PHERF.TEST_TABLE", firstQuery.getStatement());
assertEquals("123456789012345", firstQuery.getTenantId());
assertEquals(null, firstQuery.getDdl());
- assertEquals(0, (long)firstQuery.getExpectedAggregateRowCount());
+ assertEquals(0, (long) firstQuery.getExpectedAggregateRowCount());
Query secondQuery = qs.getQuery().get(1);
- assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", secondQuery.getStatement());
+ assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE",
+ secondQuery.getStatement());
assertEquals("Could not get queryGroup.", "g1", secondQuery.getQueryGroup());
// Make sure anything in the overrides matches a real column in the data mappings
DataOverride override = scenario.getDataOverride();
for (Column column : override.getColumn()) {
- assertTrue("Could not lookup Column (" + column.getName() + ") in DataMapping columns: " + dataMappingColumns, dataMappingColumns.contains(column));
+ assertTrue("Could not lookup Column (" + column.getName()
+ + ") in DataMapping columns: " + dataMappingColumns,
+ dataMappingColumns.contains(column));
}
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private URL getResourceUrl() {
+ URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+ assertNotNull("Test data XML file is missing", resourceUrl);
+ return resourceUrl;
+ }
+
+ private List<Scenario> getScenarios() throws URISyntaxException, JAXBException{
+ DataModel data = getDataModel();
+ List<Scenario> scenarioList = data.getScenarios();
+ assertTrue("Could not load the scenarios from xml.",
+ (scenarioList != null) && (scenarioList.size() > 0));
+ return scenarioList;
+ }
+
+ private DataModel getDataModel() throws URISyntaxException, JAXBException {
+ Path resourcePath = Paths.get(getResourceUrl().toURI());
+ return XMLConfigParser.readDataModel(resourcePath);
+ }
private void assertDateValue(List<Column> dataMappingColumns) {
for (Column dataMapping : dataMappingColumns) {
- if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName().equals("CREATED_DATE"))) {
+ if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName()
+ .equals("CREATED_DATE"))) {
// First rule should have min/max set
assertNotNull(dataMapping.getDataValues().get(0).getMinValue());
assertNotNull(dataMapping.getDataValues().get(0).getMaxValue());
@@ -139,7 +164,7 @@ public class ConfigurationParserTest extends ResultBaseTest{
/*
Used for debugging to dump out a simple xml filed based on the bound objects.
*/
- private String writeXML() {
+ private String writeXML() {
DataModel data = new DataModel();
try {
DataValue dataValue = new DataValue();
@@ -156,7 +181,6 @@ public class ConfigurationParserTest extends ResultBaseTest{
List<Column> columnList = new ArrayList<>();
columnList.add(column);
- data.setRelease("192");
data.setDataMappingColumns(columnList);
Scenario scenario = new Scenario();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index a202437..4ccf95c 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
import org.apache.phoenix.pherf.result.*;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.phoenix.pherf.configuration.Query;
@@ -72,7 +71,7 @@ public class ResultTest extends ResultBaseTest {
public void testMonitorResult() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
MonitorManager monitor = new MonitorManager(100);
- Future future = executorService.submit(monitor);
+ Future future = executorService.submit(monitor.execute());
List<Result> records;
final int TIMEOUT = 30;
@@ -83,7 +82,7 @@ public class ResultTest extends ResultBaseTest {
Thread.sleep(100);
if (ct == max) {
int timer = 0;
- monitor.stop();
+ monitor.complete();
while (monitor.isRunning() && (timer < TIMEOUT)) {
System.out.println("Waiting for monitor to finish. Seconds Waited :" + timer);
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
index 15d4608..92604d4 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
@@ -19,7 +19,7 @@
package org.apache.phoenix.pherf;
import org.apache.phoenix.pherf.configuration.*;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -28,20 +28,19 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.Test;
-import java.sql.Types;
import java.util.*;
import static org.junit.Assert.*;
public class RuleGeneratorTest {
- static PhoenixUtil util = new PhoenixUtil(true);
- static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
+ private static PhoenixUtil util = PhoenixUtil.create(true);
+ private static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
@Test
public void testDateGenerator() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
for (Column dataMapping : model.getDataMappingColumns()) {
@@ -68,7 +67,7 @@ public class RuleGeneratorTest {
public void testNullChance() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
int sampleSize = 100;
List<String> values = new ArrayList<>(sampleSize);
@@ -96,7 +95,7 @@ public class RuleGeneratorTest {
public void testSequentialDataSequence() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
Column targetColumn = null;
@@ -181,7 +180,7 @@ public class RuleGeneratorTest {
expectedValues.add("cCCyYhnNbBs9kWr");
XMLConfigParser parser = new XMLConfigParser(".*test_scenario.xml");
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
Scenario scenario = parser.getScenarios().get(0);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 45d36d2..fddf022 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -127,10 +127,50 @@
<name>NEWVAL_STRING</name>
<prefix>TSTPRFX</prefix>
</column>
-
</datamapping>
<scenarios>
- <scenario tableName="PHERF.TEST_TABLE" rowCount="10" name="testScenario">
+ <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
+ <!-- Scenario level rule overrides will be unsupported in V1.
+ You can use the general datamappings in the mean time-->
+ <dataOverride>
+ <column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>RANDOM</dataSequence>
+ <length>10</length>
+ <name>FIELD</name>
+ </column>
+ </dataOverride>
+
+ <!--
+ This is used to add mixed R/W workloads.
+
+ If this tag exists, a writer pool will be created based on the below properties.
+ These props will override the default values in pherf.properties, but only for this
+ scenario.The write jobs will run in conjunction with the querySet below.
+ -->
+ <writeParams executionDurationInMs="10000">
+ <!--
+ Number of writer it insert into the threadpool
+ -->
+ <writerThreadCount>2</writerThreadCount>
+
+ <!--
+ Time in Ms that each thread will sleep between batch writes. This helps to
+ throttle writers.
+ -->
+ <threadSleepDuration>10</threadSleepDuration>
+
+ <batchSize>1000</batchSize>
+ </writeParams>
+ <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
+ <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
+ <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+ </querySet>
+
+ </scenario>
+
+ <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
<!-- Scenario level rule overrides will be unsupported in V1.
You can use the general datamappings in the mean time-->
<dataOverride>
@@ -145,16 +185,20 @@
<!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first
2. DDL included in query are executed only once on start of querySet execution.
-->
- <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="100">
- <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" statement="select count(*) from PHERF.TEST_TABLE"/>
+ <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
+ numberOfExecutions="100">
+ <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
+ statement="select count(*) from PHERF.TEST_TABLE"/>
<!-- queryGroup is a way to organize queries across tables or scenario files.
The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
- <query id="q2" queryGroup="g1" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+ <query id="q2" queryGroup="g1"
+ statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
</querySet>
<!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
- <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" numberOfExecutions="10">
+ <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
+ numberOfExecutions="10">
<query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
- <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+ <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
</querySet>
</scenario>
</scenarios>