You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by co...@apache.org on 2015/06/20 01:35:28 UTC
[1/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for
mixed r/w workloads
Repository: phoenix
Updated Branches:
refs/heads/master 466eeb35f -> 7175dcbc0
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
index 78f18ca..c9333a0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
@@ -43,153 +43,160 @@ import difflib.DiffUtils;
import difflib.Patch;
public class QueryVerifier {
- private PhoenixUtil pUtil = new PhoenixUtil();
- private static final Logger logger = LoggerFactory
- .getLogger(QueryVerifier.class);
- private boolean useTemporaryOutput;
- private String directoryLocation;
-
- public QueryVerifier(boolean useTemporaryOutput) {
- this.useTemporaryOutput = useTemporaryOutput;
- this.directoryLocation = this.useTemporaryOutput ?
- PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
-
- ensureBaseDirExists();
- }
-
- /***
- * Export query resultSet to CSV file
- * @param query
- * @throws Exception
- */
- public String exportCSV(Query query) throws Exception {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String fileName = getFileName(query);
- FileOutputStream fos = new FileOutputStream(fileName);
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement(query.getStatement());
- boolean isQuery = statement.execute();
- if (isQuery) {
- rs = statement.executeQuery();
- int columnCount = rs.getMetaData().getColumnCount();
- while (rs.next()) {
- for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
- fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER).getBytes());
- }
- fos.write(PherfConstants.NEW_LINE.getBytes());
- }
- } else {
- conn.commit();
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- fos.flush();
- fos.close();
- }
- return fileName;
- }
-
- /***
- * Do a diff between exported query results and temporary CSV file
- * @param query
- * @param newCSV
- * @return
- */
- public boolean doDiff(Query query, String newCSV) {
+ private PhoenixUtil pUtil = PhoenixUtil.create();
+ private static final Logger logger = LoggerFactory.getLogger(QueryVerifier.class);
+ private boolean useTemporaryOutput;
+ private String directoryLocation;
+
+ public QueryVerifier(boolean useTemporaryOutput) {
+ this.useTemporaryOutput = useTemporaryOutput;
+ this.directoryLocation =
+ this.useTemporaryOutput ? PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
+
+ ensureBaseDirExists();
+ }
+
+ /**
+ * Export query resultSet to CSV file
+ *
+ * @param query
+ * @throws Exception
+ */
+ public String exportCSV(Query query) throws Exception {
+ Connection conn = null;
+ PreparedStatement statement = null;
+ ResultSet rs = null;
+ String fileName = getFileName(query);
+ FileOutputStream fos = new FileOutputStream(fileName);
+ try {
+ conn = pUtil.getConnection(query.getTenantId());
+ statement = conn.prepareStatement(query.getStatement());
+ boolean isQuery = statement.execute();
+ if (isQuery) {
+ rs = statement.executeQuery();
+ int columnCount = rs.getMetaData().getColumnCount();
+ while (rs.next()) {
+ for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
+ fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER)
+ .getBytes());
+ }
+ fos.write(PherfConstants.NEW_LINE.getBytes());
+ }
+ } else {
+ conn.commit();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (rs != null) rs.close();
+ if (statement != null) statement.close();
+ if (conn != null) conn.close();
+ fos.flush();
+ fos.close();
+ }
+ return fileName;
+ }
+
+ /**
+ * Do a diff between exported query results and temporary CSV file
+ *
+ * @param query
+ * @param newCSV
+ * @return
+ */
+ public boolean doDiff(Query query, String newCSV) {
List<String> original = fileToLines(getCSVName(query, PherfConstants.EXPORT_DIR, ""));
- List<String> newLines = fileToLines(newCSV);
-
+ List<String> newLines = fileToLines(newCSV);
+
Patch patch = DiffUtils.diff(original, newLines);
if (patch.getDeltas().isEmpty()) {
- logger.info("Match: " + query.getId() + " with " + newCSV);
- return true;
+ logger.info("Match: " + query.getId() + " with " + newCSV);
+ return true;
} else {
- logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
- return false;
+ logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
+ return false;
}
- }
-
- /***
- * Helper method to load file
- * @param filename
- * @return
- */
+ }
+
+ /**
+ * Helper method to load file
+ *
+ * @param filename
+ * @return
+ */
private static List<String> fileToLines(String filename) {
- List<String> lines = new LinkedList<String>();
- String line = "";
- try {
- BufferedReader in = new BufferedReader(new FileReader(filename));
- while ((line = in.readLine()) != null) {
- lines.add(line);
- }
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
+ List<String> lines = new LinkedList<String>();
+ String line = "";
+ try {
+ BufferedReader in = new BufferedReader(new FileReader(filename));
+ while ((line = in.readLine()) != null) {
+ lines.add(line);
}
-
- return lines;
+ in.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return lines;
}
/**
* Get explain plan for a query
+ *
* @param query
* @return
* @throws SQLException
*/
- public String getExplainPlan(Query query) throws SQLException {
- Connection conn = null;
- ResultSet rs = null;
- PreparedStatement statement = null;
- StringBuilder buf = new StringBuilder();
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
- rs = statement.executeQuery();
- while (rs.next()) {
- buf.append(rs.getString(1).trim().replace(",", "-"));
- }
- statement.close();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- }
- return buf.toString();
- }
-
- /***
+ public String getExplainPlan(Query query) throws SQLException {
+ Connection conn = null;
+ ResultSet rs = null;
+ PreparedStatement statement = null;
+ StringBuilder buf = new StringBuilder();
+ try {
+ conn = pUtil.getConnection(query.getTenantId());
+ statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
+ rs = statement.executeQuery();
+ while (rs.next()) {
+ buf.append(rs.getString(1).trim().replace(",", "-"));
+ }
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (rs != null) rs.close();
+ if (statement != null) statement.close();
+ if (conn != null) conn.close();
+ }
+ return buf.toString();
+ }
+
+ /**
* Helper method to generate CSV file name
+ *
* @param query
* @return
* @throws FileNotFoundException
*/
- private String getFileName(Query query) throws FileNotFoundException {
- String tempExt = "";
- if (this.useTemporaryOutput) {
- tempExt = "_" + java.util.UUID.randomUUID().toString();
- }
- return getCSVName(query, this.directoryLocation, tempExt);
- }
-
- private String getCSVName(Query query, String directory, String tempExt) {
- String csvFile = directory + PherfConstants.PATH_SEPARATOR
- + query.getId() + tempExt + Extension.CSV.toString();
- return csvFile;
- }
-
- private void ensureBaseDirExists() {
- File baseDir = new File(this.directoryLocation);
- if (!baseDir.exists()) {
- baseDir.mkdir();
- }
- }
+ private String getFileName(Query query) throws FileNotFoundException {
+ String tempExt = "";
+ if (this.useTemporaryOutput) {
+ tempExt = "_" + java.util.UUID.randomUUID().toString();
+ }
+ return getCSVName(query, this.directoryLocation, tempExt);
+ }
+
+ private String getCSVName(Query query, String directory, String tempExt) {
+ String
+ csvFile =
+ directory + PherfConstants.PATH_SEPARATOR + query.getId() + tempExt + Extension.CSV
+ .toString();
+ return csvFile;
+ }
+
+ private void ensureBaseDirExists() {
+ File baseDir = new File(this.directoryLocation);
+ if (!baseDir.exists()) {
+ baseDir.mkdir();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
new file mode 100644
index 0000000..16a493e
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.pherf.workload;
+
+public interface Workload {
+ public Runnable execute() throws Exception;
+
+ /**
+ * Use this method to perform any cleanup or forced shutdown of the thread.
+ */
+ public void complete();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index cf2f038..a65b4aa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -19,95 +19,96 @@
package org.apache.phoenix.pherf.workload;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.jmx.MonitorManager;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
-
-import org.apache.phoenix.pherf.util.ResourceList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
public class WorkloadExecutor {
private static final Logger logger = LoggerFactory.getLogger(WorkloadExecutor.class);
- private final XMLConfigParser parser;
- private MonitorManager monitor;
- private Future monitorThread;
private final int poolSize;
- private final ExecutorService pool;
+ // Jobs can be accessed by multiple threads
+ private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
+ private final ExecutorService pool;
public WorkloadExecutor() throws Exception {
this(PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES));
}
- public WorkloadExecutor(Properties properties) throws Exception{
- this(properties,PherfConstants.DEFAULT_FILE_PATTERN);
+ public WorkloadExecutor(Properties properties) throws Exception {
+ this(properties, new ArrayList());
}
- public WorkloadExecutor(Properties properties, String filePattern) throws Exception {
- this(properties,
- new XMLConfigParser(filePattern),
- true);
+ public WorkloadExecutor(Properties properties, List<Workload> workloads) throws Exception {
+ this.poolSize =
+ (properties.getProperty("pherf.default.threadpool") == null) ?
+ PherfConstants.DEFAULT_THREAD_POOL_SIZE :
+ Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+
+ this.pool = Executors.newFixedThreadPool(this.poolSize);
+ init(workloads);
}
- public WorkloadExecutor(Properties properties, XMLConfigParser parser, boolean monitor) throws Exception {
- this.parser = parser;
- this.poolSize = (properties.getProperty("pherf.default.threadpool") == null)
- ? PherfConstants.DEFAULT_THREAD_POOL_SIZE
- : Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+ public void add(Workload workload) throws Exception {
+ this.jobs.put(workload, pool.submit(workload.execute()));
+ }
- this.pool = Executors.newFixedThreadPool(this.poolSize);
- if (monitor) {
- initMonitor(Integer.parseInt(properties.getProperty("pherf.default.monitorFrequency")));
+ /**
+ * Blocks until all workloads have finished. If a
+ * {@link org.apache.phoenix.pherf.workload.Workload} requires complete() to be called, it must
+ * be called prior to using this method. Otherwise this method will block indefinitely.
+ */
+ public void get() {
+ for (Workload workload : jobs.keySet()) {
+ get(workload);
}
}
/**
- * Executes all scenarios dataload
+ * Calls the {@link java.util.concurrent.Future#get()} method pertaining to this workload.
+ * Once the Future completes, the workload is removed from the job map.
*
- * @throws Exception
+ * @param workload Key entry in the HashMap
*/
- public void executeDataLoad() throws Exception {
- logger.info("\n\nStarting Data Loader...");
- DataLoader dataLoader = new DataLoader(parser);
- dataLoader.execute();
+ public void get(Workload workload) {
+ try {
+ Future future = jobs.get(workload);
+ future.get();
+ jobs.remove(workload);
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("", e);
+ }
}
/**
- * Executes all scenario multi-threaded query sets
- *
- * @param queryHint
- * @throws Exception
+ * Complete all workloads in the list.
+ * Entries in the job Map will persist until {@link WorkloadExecutor#get()} is called
*/
- public void executeMultithreadedQueryExecutor(String queryHint, boolean export, RunMode runMode) throws Exception {
- logger.info("\n\nStarting Query Executor...");
- QueryExecutor queryExecutor = new QueryExecutor(parser);
- queryExecutor.execute(queryHint, export, runMode);
+ public void complete() {
+ for (Workload workload : jobs.keySet()) {
+ workload.complete();
+ }
}
- public void shutdown() throws Exception {
- if (null != monitor && monitor.isRunning()) {
- this.monitor.stop();
- this.monitorThread.get(60, TimeUnit.SECONDS);
- this.pool.shutdown();
- }
+ public void shutdown() {
+ // Make sure any Workloads still on pool have been properly shutdown
+ complete();
+ pool.shutdownNow();
}
- // Just used for testing
- public XMLConfigParser getParser() {
- return parser;
+ public ExecutorService getPool() {
+ return pool;
}
- private void initMonitor(int monitorFrequency) throws Exception {
- this.monitor = new MonitorManager(monitorFrequency);
- monitorThread = pool.submit(this.monitor);
+ private void init(List<Workload> workloads) throws Exception {
+ for (Workload workload : workloads) {
+ this.jobs.put(workload, pool.submit(workload.execute()));
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
new file mode 100644
index 0000000..305521b
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.WriteParams;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.exception.PherfException;
+import org.apache.phoenix.pherf.result.DataLoadThreadTime;
+import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.rules.DataValue;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.util.RowCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class WriteWorkload implements Workload {
+ private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+ private final PhoenixUtil pUtil;
+ private final XMLConfigParser parser;
+ private final RulesApplier rulesApplier;
+ private final ResultUtil resultUtil;
+ private final ExecutorService pool;
+ private final WriteParams writeParams;
+ private final Scenario scenario;
+ private final long threadSleepDuration;
+
+ private final int threadPoolSize;
+ private final int batchSize;
+
+ public WriteWorkload(XMLConfigParser parser) throws Exception {
+ this(PhoenixUtil.create(), parser);
+ }
+
+ public WriteWorkload(PhoenixUtil util, XMLConfigParser parser) throws Exception {
+ this(util, parser, null);
+ }
+
+ public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario)
+ throws Exception {
+ this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES),
+ parser, scenario);
+ }
+
+ /**
+ * Default the writers to use up all available cores for threads. If writeParams are used in
+ * the config files, they will override the defaults. writeParams are used for read/write mixed
+ * workloads.
+ * TODO extract notion of the scenario list and have 1 write workload per scenario
+ *
+ * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
+ * @param properties {@link java.util.Properties} default properties to use
+ * @param parser {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+ * @param scenario {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
+ * it will run against all scenarios in the parsers list.
+ * @throws Exception
+ */
+ public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
+ Scenario scenario) throws Exception {
+ this.pUtil = phoenixUtil;
+ this.parser = parser;
+ this.rulesApplier = new RulesApplier(parser);
+ this.resultUtil = new ResultUtil();
+
+ // Overwrite defaults properties with those given in the configuration. This indicates the
+ // scenario is a R/W mixed workload.
+ if (scenario != null) {
+ this.scenario = scenario;
+ writeParams = scenario.getWriteParams();
+ threadSleepDuration = writeParams.getThreadSleepDuration();
+ } else {
+ writeParams = null;
+ this.scenario = null;
+ threadSleepDuration = 0;
+ }
+
+ int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+
+ this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
+
+ // TODO Move pool management up to WorkloadExecutor
+ this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
+
+ String
+ bSize =
+ (writeParams == null) || (writeParams.getBatchSize() == Long.MIN_VALUE) ?
+ properties.getProperty("pherf.default.dataloader.batchsize") :
+ String.valueOf(writeParams.getBatchSize());
+ this.batchSize =
+ (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
+ }
+
+ @Override public void complete() {
+ }
+
+ public Runnable execute() throws Exception {
+ return new Runnable() {
+ @Override public void run() {
+ try {
+ DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
+ DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
+
+ if (WriteWorkload.this.scenario == null) {
+ for (Scenario scenario : getParser().getScenarios()) {
+ exec(dataLoadTimeSummary, dataLoadThreadTime, scenario);
+ }
+ } else {
+ exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario);
+ }
+ resultUtil.write(dataLoadTimeSummary);
+ resultUtil.write(dataLoadThreadTime);
+
+ } catch (Exception e) {
+ logger.warn("", e);
+ }
+ }
+ };
+ }
+
+ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
+ DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
+ logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
+ long start = System.currentTimeMillis();
+
+ List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+
+ waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
+
+ // always update stats for Phoenix base tables
+ updatePhoenixStats(scenario.getTableName());
+ }
+
+ private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+ throws Exception {
+ RowCalculator
+ rowCalculator =
+ new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
+ List<Future> writeBatches = new ArrayList<>();
+
+ for (int i = 0; i < getThreadPoolSize(); i++) {
+ List<Column>
+ phxMetaCols =
+ pUtil.getColumnsFromPhoenix(scenario.getSchemaName(),
+ scenario.getTableNameWithoutSchemaName(), pUtil.getConnection());
+ int threadRowCount = rowCalculator.getNext();
+ logger.info(
+ "Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
+ Future<Info>
+ write =
+ upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
+ dataLoadThreadTime);
+ writeBatches.add(write);
+ }
+ if (writeBatches.isEmpty()) {
+ throw new PherfException(
+ "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
+ }
+
+ return writeBatches;
+ }
+
+ private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
+ long start, List<Future> writeBatches)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ int sumRows = 0, sumDuration = 0;
+ // Wait for all the batch threads to complete
+ for (Future<Info> write : writeBatches) {
+ Info writeInfo = write.get();
+ sumRows += writeInfo.getRowCount();
+ sumDuration += writeInfo.getDuration();
+ logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+ + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
+ }
+ logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
+ + sumDuration + ") Ms");
+ dataLoadTimeSummary
+ .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+ }
+
+ /**
+ * TODO Move this method to PhoenixUtil
+ * Update Phoenix table stats
+ *
+ * @param tableName
+ * @throws Exception
+ */
+ public void updatePhoenixStats(String tableName) throws Exception {
+ logger.info("Updating stats for " + tableName);
+ pUtil.executeStatement("UPDATE STATISTICS " + tableName);
+ }
+
+ public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
+ final String tableName, final int rowCount,
+ final DataLoadThreadTime dataLoadThreadTime) {
+ Future<Info> future = pool.submit(new Callable<Info>() {
+ @Override public Info call() throws Exception {
+ int rowsCreated = 0;
+ long start = 0, duration, totalDuration;
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Connection connection = null;
+ try {
+ connection = pUtil.getConnection();
+ long logStartTime = System.currentTimeMillis();
+ long
+ maxDuration =
+ (WriteWorkload.this.writeParams == null) ?
+ Long.MAX_VALUE :
+ WriteWorkload.this.writeParams.getExecutionDurationInMs();
+
+ for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
+ < maxDuration); i--) {
+ String sql = buildSql(columns, tableName);
+ PreparedStatement stmt = connection.prepareStatement(sql);
+ stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+ start = System.currentTimeMillis();
+ rowsCreated += stmt.executeUpdate();
+ stmt.close();
+ if ((i % getBatchSize()) == 0) {
+ connection.commit();
+ duration = System.currentTimeMillis() - start;
+ logger.info("Writer (" + Thread.currentThread().getName()
+ + ") committed Batch. Total " + getBatchSize()
+ + " rows for this thread (" + this.hashCode() + ") in ("
+ + duration + ") Ms");
+
+ if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+ dataLoadThreadTime
+ .add(tableName, Thread.currentThread().getName(), i,
+ System.currentTimeMillis() - logStartTime);
+ logStartTime = System.currentTimeMillis();
+ }
+
+ // Pause for throttling if configured to do so
+ Thread.sleep(threadSleepDuration);
+ }
+ }
+ } finally {
+ if (connection != null) {
+ try {
+ connection.commit();
+ duration = System.currentTimeMillis() - start;
+ logger.info("Writer ( " + Thread.currentThread().getName()
+ + ") committed Final Batch. Duration (" + duration + ") Ms");
+ connection.close();
+ } catch (SQLException e) {
+ // Swallow since we are closing anyway
+ e.printStackTrace();
+ }
+ }
+ }
+ totalDuration = System.currentTimeMillis() - start;
+ return new Info(totalDuration, rowsCreated);
+ }
+ });
+ return future;
+ }
+
+ private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
+ PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+ int count = 1;
+ for (Column column : columns) {
+
+ DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
+ switch (column.getType()) {
+ case VARCHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.VARCHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case CHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.CHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case DECIMAL:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DECIMAL);
+ } else {
+ statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+ }
+ break;
+ case INTEGER:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.INTEGER);
+ } else {
+ statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+ }
+ break;
+ case DATE:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DATE);
+ } else {
+ Date
+ date =
+ new java.sql.Date(
+ simpleDateFormat.parse(dataValue.getValue()).getTime());
+ statement.setDate(count, date);
+ }
+ break;
+ default:
+ break;
+ }
+ count++;
+ }
+ return statement;
+ }
+
+ private String buildSql(final List<Column> columns, final String tableName) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("upsert into ");
+ builder.append(tableName);
+ builder.append(" (");
+ int count = 1;
+ for (Column column : columns) {
+ builder.append(column.getName());
+ if (count < columns.size()) {
+ builder.append(",");
+ } else {
+ builder.append(")");
+ }
+ count++;
+ }
+ builder.append(" VALUES (");
+ for (int i = 0; i < columns.size(); i++) {
+ if (i < columns.size() - 1) {
+ builder.append("?,");
+ } else {
+ builder.append("?)");
+ }
+ }
+ return builder.toString();
+ }
+
+ public XMLConfigParser getParser() {
+ return parser;
+ }
+
+ public RulesApplier getRulesApplier() {
+ return rulesApplier;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ private class Info {
+
+ private final int rowCount;
+ private final long duration;
+
+ public Info(long duration, int rows) {
+ this.duration = duration;
+ this.rowCount = rows;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 9514089..8f93685 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -304,6 +304,41 @@
</column>
</datamapping>
<scenarios>
+ <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="100" name="readWriteScenario">
+ <!-- Scenario level rule overrides will be unsupported in V1.
+ You can use the general datamappings in the mean time-->
+ <dataOverride>
+ <column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>LIST</dataSequence>
+ <name>TENANT_ID</name>
+ </column>
+ </dataOverride>
+ <writeParams executionDurationInMs="10000">
+ <!--
+ Number of writer threads to insert into the thread pool
+ -->
+ <writerThreadCount>5</writerThreadCount>
+
+ <!--
+ Time in Ms that each thread will sleep between batch writes. This helps to
+ throttle writers.
+ -->
+ <threadSleepDuration>10</threadSleepDuration>
+
+ <batchSize>100</batchSize>
+ </writeParams>
+ <!--Minimum of executionDurationInMs or numberOfExecutions, whichever is reached first -->
+ <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000"
+ numberOfExecutions="100">
+ <!-- Aggregate queries on a per tenant basis -->
+ <query tenantId="00Dxx0000001gER"
+ ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED"
+ statement="select count(*) from PHERF.PHERF_TEST_VIEW_UNSALTED"/>
+ </querySet>
+
+ </scenario>
<scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10">
<!-- Scenario level rule overrides will be unsupported in V1.
You can use the general datamappings in the mean time-->
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index f362842..6f25fbd 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.pherf;
+import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,7 +28,6 @@ import java.util.List;
import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.workload.WorkloadExecutor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,53 +38,55 @@ import javax.xml.bind.Marshaller;
import static org.junit.Assert.*;
-public class ConfigurationParserTest extends ResultBaseTest{
+public class ConfigurationParserTest extends ResultBaseTest {
private static final Logger logger = LoggerFactory.getLogger(ConfigurationParserTest.class);
@Test
- public void testConfigFilesParsing() {
- try {
- WorkloadExecutor workloadExec = new WorkloadExecutor();
- List<Scenario> scenarioList = workloadExec.getParser().getScenarios();
- assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
- logger.info("Number of scenarios loaded: " + scenarioList.size());
-
- } catch (Exception e) {
- e.printStackTrace();
- fail();
+ public void testReadWriteWorkloadReader() throws Exception {
+ String scenarioName = "testScenarioRW";
+ List<Scenario> scenarioList = getScenarios();
+ Scenario target = null;
+ for (Scenario scenario : scenarioList) {
+ if (scenarioName.equals(scenario.getName())) {
+ target = scenario;
+ }
}
+ assertNotNull("Could not find scenario: " + scenarioName, target);
+ WriteParams params = target.getWriteParams();
+
+ assertNotNull("Could not find writeParams in scenario: " + scenarioName, params);
+ assertNotNull("Could not find batch size: ", params.getBatchSize());
+ assertNotNull("Could not find execution duration: ", params.getExecutionDurationInMs());
+ assertNotNull("Could not find sleep duration: ", params.getThreadSleepDuration());
+ assertNotNull("Could not find writer count: ", params.getWriterThreadCount());
}
- @Test
+ @Test
// TODO Break this into multiple smaller tests.
- public void testConfigReader(){
- URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
- assertNotNull("Test data XML file is missing", resourceUrl);
-
- try {
+ public void testConfigReader() {
+ try {
logger.debug("DataModel: " + writeXML());
- Path resourcePath = Paths.get(resourceUrl.toURI());
- DataModel data = XMLConfigParser.readDataModel(resourcePath);
- List<Scenario> scenarioList = data.getScenarios();
- assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
- List<Column> dataMappingColumns = data.getDataMappingColumns();
- assertTrue("Could not load the data columns from xml.", (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
+ List<Scenario> scenarioList = getScenarios();
+ List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+ assertTrue("Could not load the data columns from xml.",
+ (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
assertTrue("Could not load the data DataValue list from xml.",
(dataMappingColumns.get(6).getDataValues() != null)
- && (dataMappingColumns.get(6).getDataValues().size() > 0));
+ && (dataMappingColumns.get(6).getDataValues().size() > 0));
assertDateValue(dataMappingColumns);
// Validate column mappings
for (Column column : dataMappingColumns) {
- assertNotNull("Column ("+ column.getName() + ") is missing its type",column.getType());
+ assertNotNull("Column (" + column.getName() + ") is missing its type",
+ column.getType());
}
- Scenario scenario = scenarioList.get(0);
+ Scenario scenario = scenarioList.get(1);
assertNotNull(scenario);
assertEquals("PHERF.TEST_TABLE", scenario.getTableName());
- assertEquals(10, scenario.getRowCount());
+ assertEquals(30, scenario.getRowCount());
assertEquals(1, scenario.getDataOverride().getColumn().size());
QuerySet qs = scenario.getQuerySet().get(0);
assertEquals(ExecutionType.SERIAL, qs.getExecutionType());
@@ -99,27 +101,50 @@ public class ConfigurationParserTest extends ResultBaseTest{
assertEquals("select count(*) from PHERF.TEST_TABLE", firstQuery.getStatement());
assertEquals("123456789012345", firstQuery.getTenantId());
assertEquals(null, firstQuery.getDdl());
- assertEquals(0, (long)firstQuery.getExpectedAggregateRowCount());
+ assertEquals(0, (long) firstQuery.getExpectedAggregateRowCount());
Query secondQuery = qs.getQuery().get(1);
- assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", secondQuery.getStatement());
+ assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE",
+ secondQuery.getStatement());
assertEquals("Could not get queryGroup.", "g1", secondQuery.getQueryGroup());
// Make sure anything in the overrides matches a real column in the data mappings
DataOverride override = scenario.getDataOverride();
for (Column column : override.getColumn()) {
- assertTrue("Could not lookup Column (" + column.getName() + ") in DataMapping columns: " + dataMappingColumns, dataMappingColumns.contains(column));
+ assertTrue("Could not lookup Column (" + column.getName()
+ + ") in DataMapping columns: " + dataMappingColumns,
+ dataMappingColumns.contains(column));
}
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private URL getResourceUrl() {
+ URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+ assertNotNull("Test data XML file is missing", resourceUrl);
+ return resourceUrl;
+ }
+
+ private List<Scenario> getScenarios() throws URISyntaxException, JAXBException{
+ DataModel data = getDataModel();
+ List<Scenario> scenarioList = data.getScenarios();
+ assertTrue("Could not load the scenarios from xml.",
+ (scenarioList != null) && (scenarioList.size() > 0));
+ return scenarioList;
+ }
+
+ private DataModel getDataModel() throws URISyntaxException, JAXBException {
+ Path resourcePath = Paths.get(getResourceUrl().toURI());
+ return XMLConfigParser.readDataModel(resourcePath);
+ }
private void assertDateValue(List<Column> dataMappingColumns) {
for (Column dataMapping : dataMappingColumns) {
- if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName().equals("CREATED_DATE"))) {
+ if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName()
+ .equals("CREATED_DATE"))) {
// First rule should have min/max set
assertNotNull(dataMapping.getDataValues().get(0).getMinValue());
assertNotNull(dataMapping.getDataValues().get(0).getMaxValue());
@@ -139,7 +164,7 @@ public class ConfigurationParserTest extends ResultBaseTest{
/*
Used for debugging to dump out a simple xml filed based on the bound objects.
*/
- private String writeXML() {
+ private String writeXML() {
DataModel data = new DataModel();
try {
DataValue dataValue = new DataValue();
@@ -156,7 +181,6 @@ public class ConfigurationParserTest extends ResultBaseTest{
List<Column> columnList = new ArrayList<>();
columnList.add(column);
- data.setRelease("192");
data.setDataMappingColumns(columnList);
Scenario scenario = new Scenario();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index a202437..4ccf95c 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
import org.apache.phoenix.pherf.result.*;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.phoenix.pherf.configuration.Query;
@@ -72,7 +71,7 @@ public class ResultTest extends ResultBaseTest {
public void testMonitorResult() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
MonitorManager monitor = new MonitorManager(100);
- Future future = executorService.submit(monitor);
+ Future future = executorService.submit(monitor.execute());
List<Result> records;
final int TIMEOUT = 30;
@@ -83,7 +82,7 @@ public class ResultTest extends ResultBaseTest {
Thread.sleep(100);
if (ct == max) {
int timer = 0;
- monitor.stop();
+ monitor.complete();
while (monitor.isRunning() && (timer < TIMEOUT)) {
System.out.println("Waiting for monitor to finish. Seconds Waited :" + timer);
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
index 15d4608..92604d4 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
@@ -19,7 +19,7 @@
package org.apache.phoenix.pherf;
import org.apache.phoenix.pherf.configuration.*;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -28,20 +28,19 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.Test;
-import java.sql.Types;
import java.util.*;
import static org.junit.Assert.*;
public class RuleGeneratorTest {
- static PhoenixUtil util = new PhoenixUtil(true);
- static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
+ private static PhoenixUtil util = PhoenixUtil.create(true);
+ private static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
@Test
public void testDateGenerator() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
for (Column dataMapping : model.getDataMappingColumns()) {
@@ -68,7 +67,7 @@ public class RuleGeneratorTest {
public void testNullChance() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
int sampleSize = 100;
List<String> values = new ArrayList<>(sampleSize);
@@ -96,7 +95,7 @@ public class RuleGeneratorTest {
public void testSequentialDataSequence() throws Exception {
XMLConfigParser parser = new XMLConfigParser(matcherScenario);
DataModel model = parser.getDataModels().get(0);
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
Column targetColumn = null;
@@ -181,7 +180,7 @@ public class RuleGeneratorTest {
expectedValues.add("cCCyYhnNbBs9kWr");
XMLConfigParser parser = new XMLConfigParser(".*test_scenario.xml");
- DataLoader loader = new DataLoader(parser);
+ WriteWorkload loader = new WriteWorkload(parser);
RulesApplier rulesApplier = loader.getRulesApplier();
Scenario scenario = parser.getScenarios().get(0);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 45d36d2..fddf022 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -127,10 +127,50 @@
<name>NEWVAL_STRING</name>
<prefix>TSTPRFX</prefix>
</column>
-
</datamapping>
<scenarios>
- <scenario tableName="PHERF.TEST_TABLE" rowCount="10" name="testScenario">
+ <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
+ <!-- Scenario level rule overrides will be unsupported in V1.
+ You can use the general datamappings in the mean time-->
+ <dataOverride>
+ <column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>RANDOM</dataSequence>
+ <length>10</length>
+ <name>FIELD</name>
+ </column>
+ </dataOverride>
+
+ <!--
+ This is used to add mixed R/W workloads.
+
+ If this tag exists, a writer pool will be created based on the below properties.
+ These props will override the default values in pherf.properties, but only for this
+ scenario. The write jobs will run in conjunction with the querySet below.
+ -->
+ <writeParams executionDurationInMs="10000">
+ <!--
+ Number of writers to insert into the thread pool
+ -->
+ <writerThreadCount>2</writerThreadCount>
+
+ <!--
+ Time in Ms that each thread will sleep between batch writes. This helps to
+ throttle writers.
+ -->
+ <threadSleepDuration>10</threadSleepDuration>
+
+ <batchSize>1000</batchSize>
+ </writeParams>
+ <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
+ <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
+ <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+ </querySet>
+
+ </scenario>
+
+ <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
<!-- Scenario level rule overrides will be unsupported in V1.
You can use the general datamappings in the mean time-->
<dataOverride>
@@ -145,16 +185,20 @@
<!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first
2. DDL included in query are executed only once on start of querySet execution.
-->
- <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="100">
- <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" statement="select count(*) from PHERF.TEST_TABLE"/>
+ <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
+ numberOfExecutions="100">
+ <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
+ statement="select count(*) from PHERF.TEST_TABLE"/>
<!-- queryGroup is a way to organize queries across tables or scenario files.
The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
- <query id="q2" queryGroup="g1" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+ <query id="q2" queryGroup="g1"
+ statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
</querySet>
<!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
- <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" numberOfExecutions="10">
+ <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
+ numberOfExecutions="10">
<query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
- <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+ <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
</querySet>
</scenario>
</scenarios>
[2/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for
mixed r/w workloads
Posted by co...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
index 523feb4..39d6a9c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
@@ -33,17 +33,13 @@ public class ResultManager {
private final ResultUtil util;
private final PherfConstants.RunMode runMode;
-
public ResultManager(String fileNameSeed, PherfConstants.RunMode runMode) {
- this(runMode, Arrays.asList(
- new XMLResultHandler(fileNameSeed, ResultFileDetails.XML),
+ this(runMode, Arrays.asList(new XMLResultHandler(fileNameSeed, ResultFileDetails.XML),
new ImageResultHandler(fileNameSeed, ResultFileDetails.IMAGE),
- new CSVResultHandler(
- fileNameSeed,
- runMode == RunMode.PERFORMANCE ? ResultFileDetails.CSV_DETAILED_PERFORMANCE
- : ResultFileDetails.CSV_DETAILED_FUNCTIONAL),
- new CSVResultHandler(fileNameSeed, ResultFileDetails.CSV_AGGREGATE_PERFORMANCE)
- ));
+ new CSVResultHandler(fileNameSeed, runMode == RunMode.PERFORMANCE ?
+ ResultFileDetails.CSV_DETAILED_PERFORMANCE :
+ ResultFileDetails.CSV_DETAILED_FUNCTIONAL),
+ new CSVResultHandler(fileNameSeed, ResultFileDetails.CSV_AGGREGATE_PERFORMANCE)));
}
public ResultManager(PherfConstants.RunMode runMode, List<ResultHandler> resultHandlers) {
@@ -81,6 +77,7 @@ public class ResultManager {
/**
* Write a combined set of results for each result in the list.
+ *
* @param dataModelResults List<{@link DataModelResult > </>}
* @throws Exception
*/
@@ -89,7 +86,9 @@ public class ResultManager {
CSVResultHandler detailsCSVWriter = null;
try {
- detailsCSVWriter = new CSVResultHandler(PherfConstants.COMBINED_FILE_NAME, ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+ detailsCSVWriter =
+ new CSVResultHandler(PherfConstants.COMBINED_FILE_NAME,
+ ResultFileDetails.CSV_DETAILED_PERFORMANCE);
for (DataModelResult dataModelResult : dataModelResults) {
util.write(detailsCSVWriter, dataModelResult, runMode);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index fd960d1..07dfa86 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -22,15 +22,16 @@ import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.PherfConstants.RunMode;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
-import org.apache.phoenix.pherf.result.impl.ImageResultHandler;
-import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
import org.apache.phoenix.pherf.util.PhoenixUtil;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
import java.text.Format;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+import java.util.Map;
public class ResultUtil {
@@ -54,7 +55,10 @@ public class ResultUtil {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
rowValues.addAll(writeThreadTime.getCsvRepresentation(this));
- Result result = new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE, "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
+ Result
+ result =
+ new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE,
+ "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
writer.write(result);
}
}
@@ -83,7 +87,10 @@ public class ResultUtil {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
rowValues.addAll(loadTime.getCsvRepresentation(this));
- Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), rowValues);
+ Result
+ result =
+ new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+ rowValues);
writer.write(result);
}
} finally {
@@ -94,23 +101,29 @@ public class ResultUtil {
}
}
- public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult, RunMode runMode) throws Exception {
+ public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult,
+ RunMode runMode) throws Exception {
ResultFileDetails resultFileDetails = resultHandler.getResultFileDetails();
switch (resultFileDetails) {
- case CSV_AGGREGATE_PERFORMANCE:
- case CSV_DETAILED_PERFORMANCE:
- case CSV_DETAILED_FUNCTIONAL:
- List<List<ResultValue>> rowDetails = getCSVResults(dataModelResult, resultFileDetails, runMode);
- for (List<ResultValue> row : rowDetails) {
- Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), row);
- resultHandler.write(result);
- }
- break;
- default:
- List<ResultValue> resultValue = new ArrayList();
- resultValue.add(new ResultValue<>(dataModelResult));
- resultHandler.write(new Result(resultFileDetails, null, resultValue));
- break;
+ case CSV_AGGREGATE_PERFORMANCE:
+ case CSV_DETAILED_PERFORMANCE:
+ case CSV_DETAILED_FUNCTIONAL:
+ List<List<ResultValue>>
+ rowDetails =
+ getCSVResults(dataModelResult, resultFileDetails, runMode);
+ for (List<ResultValue> row : rowDetails) {
+ Result
+ result =
+ new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+ row);
+ resultHandler.write(result);
+ }
+ break;
+ default:
+ List<ResultValue> resultValue = new ArrayList();
+ resultValue.add(new ResultValue<>(dataModelResult));
+ resultHandler.write(new Result(resultFileDetails, null, resultValue));
+ break;
}
}
@@ -146,40 +159,47 @@ public class ResultUtil {
return str;
}
- private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult, ResultFileDetails resultFileDetails, RunMode runMode) {
+ private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult,
+ ResultFileDetails resultFileDetails, RunMode runMode) {
List<List<ResultValue>> rowList = new ArrayList<>();
for (ScenarioResult result : dataModelResult.getScenarioResult()) {
for (QuerySetResult querySetResult : result.getQuerySetResult()) {
for (QueryResult queryResult : querySetResult.getQueryResults()) {
switch (resultFileDetails) {
- case CSV_AGGREGATE_PERFORMANCE:
- List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
- rowList.add(csvResult);
- break;
- case CSV_DETAILED_PERFORMANCE:
- case CSV_DETAILED_FUNCTIONAL:
- List<List<ResultValue>> detailedRows = queryResult.getCsvDetailedRepresentation(this, runMode);
- for (List<ResultValue> detailedRowList : detailedRows) {
- List<ResultValue> valueList = new ArrayList<>();
- valueList.add(new ResultValue(convertNull(result.getTableName())));
- valueList.add(new ResultValue(convertNull(result.getName())));
- valueList.add(new ResultValue(convertNull(dataModelResult.getZookeeper())));
- valueList.add(new ResultValue(convertNull(String.valueOf(result.getRowCount()))));
- valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getNumberOfExecutions()))));
- valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getExecutionType()))));
- if (result.getPhoenixProperties() != null) {
- String props = buildProperty(result);
- valueList.add(new ResultValue(convertNull(props)));
- } else {
- valueList.add(new ResultValue("null"));
- }
- valueList.addAll(detailedRowList);
- rowList.add(valueList);
+ case CSV_AGGREGATE_PERFORMANCE:
+ List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
+ rowList.add(csvResult);
+ break;
+ case CSV_DETAILED_PERFORMANCE:
+ case CSV_DETAILED_FUNCTIONAL:
+ List<List<ResultValue>>
+ detailedRows =
+ queryResult.getCsvDetailedRepresentation(this, runMode);
+ for (List<ResultValue> detailedRowList : detailedRows) {
+ List<ResultValue> valueList = new ArrayList<>();
+ valueList.add(new ResultValue(convertNull(result.getTableName())));
+ valueList.add(new ResultValue(convertNull(result.getName())));
+ valueList.add(new ResultValue(
+ convertNull(dataModelResult.getZookeeper())));
+ valueList.add(new ResultValue(
+ convertNull(String.valueOf(result.getRowCount()))));
+ valueList.add(new ResultValue(convertNull(
+ String.valueOf(querySetResult.getNumberOfExecutions()))));
+ valueList.add(new ResultValue(convertNull(
+ String.valueOf(querySetResult.getExecutionType()))));
+ if (result.getPhoenixProperties() != null) {
+ String props = buildProperty(result);
+ valueList.add(new ResultValue(convertNull(props)));
+ } else {
+ valueList.add(new ResultValue("null"));
}
- break;
- default:
- break;
+ valueList.addAll(detailedRowList);
+ rowList.add(valueList);
+ }
+ break;
+ default:
+ break;
}
}
}
@@ -192,8 +212,7 @@ public class ResultUtil {
boolean firstPartialSeparator = true;
for (Map.Entry<String, String> entry : result.getPhoenixProperties().entrySet()) {
- if (!firstPartialSeparator)
- sb.append("|");
+ if (!firstPartialSeparator) sb.append("|");
firstPartialSeparator = false;
sb.append(entry.getKey() + "=" + entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
index 38abd65..78364d9 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.pherf.result;
/**
* Generic box container for a result value. This class allows for writing results of any type easily
+ *
* @param <T>
*/
public class ResultValue<T> {
@@ -33,8 +34,7 @@ public class ResultValue<T> {
return resultValue;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return resultValue.toString();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
index 690f7e6..3aa45fa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
@@ -18,104 +18,91 @@
package org.apache.phoenix.pherf.result;
+import javax.xml.bind.annotation.XmlAttribute;
import java.util.Comparator;
import java.util.Date;
-import javax.xml.bind.annotation.XmlAttribute;
-
public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
- private Date startTime;
- private Integer elapsedDurationInMs;
- private String message;
- private Long resultRowCount;
- private String explainPlan;
-
- @SuppressWarnings("unused")
- public RunTime() {
- }
-
- @SuppressWarnings("unused")
- public RunTime(Integer elapsedDurationInMs) {
- this(null, elapsedDurationInMs);
- }
-
- public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
- this(null, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this(null, null, startTime, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(String message, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this(message, null, startTime, resultRowCount, elapsedDurationInMs);
- }
-
- public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
- this.elapsedDurationInMs = elapsedDurationInMs;
- this.startTime = startTime;
- this.resultRowCount = resultRowCount;
- this.message = message;
- this.explainPlan = explainPlan;
- }
-
- @XmlAttribute()
- public Date getStartTime() {
- return startTime;
- }
-
- @SuppressWarnings("unused")
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- @XmlAttribute()
- public Integer getElapsedDurationInMs() {
- return elapsedDurationInMs;
- }
-
- @SuppressWarnings("unused")
- public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
- this.elapsedDurationInMs = elapsedDurationInMs;
- }
-
- @Override
- public int compare(RunTime r1, RunTime r2) {
- return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
- }
-
- @Override
- public int compareTo(RunTime o) {
- return compare(this, o);
- }
-
- @XmlAttribute()
- public String getMessage() {
- return message;
- }
-
- @SuppressWarnings("unused")
- public void setMessage(String message) {
- this.message = message;
- }
-
- @XmlAttribute()
- public String getExplainPlan() {
- return explainPlan;
- }
-
- @SuppressWarnings("unused")
- public void setExplainPlan(String explainPlan) {
- this.explainPlan = explainPlan;
- }
-
- @XmlAttribute()
- public Long getResultRowCount() {
- return resultRowCount;
- }
-
- @SuppressWarnings("unused")
- public void setResultRowCount(Long resultRowCount) {
- this.resultRowCount = resultRowCount;
- }
+ private Date startTime;
+ private Integer elapsedDurationInMs;
+ private String message;
+ private Long resultRowCount;
+ private String explainPlan;
+
+ @SuppressWarnings("unused") public RunTime() {
+ }
+
+ @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) {
+ this(null, elapsedDurationInMs);
+ }
+
+ public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
+ this(null, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
+ this(null, null, startTime, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(String message, Date startTime, Long resultRowCount,
+ Integer elapsedDurationInMs) {
+ this(message, null, startTime, resultRowCount, elapsedDurationInMs);
+ }
+
+ public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount,
+ Integer elapsedDurationInMs) {
+ this.elapsedDurationInMs = elapsedDurationInMs;
+ this.startTime = startTime;
+ this.resultRowCount = resultRowCount;
+ this.message = message;
+ this.explainPlan = explainPlan;
+ }
+
+ @XmlAttribute() public Date getStartTime() {
+ return startTime;
+ }
+
+ @SuppressWarnings("unused") public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ @XmlAttribute() public Integer getElapsedDurationInMs() {
+ return elapsedDurationInMs;
+ }
+
+ @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
+ this.elapsedDurationInMs = elapsedDurationInMs;
+ }
+
+ @Override public int compare(RunTime r1, RunTime r2) {
+ return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
+ }
+
+ @Override public int compareTo(RunTime o) {
+ return compare(this, o);
+ }
+
+ @XmlAttribute() public String getMessage() {
+ return message;
+ }
+
+ @SuppressWarnings("unused") public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @XmlAttribute() public String getExplainPlan() {
+ return explainPlan;
+ }
+
+ @SuppressWarnings("unused") public void setExplainPlan(String explainPlan) {
+ this.explainPlan = explainPlan;
+ }
+
+ @XmlAttribute() public Long getResultRowCount() {
+ return resultRowCount;
+ }
+
+ @SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) {
+ this.resultRowCount = resultRowCount;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
index b57e424..9cac1c7 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
@@ -18,31 +18,31 @@
package org.apache.phoenix.pherf.result;
+import org.apache.phoenix.pherf.configuration.Scenario;
+
import java.util.ArrayList;
import java.util.List;
-import org.apache.phoenix.pherf.configuration.Scenario;
public class ScenarioResult extends Scenario {
- private List<QuerySetResult> querySetResult = new ArrayList<QuerySetResult>();
-
- public List<QuerySetResult> getQuerySetResult() {
- return querySetResult;
- }
-
- @SuppressWarnings("unused")
- public void setQuerySetResult(List<QuerySetResult> querySetResult) {
- this.querySetResult = querySetResult;
- }
-
- public ScenarioResult() {
- }
-
- public ScenarioResult(Scenario scenario) {
- this.setDataOverride(scenario.getDataOverride());
- this.setPhoenixProperties(scenario.getPhoenixProperties());
- this.setRowCount(scenario.getRowCount());
- this.setTableName(scenario.getTableName());
- this.setName(scenario.getName());
- }
+ private List<QuerySetResult> querySetResult = new ArrayList<>();
+
+ public List<QuerySetResult> getQuerySetResult() {
+ return querySetResult;
+ }
+
+ @SuppressWarnings("unused") public void setQuerySetResult(List<QuerySetResult> querySetResult) {
+ this.querySetResult = querySetResult;
+ }
+
+ public ScenarioResult() {
+ }
+
+ public ScenarioResult(Scenario scenario) {
+ this.setDataOverride(scenario.getDataOverride());
+ this.setPhoenixProperties(scenario.getPhoenixProperties());
+ this.setRowCount(scenario.getRowCount());
+ this.setTableName(scenario.getTableName());
+ this.setName(scenario.getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
index f043bec..03b5664 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
@@ -18,13 +18,12 @@
package org.apache.phoenix.pherf.result;
+import javax.xml.bind.annotation.XmlAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import javax.xml.bind.annotation.XmlAttribute;
-
public class ThreadTime {
private List<RunTime> runTimesInMs = Collections.synchronizedList(new ArrayList<RunTime>());
private String threadName;
@@ -84,23 +83,22 @@ public class ThreadTime {
return Collections.max(getRunTimesInMs());
}
- @XmlAttribute()
- public String getThreadName() {
+ @XmlAttribute() public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
-
+
private String parseThreadName(boolean getConcurrency) {
- if (getThreadName() == null || !getThreadName().contains(",")) return null;
- String[] threadNameSet = getThreadName().split(",");
- if (getConcurrency) {
- return threadNameSet[1];}
- else {
- return threadNameSet[0];
- }
+ if (getThreadName() == null || !getThreadName().contains(",")) return null;
+ String[] threadNameSet = getThreadName().split(",", -1); // limit -1 keeps trailing empty fields, so [0] and [1] exist even for "name," or ","
+ if (getConcurrency) {
+ return threadNameSet[1];
+ } else {
+ return threadNameSet[0];
+ }
}
public List<List<ResultValue>> getCsvPerformanceRepresentation(ResultUtil util) {
@@ -110,11 +108,14 @@ public class ThreadTime {
List<ResultValue> rowValues = new ArrayList(getRunTimesInMs().size());
rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
- rowValues.add(new ResultValue(String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
+ rowValues.add(new ResultValue(
+ String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
if (getRunTimesInMs().get(i).getMessage() == null) {
- rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
+ rowValues.add(new ResultValue(util.convertNull(
+ String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
} else {
- rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
+ rowValues.add(new ResultValue(
+ util.convertNull(getRunTimesInMs().get(i).getMessage())));
}
rows.add(rowValues);
}
@@ -129,7 +130,8 @@ public class ThreadTime {
rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
- rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
+ rowValues.add(new ResultValue(
+ util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
rows.add(rowValues);
}
return rows;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
index 0df383c..e6a7308 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
@@ -31,8 +31,7 @@ public enum Extension {
this.extension = extension;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return extension;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 98e7b30..15e2b9a 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -20,9 +20,11 @@ package org.apache.phoenix.pherf.result.file;
public enum Header {
EMPTY(""),
- AGGREGATE_PERFORMANCE("START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
- DETAILED_BASE("BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
- + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
+ AGGREGATE_PERFORMANCE(
+ "START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
+ DETAILED_BASE(
+ "BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
+ + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
@@ -34,8 +36,7 @@ public enum Header {
this.header = header;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return header;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
index e7fbb48..e69f600 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
@@ -18,13 +18,6 @@
package org.apache.phoenix.pherf.result.impl;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
@@ -36,6 +29,13 @@ import org.apache.phoenix.pherf.result.ResultUtil;
import org.apache.phoenix.pherf.result.ResultValue;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* TODO Doc this class. Note that each instance that has a non unique file name will overwrite the last
*/
@@ -51,22 +51,22 @@ public class CSVResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
this.util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws IOException {
+ @Override public synchronized void write(Result result) throws IOException {
util.ensureBaseResultDirExists();
open(result);
@@ -74,15 +74,13 @@ public class CSVResultHandler implements ResultHandler {
flush();
}
- @Override
- public synchronized void flush() throws IOException {
+ @Override public synchronized void flush() throws IOException {
if (csvPrinter != null) {
csvPrinter.flush();
}
}
- @Override
- public synchronized void close() throws IOException {
+ @Override public synchronized void close() throws IOException {
if (csvPrinter != null) {
csvPrinter.flush();
csvPrinter.close();
@@ -90,8 +88,7 @@ public class CSVResultHandler implements ResultHandler {
}
}
- @Override
- public synchronized List<Result> read() throws IOException {
+ @Override public synchronized List<Result> read() throws IOException {
CSVParser parser = null;
util.ensureBaseResultDirExists();
try {
@@ -131,13 +128,11 @@ public class CSVResultHandler implements ResultHandler {
isClosed = false;
}
- @Override
- public synchronized boolean isClosed() {
+ @Override public synchronized boolean isClosed() {
return isClosed;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
index ad3c8fb..5c3eac1 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
@@ -19,8 +19,8 @@
package org.apache.phoenix.pherf.result.impl;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
@@ -42,22 +42,22 @@ public class ImageResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
ResultUtil util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws Exception {
+ @Override public synchronized void write(Result result) throws Exception {
TimeSeriesCollection timeSeriesCollection = new TimeSeriesCollection();
int rowCount = 0;
int maxLegendCount = 20;
@@ -70,12 +70,16 @@ public class ImageResultHandler implements ResultHandler {
for (QuerySetResult querySetResult : scenarioResult.getQuerySetResult()) {
for (QueryResult queryResult : querySetResult.getQueryResults()) {
for (ThreadTime tt : queryResult.getThreadTimes()) {
- TimeSeries timeSeries = new TimeSeries(queryResult.getStatement() + " :: " + tt.getThreadName());
+ TimeSeries
+ timeSeries =
+ new TimeSeries(
+ queryResult.getStatement() + " :: " + tt.getThreadName());
rowCount++;
synchronized (tt.getRunTimesInMs()) {
for (RunTime rt : tt.getRunTimesInMs()) {
if (rt.getStartTime() != null) {
- timeSeries.add(new Millisecond(rt.getStartTime()), rt.getElapsedDurationInMs());
+ timeSeries.add(new Millisecond(rt.getStartTime()),
+ rt.getElapsedDurationInMs());
}
}
}
@@ -85,10 +89,14 @@ public class ImageResultHandler implements ResultHandler {
}
}
boolean legend = rowCount > maxLegendCount ? false : true;
- JFreeChart chart = ChartFactory.createTimeSeriesChart(dataModelResult.getName()
- , "Time", "Query Time (ms)", timeSeriesCollection,
- legend, true, false);
- StandardXYItemRenderer renderer = new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
+ JFreeChart
+ chart =
+ ChartFactory
+ .createTimeSeriesChart(dataModelResult.getName(), "Time", "Query Time (ms)",
+ timeSeriesCollection, legend, true, false);
+ StandardXYItemRenderer
+ renderer =
+ new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
chart.getXYPlot().setRenderer(renderer);
chart.getXYPlot().setBackgroundPaint(Color.WHITE);
chart.getXYPlot().setRangeGridlinePaint(Color.BLACK);
@@ -96,35 +104,31 @@ public class ImageResultHandler implements ResultHandler {
chart.getXYPlot().getRenderer().setSeriesStroke(i, new BasicStroke(3f));
}
try {
- ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension, chartDimension);
+ ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension,
+ chartDimension);
} catch (IOException e) {
e.printStackTrace();
}
}
- @Override
- public synchronized void flush() throws Exception {
+ @Override public synchronized void flush() throws Exception {
}
- @Override
- public synchronized void close() throws Exception {
+ @Override public synchronized void close() throws Exception {
}
- @Override
- public List<Result> read() throws Exception {
+ @Override public List<Result> read() throws Exception {
return null;
}
- @Override
- public boolean isClosed() {
+ @Override public boolean isClosed() {
return false;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
index 8a913ed..009ae21 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
@@ -19,8 +19,8 @@
package org.apache.phoenix.pherf.result.impl;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
@@ -30,7 +30,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
public class XMLResultHandler implements ResultHandler {
private final String resultFileName;
@@ -40,22 +39,22 @@ public class XMLResultHandler implements ResultHandler {
this(resultFileName, resultFileDetails, true);
}
- public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+ public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+ boolean generateFullFileName) {
ResultUtil util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
String resultDir = constants.getProperty("pherf.default.results.dir");
- this.resultFileName = generateFullFileName ?
- resultDir + PherfConstants.PATH_SEPARATOR
- + PherfConstants.RESULT_PREFIX
- + resultFileName + util.getSuffix()
- + resultFileDetails.getExtension().toString()
- : resultFileName;
+ this.resultFileName =
+ generateFullFileName ?
+ resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+ + resultFileName + util.getSuffix() + resultFileDetails
+ .getExtension().toString() :
+ resultFileName;
this.resultFileDetails = resultFileDetails;
}
- @Override
- public synchronized void write(Result result) throws Exception {
+ @Override public synchronized void write(Result result) throws Exception {
FileOutputStream os = null;
JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
@@ -72,18 +71,15 @@ public class XMLResultHandler implements ResultHandler {
}
}
- @Override
- public synchronized void flush() throws IOException {
+ @Override public synchronized void flush() throws IOException {
return;
}
- @Override
- public synchronized void close() throws IOException {
+ @Override public synchronized void close() throws IOException {
return;
}
- @Override
- public synchronized List<Result> read() throws Exception {
+ @Override public synchronized List<Result> read() throws Exception {
JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
@@ -95,13 +91,11 @@ public class XMLResultHandler implements ResultHandler {
return results;
}
- @Override
- public boolean isClosed() {
+ @Override public boolean isClosed() {
return true;
}
- @Override
- public ResultFileDetails getResultFileDetails() {
+ @Override public ResultFileDetails getResultFileDetails() {
return resultFileDetails;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
index 4761211..439f87e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
@@ -45,7 +45,7 @@ public class SchemaReader {
* @throws Exception
*/
public SchemaReader(final String searchPattern) throws Exception {
- this(new PhoenixUtil(), searchPattern);
+ this(PhoenixUtil.create(), searchPattern);
}
public SchemaReader(PhoenixUtil util, final String searchPattern) throws Exception {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 83e324d..0156149 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -30,6 +30,8 @@ import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,15 +41,25 @@ public class PhoenixUtil {
private static String zookeeper;
private static int rowCountOverride = 0;
private boolean testEnabled;
+ private static PhoenixUtil instance;
- public PhoenixUtil() {
+ private PhoenixUtil() {
this(false);
}
- public PhoenixUtil(final boolean testEnabled) {
+ private PhoenixUtil(final boolean testEnabled) {
this.testEnabled = testEnabled;
}
+ public static PhoenixUtil create() {
+ return create(false);
+ }
+
+ public static synchronized PhoenixUtil create(final boolean testEnabled) { // synchronized: lazy init of the shared static instance
+ if (instance == null) { instance = new PhoenixUtil(testEnabled); } // only the first call's testEnabled is used
+ return instance;
+ }
+
public Connection getConnection() throws Exception{
return getConnection(null);
}
@@ -56,7 +68,7 @@ public class PhoenixUtil {
return getConnection(tenantId, testEnabled);
}
- public Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
+ private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
if (null == zookeeper) {
throw new IllegalArgumentException(
"Zookeeper must be set before initializing connection!");
@@ -115,17 +127,6 @@ public class PhoenixUtil {
return result;
}
- @SuppressWarnings("unused")
- public ResultSet executeQuery(PreparedStatement preparedStatement, Connection connection) {
- ResultSet resultSet = null;
- try {
- resultSet = preparedStatement.executeQuery();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- return resultSet;
- }
-
/**
* Delete existing tables with schema name set as {@link PherfConstants#PHERF_SCHEMA_NAME} with regex comparison
*
@@ -133,14 +134,14 @@ public class PhoenixUtil {
* @throws SQLException
* @throws Exception
*/
- public void deleteTables(String regexMatch) throws SQLException, Exception {
+ public void deleteTables(String regexMatch) throws Exception {
regexMatch = regexMatch.toUpperCase().replace("ALL", ".*");
Connection conn = getConnection();
try {
ResultSet resultSet = getTableMetaData(PherfConstants.PHERF_SCHEMA_NAME, null, conn);
while (resultSet.next()) {
- String tableName = resultSet.getString("TABLE_SCHEM") == null ? resultSet.getString("TABLE_NAME") :
- resultSet.getString("TABLE_SCHEM") + "." + resultSet.getString("TABLE_NAME");
+ String tableName = resultSet.getString("TABLE_SCHEM") == null ? resultSet.getString("TABLE_NAME") :
+ resultSet.getString("TABLE_SCHEM") + "." + resultSet.getString("TABLE_NAME"); // JDBC metadata label is TABLE_SCHEM, not TABLE_SCHEMA
if (tableName.matches(regexMatch)) {
logger.info("\nDropping " + tableName);
executeStatement("DROP TABLE " + tableName + " CASCADE", conn);
@@ -183,8 +184,33 @@ public class PhoenixUtil {
return Collections.unmodifiableList(columnList);
}
-
- public static String getZookeeper() {
+
+ /**
+ * Execute all querySet DDLs first based on tenantId if specified. This is executed
+ * first since we don't want to run DDLs in parallel to executing queries.
+ *
+ * @param querySet
+ * @throws Exception
+ */
+ public void executeQuerySetDdls(QuerySet querySet) throws Exception {
+ for (Query query : querySet.getQuery()) {
+ if (null != query.getDdl()) {
+ Connection conn = null;
+ try {
+ logger.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query
+ .getTenantId());
+ executeStatement(query.getDdl(),
+ conn = getConnection(query.getTenantId()));
+ } finally {
+ if (null != conn) {
+ conn.close();
+ }
+ }
+ }
+ }
+ }
+
+ public static String getZookeeper() {
return zookeeper;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
new file mode 100644
index 0000000..efb3da9
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+
+import org.apache.phoenix.pherf.result.DataModelResult;
+import org.apache.phoenix.pherf.result.ResultManager;
+import org.apache.phoenix.pherf.result.RunTime;
+import org.apache.phoenix.pherf.result.ThreadTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+/**
+ * Runnable that executes one {@link Query} repeatedly on a single worker thread and records
+ * every execution as a {@link RunTime} on the supplied {@link ThreadTime}.
+ */
+class MultiThreadedRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
+    // Guards lastResultWritten, which is static and therefore shared by all runner instances.
+    private static final Object RESULT_WRITE_LOCK = new Object();
+    private static long lastResultWritten = System.currentTimeMillis() - 1000;
+    private Query query;
+    private ThreadTime threadTime;
+    private PhoenixUtil pUtil = PhoenixUtil.create();
+    private String threadName;
+    private DataModelResult dataModelResult;
+    private long numberOfExecutions;
+    private long executionDurationInMs;
+    private final ResultManager resultManager;
+
+    /**
+     * MultiThreadedRunner
+     *
+     * @param threadName            name used in this runner's log output
+     * @param query                 query whose statement is executed
+     * @param dataModelResult       result model passed to the result manager on each write
+     * @param threadTime            collector that receives one RunTime per execution
+     * @param numberOfExecutions    upper bound on the number of executions
+     * @param executionDurationInMs upper bound on the total run time, in milliseconds
+     */
+    MultiThreadedRunner(String threadName, Query query, DataModelResult dataModelResult,
+            ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs) {
+        this.query = query;
+        this.threadName = threadName;
+        this.threadTime = threadTime;
+        this.dataModelResult = dataModelResult;
+        this.numberOfExecutions = numberOfExecutions;
+        this.executionDurationInMs = executionDurationInMs;
+        this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
+    }
+
+    /**
+     * Executes the query until either the execution count or the execution duration is
+     * exhausted, whichever happens first. A failed iteration is logged and the loop continues.
+     */
+    @Override public void run() {
+        logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+                + numberOfExecutions + " times\n\n");
+        long start = System.currentTimeMillis();
+        for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
+                < executionDurationInMs)); i--) {
+            try {
+                timedQuery();
+                writeResultsIfDue();
+            } catch (Exception e) {
+                logger.error("Iteration failed on thread " + threadName, e);
+            }
+        }
+        logger.info("\n\nThread exiting." + threadName + "\n\n");
+    }
+
+    /**
+     * Writes the data model result at most once per second across all runner threads.
+     *
+     * @throws Exception if the result manager fails to write
+     */
+    private void writeResultsIfDue() throws Exception {
+        // The timestamp is static, so the check-then-write must use a lock shared by every
+        // instance; the per-instance resultManager monitor would not exclude other threads.
+        synchronized (RESULT_WRITE_LOCK) {
+            if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
+                resultManager.write(dataModelResult);
+                lastResultWritten = System.currentTimeMillis();
+            }
+        }
+    }
+
+    private synchronized ThreadTime getThreadTime() {
+        return threadTime;
+    }
+
+    /**
+     * Timed query execution. A RunTime is recorded whether or not the statement succeeds; on
+     * failure the exception message is stored on that RunTime.
+     *
+     * @throws Exception if closing a JDBC resource fails
+     */
+    private void timedQuery() throws Exception {
+        boolean isSelectCountStatement =
+                query.getStatement().toUpperCase().trim().contains("COUNT(*)");
+
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+        long start = System.currentTimeMillis();
+        Date startDate = Calendar.getInstance().getTime();
+        String exception = null;
+        long resultRowCount = 0;
+
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement(query.getStatement());
+            boolean isQuery = statement.execute();
+            if (isQuery) {
+                rs = statement.getResultSet();
+                while (rs.next()) {
+                    if (null != query.getExpectedAggregateRowCount()) {
+                        if (rs.getLong(1) != query.getExpectedAggregateRowCount()) {
+                            throw new RuntimeException(
+                                    "Aggregate count " + rs.getLong(1) + " does not match expected "
+                                            + query.getExpectedAggregateRowCount());
+                        }
+                    }
+
+                    if (isSelectCountStatement) {
+                        resultRowCount = rs.getLong(1);
+                    } else {
+                        resultRowCount++;
+                    }
+                }
+            } else {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            logger.error("Query failed on thread " + threadName, e);
+            exception = e.getMessage();
+        } finally {
+            // Record the timing before closing so close() latency is not part of the measurement.
+            getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
+                    (int) (System.currentTimeMillis() - start)));
+
+            // Nested finally blocks so that a failing close() cannot leak the remaining resources.
+            try {
+                if (rs != null) rs.close();
+            } finally {
+                try {
+                    if (statement != null) statement.close();
+                } finally {
+                    if (conn != null) conn.close();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index c78db90..1735754 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -30,84 +30,69 @@ import org.apache.phoenix.pherf.result.RunTime;
import org.apache.phoenix.pherf.result.ThreadTime;
class MultithreadedDiffer implements Runnable {
- private static final Logger logger = LoggerFactory
- .getLogger(MultithreadedRunner.class);
- private Thread t;
- private Query query;
- private ThreadTime threadTime;
- private String threadName;
- private long numberOfExecutions;
- private long executionDurationInMs;
- private QueryVerifier queryVerifier = new QueryVerifier(true);
+ private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
+ private Thread t;
+ private Query query;
+ private ThreadTime threadTime;
+ private String threadName;
+ private long numberOfExecutions;
+ private long executionDurationInMs;
+ private QueryVerifier queryVerifier = new QueryVerifier(true);
- private synchronized ThreadTime getThreadTime() {
+ private synchronized ThreadTime getThreadTime() {
return threadTime;
}
/**
- * Query Verification
- * @throws Exception
- */
- private void diffQuery() throws Exception {
- Long start = System.currentTimeMillis();
- Date startDate = Calendar.getInstance().getTime();
- String newCSV = queryVerifier.exportCSV(query);
- boolean verifyResult = queryVerifier.doDiff(query, newCSV);
- String explainPlan = queryVerifier.getExplainPlan(query);
- getThreadTime().getRunTimesInMs().add(
- new RunTime(verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
- explainPlan, startDate, -1L,
- (int)(System.currentTimeMillis() - start)));
- }
-
- /**
- * Multithreaded Differ
- * @param threadName
- * @param query
- * @param threadName
- * @param threadTime
- * @param numberOfExecutions
- * @param executionDurationInMs
- */
- MultithreadedDiffer(String threadName,
- Query query,
- ThreadTime threadTime,
- long numberOfExecutions,
- long executionDurationInMs) {
- this.query = query;
- this.threadName = threadName;
- this.threadTime = threadTime;
- this.numberOfExecutions = numberOfExecutions;
- this.executionDurationInMs = executionDurationInMs;
- }
+ * Query Verification
+ *
+ * @throws Exception
+ */
+ private void diffQuery() throws Exception {
+ Long start = System.currentTimeMillis();
+ Date startDate = Calendar.getInstance().getTime();
+ String newCSV = queryVerifier.exportCSV(query);
+ boolean verifyResult = queryVerifier.doDiff(query, newCSV);
+ String explainPlan = queryVerifier.getExplainPlan(query);
+ getThreadTime().getRunTimesInMs().add(new RunTime(
+ verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
+ explainPlan, startDate, -1L, (int) (System.currentTimeMillis() - start)));
+ }
- /**
- * Executes verification runs for a minimum of number of execution or execution duration
- */
- public void run() {
- logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
- + numberOfExecutions + "times\n\n");
- Long start = System.currentTimeMillis();
- for (long i = numberOfExecutions; (i > 0 && ((System
- .currentTimeMillis() - start) < executionDurationInMs)); i--) {
- try {
- diffQuery();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- logger.info("\n\nThread exiting." + t.getName() + "\n\n");
- }
+ /**
+ * Multithreaded Differ
+ *
+ * @param threadName
+ * @param query
+ * @param threadName
+ * @param threadTime
+ * @param numberOfExecutions
+ * @param executionDurationInMs
+ */
+ MultithreadedDiffer(String threadName, Query query, ThreadTime threadTime,
+ long numberOfExecutions, long executionDurationInMs) {
+ this.query = query;
+ this.threadName = threadName;
+ this.threadTime = threadTime;
+ this.numberOfExecutions = numberOfExecutions;
+ this.executionDurationInMs = executionDurationInMs;
+ }
- /**
- * Thread start
- * @return
- */
- public Thread start() {
- if (t == null) {
- t = new Thread(this, threadName);
- t.start();
- }
- return t;
- }
+ /**
+ * Executes verification runs for a minimum of number of execution or execution duration
+ */
+ public void run() {
+ logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
+ + numberOfExecutions + "times\n\n");
+ Long start = System.currentTimeMillis();
+ for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
+ < executionDurationInMs)); i--) {
+ try {
+ diffQuery();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ logger.info("\n\nThread exiting." + t.getName() + "\n\n");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
deleted file mode 100644
index 237fc17..0000000
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pherf.workload;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-
-import org.apache.phoenix.pherf.result.DataModelResult;
-import org.apache.phoenix.pherf.result.ResultManager;
-import org.apache.phoenix.pherf.result.RunTime;
-import org.apache.phoenix.pherf.result.ThreadTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
-
-class MultithreadedRunner implements Runnable {
- private static final Logger logger = LoggerFactory
- .getLogger(MultithreadedRunner.class);
- private Thread t;
- private Query query;
- private ThreadTime threadTime;
- private PhoenixUtil pUtil = new PhoenixUtil();
- private String threadName;
- private DataModelResult dataModelResult;
- private long numberOfExecutions;
- private long executionDurationInMs;
- private static long lastResultWritten = System.currentTimeMillis() - 1000;
- private final ResultManager resultManager;
-
- /**
- * Multithreaded runner
- *
- * @param threadName
- * @param query
- * @param dataModelResult
- * @param threadTime
- * @param numberOfExecutions
- * @param executionDurationInMs
- */
- MultithreadedRunner(String threadName,
- Query query,
- DataModelResult dataModelResult,
- ThreadTime threadTime,
- long numberOfExecutions,
- long executionDurationInMs) {
- this.query = query;
- this.threadName = threadName;
- this.threadTime = threadTime;
- this.dataModelResult = dataModelResult;
- this.numberOfExecutions = numberOfExecutions;
- this.executionDurationInMs = executionDurationInMs;
- this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
- }
-
- /**
- * Executes run for a minimum of number of execution or execution duration
- */
- public void run() {
- logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
- + numberOfExecutions + "times\n\n");
- Long start = System.currentTimeMillis();
- for (long i = numberOfExecutions; (i > 0 && ((System
- .currentTimeMillis() - start) < executionDurationInMs)); i--) {
- try {
- synchronized (resultManager) {
- timedQuery();
- if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
- resultManager.write(dataModelResult);
- lastResultWritten = System.currentTimeMillis();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- logger.info("\n\nThread exiting." + t.getName() + "\n\n");
- }
-
- /**
- * Thread start
- * @return
- */
- public Thread start() {
- if (t == null) {
- t = new Thread(this, threadName);
- t.start();
- }
- return t;
- }
-
- private synchronized ThreadTime getThreadTime() {
- return threadTime;
- }
-
- /**
- * Timed query execution
- *
- * @throws Exception
- */
- private void timedQuery() throws Exception {
- boolean isSelectCountStatement = query.getStatement().toUpperCase().trim()
- .contains("COUNT(*)") ? true : false;
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Long start = System.currentTimeMillis();
- Date startDate = Calendar.getInstance().getTime();
- String exception = null;
- long resultRowCount = 0;
-
- try {
- conn = pUtil.getConnection(query.getTenantId());
- statement = conn.prepareStatement(query.getStatement());
- boolean isQuery = statement.execute();
- if (isQuery) {
- rs = statement.getResultSet();
- while (rs.next()) {
- if (null != query.getExpectedAggregateRowCount()) {
- if (rs.getLong(1) != query.getExpectedAggregateRowCount())
- throw new RuntimeException("Aggregate count "
- + rs.getLong(1) + " does not match expected "
- + query.getExpectedAggregateRowCount());
- }
-
- if (isSelectCountStatement) {
- resultRowCount = rs.getLong(1);
- } else {
- resultRowCount++;
- }
- }
- } else {
- conn.commit();
- }
- } catch (Exception e) {
- e.printStackTrace();
- exception = e.getMessage();
- } finally {
- getThreadTime().getRunTimesInMs().add(
- new RunTime(exception, startDate, resultRowCount, (int) (System.currentTimeMillis() - start)));
-
- if (rs != null) rs.close();
- if (statement != null) statement.close();
- if (conn != null) conn.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 6f6e000..624188c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -18,227 +18,256 @@
package org.apache.phoenix.pherf.workload;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.phoenix.pherf.configuration.DataModel;
-import org.apache.phoenix.pherf.configuration.ExecutionType;
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.configuration.QuerySet;
-import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class QueryExecutor implements Workload {
+ private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
+ private List<DataModel> dataModels;
+ private String queryHint;
+ private final RunMode runMode;
+ private final boolean exportCSV;
+ private final ExecutorService pool;
+ private final XMLConfigParser parser;
+ private final PhoenixUtil util;
+
+ public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool) {
+ this(parser, util, pool, parser.getDataModels(), null, false, RunMode.PERFORMANCE);
+ }
+
+ public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool,
+ List<DataModel> dataModels, String queryHint, boolean exportCSV, RunMode runMode) {
+ this.parser = parser;
+ this.queryHint = queryHint;
+ this.exportCSV = exportCSV;
+ this.runMode = runMode;
+ this.dataModels = dataModels;
+ this.pool = pool;
+ this.util = util;
+ }
+
+ @Override public void complete() {
+
+ }
+
+ /**
+ * Calls in Multithreaded Query Executor for all datamodels
+ *
+ * @throws Exception
+ */
+ public Runnable execute() throws Exception {
+ Runnable runnable = null;
+ for (DataModel dataModel : dataModels) {
+ if (exportCSV) {
+ runnable = exportAllScenarios(dataModel);
+ } else {
+ runnable = executeAllScenarios(dataModel);
+ }
+ }
+ return runnable;
+ }
+
+ /**
+ * Export all queries results to CSV
+ *
+ * @param dataModel
+ * @throws Exception
+ */
+ protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception {
+ return new Runnable() {
+ @Override public void run() {
+ try {
+
+ List<Scenario> scenarios = dataModel.getScenarios();
+ QueryVerifier exportRunner = new QueryVerifier(false);
+ for (Scenario scenario : scenarios) {
+ for (QuerySet querySet : scenario.getQuerySet()) {
+ util.executeQuerySetDdls(querySet);
+ for (Query query : querySet.getQuery()) {
+ exportRunner.exportCSV(query);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("", e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Execute all scenarios
+ *
+ * @param dataModel
+ * @throws Exception
+ */
+ protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception {
+ return new Runnable() {
+ @Override public void run() {
+ List<DataModelResult> dataModelResults = new ArrayList<>();
+ DataModelResult
+ dataModelResult =
+ new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
+ ResultManager
+ resultManager =
+ new ResultManager(dataModelResult.getName(), QueryExecutor.this.runMode);
+
+ dataModelResults.add(dataModelResult);
+ List<Scenario> scenarios = dataModel.getScenarios();
+ Configuration conf = HBaseConfiguration.create();
+ Map<String, String> phoenixProperty = conf.getValByRegex("phoenix");
+ try {
+
+ for (Scenario scenario : scenarios) {
+ ScenarioResult scenarioResult = new ScenarioResult(scenario);
+ scenarioResult.setPhoenixProperties(phoenixProperty);
+ dataModelResult.getScenarioResult().add(scenarioResult);
+ WriteParams writeParams = scenario.getWriteParams();
+
+ if (writeParams != null) {
+ int writerThreadCount = writeParams.getWriterThreadCount();
+ for (int i = 0; i < writerThreadCount; i++) {
+ logger.debug("Inserting write workload ( " + i + " ) of ( "
+ + writerThreadCount + " )");
+ Workload writes = new WriteWorkload(PhoenixUtil.create(), parser);
+ pool.submit(writes.execute());
+ }
+ }
+
+ for (QuerySet querySet : scenario.getQuerySet()) {
+ QuerySetResult querySetResult = new QuerySetResult(querySet);
+ scenarioResult.getQuerySetResult().add(querySetResult);
+
+ util.executeQuerySetDdls(querySet);
+ if (querySet.getExecutionType() == ExecutionType.SERIAL) {
+ executeQuerySetSerial(dataModelResult, querySet, querySetResult);
+ } else {
+ executeQuerySetParallel(dataModelResult, querySet, querySetResult);
+ }
+ }
+ resultManager.write(dataModelResult);
+ }
+ resultManager.write(dataModelResults);
+ } catch (Exception e) {
+ logger.warn("", e);
+ }
+ }
+ };
+ }
-public class QueryExecutor {
- private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
- private List<DataModel> dataModels;
- private String queryHint;
- private RunMode runMode;
+ /**
+ * Execute query set serially
+ *
+ * @param dataModelResult
+ * @param querySet
+ * @param querySetResult
+ * @throws InterruptedException
+ */
+ protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
+ QuerySetResult querySetResult) throws InterruptedException {
+ for (Query query : querySet.getQuery()) {
+ QueryResult queryResult = new QueryResult(query);
+ querySetResult.getQueryResults().add(queryResult);
+
+ for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+
+ List<Future> threads = new ArrayList<>();
+
+ for (int i = 0; i < cr; i++) {
+
+ Runnable
+ thread =
+ executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+ querySetResult);
+ threads.add(pool.submit(thread));
+ }
+
+ for (Future thread : threads) {
+ try {
+ thread.get();
+ } catch (ExecutionException e) {
+ logger.error("", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute query set in parallel
+ *
+ * @param dataModelResult
+ * @param querySet
+ * @param querySetResult
+ * @throws InterruptedException
+ */
+ protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
+ QuerySetResult querySetResult) throws InterruptedException {
+ for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+ List<Future> threads = new ArrayList<>();
+ for (int i = 0; i < cr; i++) {
+ for (Query query : querySet.getQuery()) {
+ QueryResult queryResult = new QueryResult(query);
+ querySetResult.getQueryResults().add(queryResult);
+
+ Runnable
+ thread =
+ executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+ querySetResult);
+ threads.add(pool.submit(thread));
+ }
+
+ for (Future thread : threads) {
+ try {
+ thread.get();
+ } catch (ExecutionException e) {
+ logger.error("", e);
+ }
+ }
+ }
+ }
+ }
- public QueryExecutor(XMLConfigParser parser) {
- this.dataModels = parser.getDataModels();
+ /**
+ * Execute multi-thread runner
+ *
+ * @param name
+ * @param dataModelResult
+ * @param queryResult
+ * @param querySet
+ * @return
+ */
+ protected Runnable executeRunner(String name, DataModelResult dataModelResult,
+ QueryResult queryResult, QuerySet querySet) {
+ ThreadTime threadTime = new ThreadTime();
+ queryResult.getThreadTimes().add(threadTime);
+ threadTime.setThreadName(name);
+ queryResult.setHint(this.queryHint);
+ logger.info("\nExecuting query " + queryResult.getStatement());
+ Runnable thread;
+ if (this.runMode == RunMode.FUNCTIONAL) {
+ thread =
+ new MultithreadedDiffer(threadTime.getThreadName(), queryResult, threadTime,
+ querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs());
+ } else {
+ thread =
+ new MultiThreadedRunner(threadTime.getThreadName(), queryResult,
+ dataModelResult, threadTime, querySet.getNumberOfExecutions(),
+ querySet.getExecutionDurationInMs());
+ }
+ return thread;
}
-
- /**
- * Calls in Multithreaded Query Executor for all datamodels
- * @throws Exception
- */
- public void execute(String queryHint, boolean exportCSV, RunMode runMode) throws Exception {
- this.queryHint = queryHint;
- this.runMode = runMode;
- for (DataModel dataModel: dataModels) {
- if (exportCSV) {
- exportAllScenarios(dataModel);
- } else {
- executeAllScenarios(dataModel);
- }
- }
- }
-
- /**
- * Export all queries results to CSV
- * @param dataModel
- * @throws Exception
- */
- protected void exportAllScenarios(DataModel dataModel) throws Exception {
- List<Scenario> scenarios = dataModel.getScenarios();
- QueryVerifier exportRunner = new QueryVerifier(false);
- for (Scenario scenario : scenarios) {
- for (QuerySet querySet : scenario.getQuerySet()) {
- executeQuerySetDdls(querySet);
- for (Query query : querySet.getQuery()) {
- exportRunner.exportCSV(query);
- }
- }
- }
- }
-
- /**
- * Execute all scenarios
- * @param dataModel
- * @throws Exception
- */
- protected void executeAllScenarios(DataModel dataModel) throws Exception {
- List<DataModelResult> dataModelResults = new ArrayList<DataModelResult>();
- DataModelResult dataModelResult = new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
- ResultManager resultManager = new ResultManager(dataModelResult.getName(), this.runMode);
-
-
- dataModelResults.add(dataModelResult);
- List<Scenario> scenarios = dataModel.getScenarios();
- Configuration conf = HBaseConfiguration.create();
- Map<String, String> phoenixProperty = conf.getValByRegex("phoenix");
- phoenixProperty.putAll(conf.getValByRegex("sfdc"));
-
- for (Scenario scenario : scenarios) {
- ScenarioResult scenarioResult = new ScenarioResult(scenario);
- scenarioResult.setPhoenixProperties(phoenixProperty);
- dataModelResult.getScenarioResult().add(scenarioResult);
-
- for (QuerySet querySet : scenario.getQuerySet()) {
- QuerySetResult querySetResult = new QuerySetResult(querySet);
- scenarioResult.getQuerySetResult().add(querySetResult);
-
- executeQuerySetDdls(querySet);
-
- if (querySet.getExecutionType() == ExecutionType.SERIAL) {
- execcuteQuerySetSerial(dataModelResult, querySet, querySetResult, scenarioResult);
- } else {
- execcuteQuerySetParallel(dataModelResult, querySet, querySetResult, scenarioResult);
- }
- }
- resultManager.write(dataModelResult);
- }
- resultManager.write(dataModelResults);
- }
-
- /**
- * Execute all querySet DDLs first based on tenantId if specified. This is executed
- * first since we don't want to run DDLs in parallel to executing queries.
- *
- * @param querySet
- * @throws Exception
- */
- protected void executeQuerySetDdls(QuerySet querySet) throws Exception {
- PhoenixUtil pUtil = new PhoenixUtil();
- for (Query query : querySet.getQuery()) {
- if (null != query.getDdl()) {
- Connection conn = null;
- try {
- logger.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query.getTenantId());
- pUtil.executeStatement(query.getDdl(), conn = pUtil.getConnection(query.getTenantId()));
- } finally {
- if (null != conn) {
- conn.close();
- }
- }
- }
- }
- }
-
- /**
- * Execute query set serially
- * @param dataModelResult
- * @param querySet
- * @param querySetResult
- * @param scenario
- * @throws InterruptedException
- */
- protected void execcuteQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
- for (Query query : querySet.getQuery()) {
- QueryResult queryResult = new QueryResult(query);
- querySetResult.getQueryResults().add(queryResult);
-
- for (int cr = querySet.getMinConcurrency(); cr <= querySet
- .getMaxConcurrency(); cr++) {
-
- List<Thread> threads = new ArrayList<Thread>();
-
- for (int i = 0; i < cr; i++) {
-
- Thread thread = executeRunner((i + 1) + ","
- + cr, dataModelResult, queryResult,
- querySetResult);
- threads.add(thread);
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
- }
- }
- }
-
- /**
- * Execute query set in parallel
- * @param dataModelResult
- * @param querySet
- * @param querySetResult
- * @param scenario
- * @throws InterruptedException
- */
- protected void execcuteQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario)
- throws InterruptedException {
- for (int cr = querySet.getMinConcurrency(); cr <= querySet
- .getMaxConcurrency(); cr++) {
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < cr; i++) {
- for (Query query : querySet.getQuery()) {
- QueryResult queryResult = new QueryResult(query);
- querySetResult.getQueryResults().add(queryResult);
-
- Thread thread = executeRunner((i + 1) + ","
- + cr, dataModelResult, queryResult,
- querySetResult);
- threads.add(thread);
- }
- }
- for (Thread thread : threads) {
- thread.join();
- }
- }
- }
-
- /**
- * Execute multi-thread runner
- * @param name
- * @param dataModelResult
- * @param queryResult
- * @param querySet
- * @return
- */
- protected Thread executeRunner(String name, DataModelResult dataModelResult, QueryResult queryResult, QuerySet querySet) {
- ThreadTime threadTime = new ThreadTime();
- queryResult.getThreadTimes().add(threadTime);
- threadTime.setThreadName(name);
- queryResult.setHint(this.queryHint);
- logger.info("\nExecuting query "
- + queryResult.getStatement());
- Thread thread;
- if (this.runMode == RunMode.FUNCTIONAL) {
- thread = new MultithreadedDiffer(
- threadTime.getThreadName(),
- queryResult,
- threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
- .start();
- } else {
- thread = new MultithreadedRunner(
- threadTime.getThreadName(),
- queryResult,
- dataModelResult,
- threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
- .start();
- }
- return thread;
- }
-}
+}
\ No newline at end of file
[3/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for
mixed r/w workloads
Posted by co...@apache.org.
PHOENIX-1920 - Pherf - Add support for mixed r/w workloads
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7175dcbc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7175dcbc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7175dcbc
Branch: refs/heads/master
Commit: 7175dcbc011dff48f6d041697ec84da98f80f729
Parents: 466eeb3
Author: cmarcel <cm...@salesforce.com>
Authored: Fri Jun 19 16:34:41 2015 -0700
Committer: cmarcel <cm...@salesforce.com>
Committed: Fri Jun 19 16:34:41 2015 -0700
----------------------------------------------------------------------
.gitignore | 2 +
phoenix-pherf/pom.xml | 10 +-
.../org/apache/phoenix/pherf/DataIngestIT.java | 134 ++++--
.../org/apache/phoenix/pherf/PherfMainIT.java | 36 ++
.../apache/phoenix/pherf/ResultBaseTestIT.java | 31 +-
.../apache/phoenix/pherf/SchemaReaderIT.java | 17 +-
.../java/org/apache/phoenix/pherf/Pherf.java | 179 +++++---
.../apache/phoenix/pherf/PherfConstants.java | 8 +-
.../phoenix/pherf/configuration/DataModel.java | 10 -
.../phoenix/pherf/configuration/Scenario.java | 12 +-
.../pherf/configuration/WriteParams.java | 72 +++
.../pherf/configuration/XMLConfigParser.java | 25 +-
.../phoenix/pherf/jmx/MonitorManager.java | 153 ++++---
.../phoenix/pherf/loaddata/DataLoader.java | 332 --------------
.../pherf/result/DataLoadThreadTime.java | 87 ++--
.../pherf/result/DataLoadTimeSummary.java | 54 +--
.../phoenix/pherf/result/DataModelResult.java | 68 ++-
.../phoenix/pherf/result/QueryResult.java | 17 +-
.../phoenix/pherf/result/QuerySetResult.java | 40 +-
.../org/apache/phoenix/pherf/result/Result.java | 11 +-
.../phoenix/pherf/result/ResultHandler.java | 5 +
.../phoenix/pherf/result/ResultManager.java | 19 +-
.../apache/phoenix/pherf/result/ResultUtil.java | 119 +++--
.../phoenix/pherf/result/ResultValue.java | 4 +-
.../apache/phoenix/pherf/result/RunTime.java | 179 ++++----
.../phoenix/pherf/result/ScenarioResult.java | 44 +-
.../apache/phoenix/pherf/result/ThreadTime.java | 34 +-
.../phoenix/pherf/result/file/Extension.java | 3 +-
.../phoenix/pherf/result/file/Header.java | 11 +-
.../pherf/result/impl/CSVResultHandler.java | 47 +-
.../pherf/result/impl/ImageResultHandler.java | 58 +--
.../pherf/result/impl/XMLResultHandler.java | 36 +-
.../phoenix/pherf/schema/SchemaReader.java | 2 +-
.../apache/phoenix/pherf/util/PhoenixUtil.java | 64 ++-
.../pherf/workload/MultiThreadedRunner.java | 153 +++++++
.../pherf/workload/MultithreadedDiffer.java | 131 +++---
.../pherf/workload/MultithreadedRunner.java | 170 -------
.../phoenix/pherf/workload/QueryExecutor.java | 459 ++++++++++---------
.../phoenix/pherf/workload/QueryVerifier.java | 265 +++++------
.../apache/phoenix/pherf/workload/Workload.java | 10 +
.../pherf/workload/WorkloadExecutor.java | 109 ++---
.../phoenix/pherf/workload/WriteWorkload.java | 403 ++++++++++++++++
.../scenario/prod_test_unsalted_scenario.xml | 35 ++
.../phoenix/pherf/ConfigurationParserTest.java | 102 +++--
.../org/apache/phoenix/pherf/ResultTest.java | 5 +-
.../apache/phoenix/pherf/RuleGeneratorTest.java | 15 +-
.../test/resources/scenario/test_scenario.xml | 58 ++-
47 files changed, 2171 insertions(+), 1667 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index fc0e4af..b918d76 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,3 +22,5 @@
target/
release/
RESULTS/
+CSV_EXPORT/
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 1667c66..0facbde 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -16,7 +16,8 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -30,7 +31,7 @@
<name>Phoenix - Pherf</name>
<properties>
- <top.dir>${project.basedir}/..</top.dir>
+ <top.dir>${project.basedir}/..</top.dir>
</properties>
<profiles>
@@ -233,6 +234,11 @@
<!-- Test Dependencies -->
<dependency>
+ <groupId>com.jcabi</groupId>
+ <artifactId>jcabi-jdbc</artifactId>
+ <version>0.15</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
index 2b56f43..828ac38 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
@@ -18,70 +18,122 @@
package org.apache.phoenix.pherf;
+import com.jcabi.jdbc.JdbcSession;
+import com.jcabi.jdbc.Outcome;
import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataModel;
import org.apache.phoenix.pherf.configuration.DataTypeMapping;
import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
-import org.apache.phoenix.pherf.schema.SchemaReader;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.QueryExecutor;
+import org.apache.phoenix.pherf.workload.WorkloadExecutor;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
+import org.junit.Before;
import org.junit.Test;
-import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class DataIngestIT extends ResultBaseTestIT {
- protected static PhoenixUtil util = new PhoenixUtil(true);
- static final String matcherScenario = ".*scenario/.*test.*xml";
- static final String matcherSchema = ".*datamodel/.*test.*sql";
- @Test
- public void generateData() throws Exception {
- util.setZookeeper("localhost");
- SchemaReader reader = new SchemaReader(util, matcherSchema);
- XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+ @Before
+ public void applySchema() throws Exception {
+ reader.applySchema();
+ resources = new ArrayList<>(reader.getResourceList());
- // 1. Generate table schema from file
- List<Path> resources = new ArrayList<>(reader.getResourceList());
assertTrue("Could not pull list of schema files.", resources.size() > 0);
assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
- reader.applySchema();
+ }
+
+ @Test
+ public void testColumnRulesApplied() {
+
+ Scenario scenario = null;
+ try {
+ scenario = parser.getScenarioByName("testScenario");
+ List<Column>
+ columnListFromPhoenix =
+ util.getColumnsFromPhoenix(scenario.getSchemaName(),
+ scenario.getTableNameWithoutSchemaName(), util.getConnection());
+ assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
+
+ WriteWorkload loader = new WriteWorkload(util, parser, scenario);
+ WorkloadExecutor executor = new WorkloadExecutor();
+ executor.add(loader);
+
+ RulesApplier rulesApplier = loader.getRulesApplier();
+ List<Map> modelList = rulesApplier.getModelList();
+ assertTrue("Could not generate the modelList", modelList.size() > 0);
+
+ for (Column column : columnListFromPhoenix) {
+ DataValue data = rulesApplier.getDataForRule(scenario, column);
- // 2. Load the metadata of for the test tables
- Scenario scenario = parser.getScenarios().get(0);
- List<Column> columnListFromPhoenix = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), util.getConnection());
- assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
- DataLoader loader = new DataLoader(util,parser);
- RulesApplier rulesApplier = loader.getRulesApplier();
- List<Map> modelList = rulesApplier.getModelList();
- assertTrue("Could not generate the modelList", modelList.size() > 0);
-
- for (Column column : columnListFromPhoenix) {
- DataValue data = rulesApplier.getDataForRule(scenario, column);
-
- // We are generating data values so the value should have been specified by this point.
- assertTrue("Failed to retrieve data for column type: " + column.getType(), data != null);
-
- // Test that we still retrieve the GENERAL_CHAR rule even after an override is applied to another CHAR type.
- // NEWVAL_STRING Column does not specify an override so we should get the default rule.
- if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName().equals("NEWVAL_STRING"))) {
- assertTrue("Failed to retrieve data for column type: ", data.getDistribution() == Integer.MIN_VALUE);
+ // We are generating data values
+ // so the value should have been specified by this point.
+ assertTrue("Failed to retrieve data for column type: " + column.getType(),
+ data != null);
+
+ // Test that we still retrieve the GENERAL_CHAR rule even after an override is
+ // applied to another CHAR type. NEWVAL_STRING Column does not specify an override
+ // so we should get the default rule.
+ if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName()
+ .equals("NEWVAL_STRING"))) {
+ assertTrue("Failed to retrieve data for column type: ",
+ data.getDistribution() == Integer.MIN_VALUE);
+ }
}
+ } catch (Exception e) {
+ fail("We had an exception: " + e.getMessage());
}
+ }
+
+ @Test
+ public void testRWWorkload() throws Exception {
+
+ Connection connection = util.getConnection();
+
+ WorkloadExecutor executor = new WorkloadExecutor();
+ DataModel dataModel = parser.getDataModelByName("test_scenario");
+ List<DataModel> dataModels = new ArrayList<>();
+ dataModels.add(dataModel);
+ QueryExecutor
+ qe =
+ new QueryExecutor(parser, util, executor.getPool(), dataModels, null, false,
+ PherfConstants.RunMode.PERFORMANCE);
+ executor.add(qe);
+ Scenario scenario = parser.getScenarioByName("testScenarioRW");
+
+ String sql = "select count(*) from " + scenario.getTableName();
- // Load up the data.
try {
- loader.execute();
+ // Wait for data to load up.
+ executor.get();
+ executor.shutdown();
+
+ // Verify data has been loaded
+ Integer count = new JdbcSession(connection).sql(sql).select(new Outcome<Integer>() {
+ @Override public Integer handle(ResultSet resultSet, Statement statement)
+ throws SQLException {
+ while (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ return null;
+ }
+ });
+ assertNotNull("Could not retrieve count. " + count);
+
+ // It would be better to sum up all the rowcounts for the scenarios, but this is fine
+ assertTrue("Could not query any rows for in " + scenario.getTableName(), count > 0);
} catch (Exception e) {
- fail("Failed to lead data. An exception was thrown: " + e.getMessage());
+ fail("Failed to load data. An exception was thrown: " + e.getMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
new file mode 100644
index 0000000..2407ef4
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.ExpectedSystemExit;
+
+public class PherfMainIT extends ResultBaseTestIT {
+ @Rule
+ public final ExpectedSystemExit exit = ExpectedSystemExit.none();
+
+ @Test
+ public void testPherfMain() {
+ String[] args = { "-q",
+ "--scenarioFile", ".*prod_test_unsalted_scenario.*",
+ "-m", "--monitorFrequency", "10" };
+ Pherf.main(args);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
index 6e103b8..d2c5173 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
@@ -19,27 +19,38 @@
package org.apache.phoenix.pherf;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.schema.SchemaReader;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.junit.BeforeClass;
+import java.nio.file.Path;
+import java.util.List;
import java.util.Properties;
public class ResultBaseTestIT extends BaseHBaseManagedTimeIT {
- private static boolean isSetUpDone = false;
+ protected static final String matcherScenario = ".*scenario/.*test.*xml";
+ protected static final String matcherSchema = ".*datamodel/.*test.*sql";
- @BeforeClass
- public static void setUp() throws Exception {
- if (isSetUpDone) {
- return;
- }
+ protected static PhoenixUtil util = PhoenixUtil.create(true);
+ protected static Properties properties;
+ protected static SchemaReader reader;
+ protected static XMLConfigParser parser;
+ protected static List<Path> resources;
+ protected static ResultUtil resultUtil = new ResultUtil();
+
+ @BeforeClass public static void setUp() throws Exception {
- ResultUtil util = new ResultUtil();
PherfConstants constants = PherfConstants.create();
- Properties properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES);
+ properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES);
String dir = properties.getProperty("pherf.default.results.dir");
String targetDir = "target/" + dir;
properties.setProperty("pherf.default.results.dir", targetDir);
- util.ensureBaseDirExists(targetDir);
- isSetUpDone = true;
+ resultUtil.ensureBaseDirExists(targetDir);
+
+ util.setZookeeper("localhost");
+ reader = new SchemaReader(util, matcherSchema);
+ parser = new XMLConfigParser(matcherScenario);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
index 2cb7c13..bce1e91 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
@@ -34,15 +34,12 @@ import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
- protected static PhoenixUtil util = new PhoenixUtil(true);
+ protected static PhoenixUtil util = PhoenixUtil.create(true);
- @Test
- public void testSchemaReader() {
+ @Test public void testSchemaReader() {
// Test for the unit test version of the schema files.
assertApplySchemaTest();
}
@@ -55,7 +52,8 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
List<Path> resources = new ArrayList<>(reader.getResourceList());
assertTrue("Could not pull list of schema files.", resources.size() > 0);
assertNotNull("Could not read schema file.", this.getClass().getResourceAsStream(
- PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName().toString()));
+ PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName()
+ .toString()));
assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
reader.applySchema();
@@ -67,7 +65,10 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
DataModel data = XMLConfigParser.readDataModel(resourcePath);
List<Scenario> scenarioList = data.getScenarios();
Scenario scenario = scenarioList.get(0);
- List<Column> columnList = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), connection);
+ List<Column>
+ columnList =
+ util.getColumnsFromPhoenix(scenario.getSchemaName(),
+ scenario.getTableNameWithoutSchemaName(), connection);
assertTrue("Could not retrieve Metadata from Phoenix", columnList.size() > 0);
} catch (Exception e) {
fail("Could not initialize SchemaReader");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 073c661..5a9f45f 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -18,44 +18,61 @@
package org.apache.phoenix.pherf;
+import org.apache.commons.cli.*;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.jmx.MonitorManager;
import org.apache.phoenix.pherf.schema.SchemaReader;
import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.apache.phoenix.pherf.util.ResourceList;
+import org.apache.phoenix.pherf.workload.QueryExecutor;
+import org.apache.phoenix.pherf.workload.Workload;
import org.apache.phoenix.pherf.workload.WorkloadExecutor;
-
-import org.apache.commons.cli.*;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Properties;
public class Pherf {
private static final Logger logger = LoggerFactory.getLogger(Pherf.class);
private static final Options options = new Options();
+ private final PhoenixUtil phoenixUtil = PhoenixUtil.create();
static {
+ options.addOption("disableSchemaApply", false, "Set to disable schema from being applied.");
+ options.addOption("z", "zookeeper", true,
+ "HBase Zookeeper address for connection. Default: localhost");
+ options.addOption("q", "query", false, "Executes multi-threaded query sets");
+ options.addOption("listFiles", false, "List available resource files");
+ options.addOption("l", "load", false,
+ "Pre-loads data according to specified configuration values.");
+ options.addOption("scenarioFile", true,
+ "Regex or file name for the Test Scenario configuration .xml file to use.");
+ options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. "
+ + "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all");
+ options.addOption("schemaFile", true,
+ "Regex or file name for the Test phoenix table schema .sql to use.");
options.addOption("m", "monitor", false, "Launch the stats profilers");
- options.addOption("monitorFrequency", true, "Override for frequency in Ms for which monitor should log stats. " +
- "\n See pherf.default.monitorFrequency in pherf.properties");
- options.addOption("d", "debug", false, "Put tool in debug mode");
- options.addOption("z", "zookeeper", true, "HBase Zookeeper address for connection. Default: localhost");
- options.addOption("l", "load", false, "Loads data according to specified configuration values.");
- options.addOption("scenarioFile", true, "Regex or file name for the Test Scenario configuration .xml file to use.");
- options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. " +
- "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all");
- options.addOption("schemaFile", true, "Regex or file name for the Test phoenix table schema .sql to use.");
- options.addOption("rowCountOverride", true, "Row count override to use instead of one specified in scenario.");
+ options.addOption("monitorFrequency", true,
+ "Override for frequency in Ms for which monitor should log stats. "
+ + "\n See pherf.default.monitorFrequency in pherf.properties");
+ options.addOption("rowCountOverride", true,
+ "Row count override to use instead of one specified in scenario.");
options.addOption("hint", true, "Executes all queries with specified hint. Example SMALL");
- options.addOption("diff", false, "Run pherf in verification mode and diff with exported results");
- options.addOption("export", false, "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR + " directory");
- options.addOption("listFiles", false, "List available resource files");
- options.addOption("writerThreadSize", true, "Override the default number of writer threads. " +
- "See pherf.default.dataloader.threadpool in Pherf.properties.");
- options.addOption("q", "query", false, "Executes multi-threaded query sets");
+ options.addOption("diff", false,
+ "Run pherf in verification mode and diff with exported results");
+ options.addOption("export", false,
+ "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR
+ + " directory");
+ options.addOption("writerThreadSize", true,
+ "Override the default number of writer threads. "
+ + "See pherf.default.dataloader.threadpool in Pherf.properties.");
options.addOption("h", "help", false, "Get help on using this utility.");
+ options.addOption("d", "debug", false, "Put tool in debug mode");
}
private final String zookeeper;
@@ -63,14 +80,15 @@ public class Pherf {
private final String schemaFile;
private final String queryHint;
private final Properties properties;
- private final boolean loadData;
+ private final boolean preLoadData;
private final String dropPherfTablesRegEx;
private final boolean executeQuerySets;
private final boolean exportCSV;
private final boolean diff;
private final boolean monitor;
private final int rowCountOverride;
- private final boolean listFiles;
+ private final boolean listFiles;
+ private final boolean applySchema;
public Pherf(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
@@ -87,30 +105,35 @@ public class Pherf {
properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES);
dropPherfTablesRegEx = command.getOptionValue("drop", null);
monitor = command.hasOption("m");
- String monitorFrequency = (command.hasOption("m") && command.hasOption("monitorFrequency"))
- ? command.getOptionValue("monitorFrequency")
- : properties.getProperty("pherf.default.monitorFrequency");
+ String
+ monitorFrequency =
+ (command.hasOption("m") && command.hasOption("monitorFrequency")) ?
+ command.getOptionValue("monitorFrequency") :
+ properties.getProperty("pherf.default.monitorFrequency");
properties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
logger.debug("Using Monitor: " + monitor);
logger.debug("Monitor Frequency Ms:" + monitorFrequency);
- loadData = command.hasOption("l");
+ preLoadData = command.hasOption("l");
executeQuerySets = command.hasOption("q");
zookeeper = command.getOptionValue("z", "localhost");
queryHint = command.getOptionValue("hint", null);
exportCSV = command.hasOption("export");
diff = command.hasOption("diff");
listFiles = command.hasOption("listFiles");
- scenarioFile = command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null;
+ applySchema = !command.hasOption("disableSchemaApply");
+ scenarioFile =
+ command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null;
schemaFile = command.hasOption("schemaFile") ? command.getOptionValue("schemaFile") : null;
rowCountOverride = Integer.parseInt(command.getOptionValue("rowCountOverride", "0"));
- String writerThreadPoolSize = command.getOptionValue("writerThreadSize",
- properties.getProperty("pherf.default.dataloader.threadpool"));
+ String
+ writerThreadPoolSize =
+ command.getOptionValue("writerThreadSize",
+ properties.getProperty("pherf.default.dataloader.threadpool"));
properties.setProperty("pherf. default.dataloader.threadpool", writerThreadPoolSize);
-
- if ((command.hasOption("h") || (args == null || args.length == 0))
- && !command.hasOption("listFiles")) {
+ if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
+ .hasOption("listFiles")) {
hf.printHelp("Pherf", options);
System.exit(1);
}
@@ -128,17 +151,22 @@ public class Pherf {
}
public void run() throws Exception {
- WorkloadExecutor workloadExec = null;
+ MonitorManager monitorManager = null;
+ List<Workload> workloads = new ArrayList<>();
+ WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads);
try {
if (listFiles) {
ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL);
- Collection<Path> schemaFiles = list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql");
+ Collection<Path>
+ schemaFiles =
+ list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql");
System.out.println("Schema Files:");
for (Path path : schemaFiles) {
System.out.println(path);
}
list = new ResourceList(PherfConstants.RESOURCE_SCENARIO);
- Collection<Path> scenarioFiles =
+ Collection<Path>
+ scenarioFiles =
list.getResourceList(PherfConstants.SCENARIO_ROOT_PATTERN + ".xml");
System.out.println("Scenario Files:");
for (Path path : scenarioFiles) {
@@ -146,49 +174,86 @@ public class Pherf {
}
return;
}
- workloadExec = (scenarioFile == null)
- ? new WorkloadExecutor(properties,
- new XMLConfigParser(PherfConstants.DEFAULT_FILE_PATTERN),
- monitor)
- : new WorkloadExecutor(properties,
- new XMLConfigParser(scenarioFile),
- monitor);
+ XMLConfigParser parser = new XMLConfigParser(scenarioFile);
// Drop tables with PHERF schema and regex comparison
if (null != dropPherfTablesRegEx) {
- logger.info("\nDropping existing table with PHERF namename and "
- + dropPherfTablesRegEx + " regex expression.");
- new PhoenixUtil().deleteTables(dropPherfTablesRegEx);
+ logger.info(
+ "\nDropping existing table with PHERF namename and " + dropPherfTablesRegEx
+ + " regex expression.");
+ phoenixUtil.deleteTables(dropPherfTablesRegEx);
}
- // Schema and Data Load
- if (loadData) {
+ if (monitor) {
+ monitorManager =
+ new MonitorManager(Integer.parseInt(
+ properties.getProperty("pherf.default.monitorFrequency")));
+ workloadExecutor.add(monitorManager);
+ }
+
+ if (applySchema) {
logger.info("\nStarting to apply schema...");
- SchemaReader reader = (schemaFile == null)
- ? new SchemaReader(".*.sql")
- : new SchemaReader(schemaFile);
+ SchemaReader
+ reader =
+ (schemaFile == null) ?
+ new SchemaReader(".*.sql") :
+ new SchemaReader(schemaFile);
reader.applySchema();
+ }
+ // Schema and Data Load
+ if (preLoadData) {
logger.info("\nStarting Data Load...");
- workloadExec.executeDataLoad();
+ WriteWorkload workload = new WriteWorkload(parser);
+ workloadExecutor.add(workload);
+
+ // Wait for dataLoad to complete
+ workloadExecutor.get(workload);
logger.info("\nGenerate query gold files after data load");
- workloadExec.executeMultithreadedQueryExecutor(queryHint, true, PherfConstants.RunMode.FUNCTIONAL);
+ QueryExecutor
+ goldFileGenerator =
+ new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(),
+ parser.getDataModels(), queryHint, true,
+ PherfConstants.RunMode.FUNCTIONAL);
+ workloadExecutor
+ .add(goldFileGenerator);
+
+ // Wait for gold file generation to complete
+ workloadExecutor.get(goldFileGenerator);
} else {
- logger.info("\nSKIPPED: Data Load and schema creation as -l argument not specified");
+ logger.info(
+ "\nSKIPPED: Data Load and schema creation as -l argument not specified");
}
// Execute multi-threaded query sets
if (executeQuerySets) {
- logger.info("\nStarting to apply schema...");
- workloadExec.executeMultithreadedQueryExecutor(queryHint, exportCSV, diff ? PherfConstants.RunMode.FUNCTIONAL : PherfConstants.RunMode.PERFORMANCE);
+ logger.info("\nStarting to apply Execute Queries...");
+
+ workloadExecutor
+ .add(new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(),
+ parser.getDataModels(), queryHint, exportCSV, diff ?
+ PherfConstants.RunMode.FUNCTIONAL :
+ PherfConstants.RunMode.PERFORMANCE));
+
} else {
- logger.info("\nSKIPPED: Multithreaded query set execution as -q argument not specified");
+ logger.info(
+ "\nSKIPPED: Multithreaded query set execution as -q argument not specified");
+ }
+
+ // Clean up the monitor explicitly
+ if (monitorManager != null) {
+ logger.info("Run completed. Shutting down Monitor.");
+ monitorManager.complete();
}
+
+ // Collect any final jobs
+ workloadExecutor.get();
+
} finally {
- if (workloadExec != null) {
- logger.info("Run completed. Shutting down Monitor if it was running.");
- workloadExec.shutdown();
+ if (workloadExecutor != null) {
+ logger.info("Run completed. Shutting down thread pool.");
+ workloadExecutor.shutdown();
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
index 493f5a8..e060e53 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
@@ -28,14 +28,13 @@ public class PherfConstants {
public static final int DEFAULT_THREAD_POOL_SIZE = 10;
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
- public static final String DEFAULT_FILE_PATTERN = ".*scenario.xml";
public static final String RESOURCE_SCENARIO = "/scenario";
public static final String
SCENARIO_ROOT_PATTERN =
".*" + PherfConstants.RESOURCE_SCENARIO.substring(1) + ".*";
public static final String SCHEMA_ROOT_PATTERN = ".*";
public static final String PHERF_PROPERTIES = "pherf.properties";
-// public static final String RESULT_DIR = "RESULTS";
+
public static final String EXPORT_DIR = "CSV_EXPORT";
public static final String RESULT_PREFIX = "RESULT_";
public static final String PATH_SEPARATOR = "/";
@@ -51,6 +50,7 @@ public class PherfConstants {
public static final String PHERF_SCHEMA_NAME = "PHERF";
+ // TODO Move to properties
// log out data load per n rows
public static final int LOG_PER_NROWS = 1000000;
public static final String COMBINED_FILE_NAME = "COMBINED";
@@ -86,7 +86,9 @@ public class PherfConstants {
InputStream is = null;
try {
is = getClass().getClassLoader().getResourceAsStream(fileName);
- properties.load(is);
+ if (is != null) {
+ properties.load(is);
+ }
} finally {
if (is != null) {
is.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
index 25c0df1..8eb42ff 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
@@ -26,7 +26,6 @@ import java.util.List;
@XmlRootElement(name = "datamodel")
public class DataModel {
- private String release;
private String name;
private List<Scenario> scenarios;
private List<Column> dataMappingColumns;
@@ -34,15 +33,6 @@ public class DataModel {
public DataModel() {
}
- public String getRelease() {
- return this.release;
- }
-
- @XmlAttribute()
- public void setRelease(String release) {
- this.release = release;
- }
-
public List<Scenario> getScenarios() {
return scenarios;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index d2f113a..7de96cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -34,10 +34,12 @@ public class Scenario {
private int rowCount;
private Map<String, String> phoenixProperties;
private DataOverride dataOverride;
- private List<QuerySet> querySet = new ArrayList<QuerySet>();
+ private List<QuerySet> querySet = new ArrayList<>();
+ private WriteParams writeParams;
private String name;
public Scenario() {
+ writeParams = new WriteParams();
}
/**
@@ -161,6 +163,14 @@ public class Scenario {
this.name = name;
}
+ public WriteParams getWriteParams() {
+ return writeParams;
+ }
+
+ public void setWriteParams(WriteParams writeParams) {
+ this.writeParams = writeParams;
+ }
+
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
new file mode 100644
index 0000000..04be239
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class WriteParams {
+ private int writerThreadCount;
+ private long threadSleepDuration;
+ private long batchSize;
+ private long executionDurationInMs;
+
+ public WriteParams() {
+ this.batchSize = Long.MIN_VALUE;
+ this.writerThreadCount = Integer.MIN_VALUE;
+ this.threadSleepDuration = Long.MIN_VALUE;
+ this.executionDurationInMs = Long.MAX_VALUE;
+ }
+
+ public long getThreadSleepDuration() {
+ return threadSleepDuration;
+ }
+
+ @SuppressWarnings("unused")
+ public void setThreadSleepDuration(long threadSleepDuration) {
+ this.threadSleepDuration = threadSleepDuration;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+ @SuppressWarnings("unused")
+ public void setBatchSize(long batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public int getWriterThreadCount() {
+ return writerThreadCount;
+ }
+
+ @SuppressWarnings("unused")
+ public void setWriterThreadCount(int writerThreadCount) {
+ this.writerThreadCount = writerThreadCount;
+ }
+
+ @XmlAttribute()
+ public long getExecutionDurationInMs() {
+ return executionDurationInMs;
+ }
+
+ @SuppressWarnings("unused")
+ public void setExecutionDurationInMs(long executionDurationInMs) {
+ this.executionDurationInMs = executionDurationInMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
index 9b5a9e9..393fa7e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
@@ -52,6 +52,24 @@ public class XMLConfigParser {
return dataModels;
}
+ public DataModel getDataModelByName(String name) {
+ for (DataModel dataModel : getDataModels()) {
+ if (dataModel.getName().equals(name)) {
+ return dataModel;
+ }
+ }
+ return null;
+ }
+
+ public Scenario getScenarioByName(String name) throws Exception {
+ for (Scenario scenario : getScenarios()) {
+ if (scenario.getName().equals(name)) {
+ return scenario;
+ }
+ }
+ return null;
+ }
+
public synchronized Collection<Path> getPaths(String strPattern) throws Exception {
if (paths != null) {
return paths;
@@ -87,7 +105,8 @@ public class XMLConfigParser {
* Unmarshall an XML data file
*
* @param file Name of File
- * @return
+ * @return {@link org.apache.phoenix.pherf.configuration.DataModel} Returns DataModel from
+ * XML configuration
* @throws JAXBException
*/
// TODO Remove static calls
@@ -151,8 +170,6 @@ public class XMLConfigParser {
}
private Collection<Path> getResources(String pattern) throws Exception {
- Collection<Path> resourceFiles = new ArrayList<Path>();
- resourceFiles = resourceList.getResourceList(pattern);
- return resourceFiles;
+ return resourceList.getResourceList(pattern);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
index 6f97551..5b39b2b 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
@@ -21,48 +21,54 @@ package org.apache.phoenix.pherf.jmx;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.exception.FileLoaderRuntimeException;
import org.apache.phoenix.pherf.jmx.monitors.Monitor;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
-import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
import org.apache.phoenix.pherf.result.Result;
import org.apache.phoenix.pherf.result.ResultHandler;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
+import org.apache.phoenix.pherf.workload.Workload;
import org.apache.phoenix.util.DateUtil;
-import javax.management.*;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
- * This class starts JMX stats for the configured monitors. Monitors should be configured in MonitorDetails Enum.
+ * This class starts JMX stats for the configured monitors.
+ * Monitors should be configured in MonitorDetails Enum.
* Each stat implements {@link org.apache.phoenix.pherf.jmx.monitors.Monitor}.
*
- * For the duration of any Pherf run, when the configured {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY}
- * is reached a snapshot of each monitor is taken and dumped out to a log file.
+ * For the duration of any Pherf run, when the configured
+ * {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY} is reached a snapshot of
+ * each monitor is taken and dumped out to a log file.
*/
-public class MonitorManager implements Runnable {
+public class MonitorManager implements Workload {
// List of MonitorDetails for all the running monitors.
// TODO Move this out to config. Possible use Guice and use IOC to inject it in.
- private static final List<MonitorDetails> MONITOR_DETAILS_LIST =
+ private static final List<MonitorDetails>
+ MONITOR_DETAILS_LIST =
Arrays.asList(MonitorDetails.values());
private final ResultHandler resultHandler;
- private final long monitorFrequency;
- private AtomicLong rowCount;
- private volatile boolean shouldStop = false;
- private volatile boolean isRunning = false;
+ private final AtomicLong monitorFrequency;
+ private final AtomicLong rowCount;
+ private final AtomicBoolean shouldStop = new AtomicBoolean(false);
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
- @SuppressWarnings("unused")
- public MonitorManager() throws Exception {
+ @SuppressWarnings("unused") public MonitorManager() throws Exception {
this(PherfConstants.MONITOR_FREQUENCY);
}
/**
- *
* @param monitorFrequency Frequency at which monitor stats are written to a log file.
* @throws Exception
*/
public MonitorManager(long monitorFrequency) throws Exception {
- this.monitorFrequency = monitorFrequency;
+ this.monitorFrequency = new AtomicLong(monitorFrequency);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
// Register all the monitors to JMX
@@ -77,74 +83,87 @@ public class MonitorManager implements Runnable {
}
}
rowCount = new AtomicLong(0);
- this.resultHandler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
+ this.resultHandler =
+ new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
}
- @Override
- public void run() {
- try {
- while (!shouldStop()) {
- isRunning = true;
- List rowValues = new ArrayList<String>();
- synchronized (resultHandler) {
- for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) {
- rowValues.clear();
- try {
- StandardMBean bean = new StandardMBean(monitorDetails.getMonitor(), Monitor.class);
-
- Calendar calendar = new GregorianCalendar();
- rowValues.add(monitorDetails);
-
- rowValues.add(((Monitor) bean.getImplementation()).getStat());
- rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER.format(calendar.getTime()));
- Result
- result = new Result(ResultFileDetails.CSV, ResultFileDetails.CSV_MONITOR.getHeader().toString(), rowValues);
- resultHandler.write(result);
- } catch (Exception e) {
- throw new FileLoaderRuntimeException("Could not log monitor result.", e);
+ @Override public synchronized void complete() {
+ this.shouldStop.set(true);
+ }
+
+ @Override public Runnable execute() {
+ return new Runnable() {
+ @Override public void run() {
+ try {
+ while (!shouldStop()) {
+ isRunning.set(true);
+ List rowValues = new ArrayList<String>();
+ synchronized (resultHandler) {
+ for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) {
+ rowValues.clear();
+ try {
+ StandardMBean
+ bean =
+ new StandardMBean(monitorDetails.getMonitor(),
+ Monitor.class);
+
+ Calendar calendar = new GregorianCalendar();
+ rowValues.add(monitorDetails);
+
+ rowValues.add(((Monitor) bean.getImplementation()).getStat());
+ rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER
+ .format(calendar.getTime()));
+ Result
+ result =
+ new Result(ResultFileDetails.CSV,
+ ResultFileDetails.CSV_MONITOR.getHeader()
+ .toString(), rowValues);
+ resultHandler.write(result);
+ } catch (Exception e) {
+ throw new FileLoaderRuntimeException(
+ "Could not log monitor result.", e);
+ }
+ rowCount.getAndIncrement();
+ }
+ try {
+ resultHandler.flush();
+ Thread.sleep(getMonitorFrequency());
+ } catch (Exception e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
}
- rowCount.getAndIncrement();
}
+ } finally {
try {
- resultHandler.flush();
- Thread.sleep(getMonitorFrequency());
+ isRunning.set(false);
+ if (resultHandler != null) {
+ resultHandler.close();
+ }
} catch (Exception e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
+ throw new FileLoaderRuntimeException("Could not close monitor results.", e);
}
}
}
- } finally {
- try {
- isRunning = false;
- if (resultHandler != null) {
- resultHandler.close();
- }
- } catch (Exception e) {
- throw new FileLoaderRuntimeException("Could not close monitor results.", e);
- }
- }
-
+ };
}
public long getMonitorFrequency() {
- return monitorFrequency;
- }
-
- public synchronized boolean shouldStop() {
- return shouldStop;
+ return monitorFrequency.get();
}
- public synchronized void stop() {
- this.shouldStop = true;
+ public boolean shouldStop() {
+ return shouldStop.get();
}
- public synchronized long getRowCount() {
+ // Convenience method for testing.
+ @SuppressWarnings("unused")
+ public long getRowCount() {
return rowCount.get();
}
- public synchronized boolean isRunning() {
- return isRunning;
+ public boolean isRunning() {
+ return isRunning.get();
}
/**
@@ -157,7 +176,9 @@ public class MonitorManager implements Runnable {
ResultHandler handler = null;
try {
if (resultHandler.isClosed()) {
- handler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
+ handler =
+ new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME,
+ ResultFileDetails.CSV);
return handler.read();
} else {
return resultHandler.read();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
deleted file mode 100644
index c521822..0000000
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pherf.loaddata;
-
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.phoenix.pherf.result.ResultUtil;
-import org.apache.phoenix.pherf.util.ResourceList;
-import org.apache.phoenix.pherf.util.RowCalculator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.configuration.Column;
-import org.apache.phoenix.pherf.configuration.DataModel;
-import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.exception.PherfException;
-import org.apache.phoenix.pherf.result.DataLoadThreadTime;
-import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
-import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.rules.RulesApplier;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
-
-public class DataLoader {
- private static final Logger logger = LoggerFactory.getLogger(DataLoader.class);
- private final PhoenixUtil pUtil;
- private final XMLConfigParser parser;
- private final RulesApplier rulesApplier;
- private final ResultUtil resultUtil;
- private final ExecutorService pool;
-
- private final int threadPoolSize;
- private final int batchSize;
-
- public DataLoader(XMLConfigParser parser) throws Exception {
- this(new PhoenixUtil(), parser);
- }
-
- public DataLoader(PhoenixUtil phoenixUtil, XMLConfigParser parser) throws Exception{
- this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES), parser);
- }
-
- /**
- * Default the writers to use up all available cores for threads.
- *
- * @param parser
- * @throws Exception
- */
- public DataLoader(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser) throws Exception {
- this.pUtil = phoenixUtil;
- this.parser = parser;
- this.rulesApplier = new RulesApplier(parser);
- this.resultUtil = new ResultUtil();
- int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
- this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
- this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
- String bSize = properties.getProperty("pherf.default.dataloader.batchsize");
- this.batchSize = (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
- }
-
- public void execute() throws Exception {
- try {
- DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
- DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
-
- for (Scenario scenario : getParser().getScenarios()) {
- List<Future> writeBatches = new ArrayList<Future>();
- logger.info("\nLoading " + scenario.getRowCount()
- + " rows for " + scenario.getTableName());
- long start = System.currentTimeMillis();
-
- RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
- for (int i = 0; i < getThreadPoolSize(); i++) {
- List<Column> phxMetaCols = pUtil.getColumnsFromPhoenix(
- scenario.getSchemaName(),
- scenario.getTableNameWithoutSchemaName(),
- pUtil.getConnection());
- int threadRowCount = rowCalculator.getNext();
- logger.info("Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
- Future<Info> write = upsertData(scenario, phxMetaCols,
- scenario.getTableName(), threadRowCount, dataLoadThreadTime);
- writeBatches.add(write);
- }
-
- if (writeBatches.isEmpty()) {
- throw new PherfException(
- "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
- }
-
- int sumRows = 0, sumDuration = 0;
- // Wait for all the batch threads to complete
- for (Future<Info> write : writeBatches) {
- Info writeInfo = write.get();
- sumRows += writeInfo.getRowCount();
- sumDuration += writeInfo.getDuration();
- logger.info("Executor writes complete with row count ("
- + writeInfo.getRowCount()
- + ") in Ms ("
- + writeInfo.getDuration() + ")");
- }
- logger.info("Writes completed with total row count (" + sumRows
- + ") with total time of(" + sumDuration + ") Ms");
- dataLoadTimeSummary.add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
-
-
- // always update stats for Phoenix base tables
- updatePhoenixStats(scenario.getTableName());
- }
- resultUtil.write(dataLoadTimeSummary);
- resultUtil.write(dataLoadThreadTime);
-
- } finally {
- pool.shutdown();
- }
- }
-
- /**
- * TODO Move this method to PhoenixUtil
- * Update Phoenix table stats
- *
- * @param tableName
- * @throws Exception
- */
- public void updatePhoenixStats(String tableName) throws Exception {
- logger.info("Updating stats for " + tableName);
- pUtil.executeStatement("UPDATE STATISTICS " + tableName);
- }
-
- public Future<Info> upsertData(final Scenario scenario,
- final List<Column> columns, final String tableName,
- final int rowCount, final DataLoadThreadTime dataLoadThreadTime) {
- Future<Info> future = pool.submit(new Callable<Info>() {
- @Override
- public Info call() throws Exception {
- int rowsCreated = 0;
- Info info = null;
- long start = 0, duration = 0, totalDuration = 0;
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Connection connection = null;
- try {
- connection = pUtil.getConnection();
- long logStartTime = System.currentTimeMillis();
- for (int i = 0; i < rowCount; i++) {
- String sql = buildSql(columns, tableName);
- PreparedStatement stmt = connection
- .prepareStatement(sql);
- stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
- start = System.currentTimeMillis();
- rowsCreated += stmt.executeUpdate();
- stmt.close();
- if ((i % getBatchSize()) == 0) {
- connection.commit();
- duration = System.currentTimeMillis() - start;
- logger.info("Committed Batch. Total " + tableName + " rows for this thread (" + this.hashCode() + ") in ("
- + duration + ") Ms");
-
- if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
- dataLoadThreadTime.add(tableName, Thread.currentThread().getName(), i, System.currentTimeMillis() - logStartTime);
- logStartTime = System.currentTimeMillis();
- }
- }
- }
- } finally {
- if (connection != null) {
- try {
- connection.commit();
- duration = System.currentTimeMillis() - start;
- logger.info("Committed Final Batch. Duration (" + duration + ") Ms");
- connection.close();
- } catch (SQLException e) {
- // Swallow since we are closing anyway
- e.printStackTrace();
- }
- }
- }
- totalDuration = System.currentTimeMillis() - start;
- return new Info(totalDuration, rowsCreated);
- }
- });
- return future;
- }
-
- private PreparedStatement buildStatement(Scenario scenario,
- List<Column> columns, PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
- int count = 1;
- for (Column column : columns) {
-
- DataValue dataValue = getRulesApplier().getDataForRule(scenario,
- column);
- switch (column.getType()) {
- case VARCHAR:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.VARCHAR);
- } else {
- statement.setString(count, dataValue.getValue());
- }
- break;
- case CHAR:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.CHAR);
- } else {
- statement.setString(count, dataValue.getValue());
- }
- break;
- case DECIMAL:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.DECIMAL);
- } else {
- statement.setBigDecimal(count,
- new BigDecimal(dataValue.getValue()));
- }
- break;
- case INTEGER:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.INTEGER);
- } else {
- statement.setInt(count,
- Integer.parseInt(dataValue.getValue()));
- }
- break;
- case DATE:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.DATE);
- } else {
- Date date = new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
- statement.setDate(count, date);
- }
- break;
- default:
- break;
- }
- count++;
- }
- return statement;
- }
-
- private String buildSql(final List<Column> columns, final String tableName) {
- StringBuilder builder = new StringBuilder();
- builder.append("upsert into ");
- builder.append(tableName);
- builder.append(" (");
- int count = 1;
- for (Column column : columns) {
- builder.append(column.getName());
- if (count < columns.size()) {
- builder.append(",");
- } else {
- builder.append(")");
- }
- count++;
- }
- builder.append(" VALUES (");
- for (int i = 0; i < columns.size(); i++) {
- if (i < columns.size() - 1) {
- builder.append("?,");
- } else {
- builder.append("?)");
- }
- }
- return builder.toString();
- }
-
- public XMLConfigParser getParser() {
- return parser;
- }
-
- public RulesApplier getRulesApplier() {
- return rulesApplier;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public int getThreadPoolSize() {
- return threadPoolSize;
- }
-
- private class Info {
-
- private final int rowCount;
- private final long duration;
-
- public Info(long duration, int rows) {
- this(0, 0, 0, duration, rows);
- }
-
- public Info(int regionSize, int completedIterations, int timesSeen,
- long duration, int rows) {
- this.duration = duration;
- this.rowCount = rows;
- }
-
- public long getDuration() {
- return duration;
- }
-
- public int getRowCount() {
- return rowCount;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
index 23dcdd5..e5553cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
@@ -18,61 +18,68 @@
package org.apache.phoenix.pherf.result;
+import org.apache.phoenix.pherf.PherfConstants;
+
import java.util.ArrayList;
import java.util.List;
-import org.apache.phoenix.pherf.PherfConstants;
-
public class DataLoadThreadTime {
- private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>();
+ private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>();
+
+ public List<WriteThreadTime> getThreadTime() {
+ return threadTime;
+ }
- public List<WriteThreadTime> getThreadTime() {
- return threadTime;
- }
+ public void add(String tableName, String threadName, long rowsUpserted,
+ long timeInMsPerMillionRows) {
+ threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted,
+ timeInMsPerMillionRows));
+ }
- public void add(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) {
- threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted, timeInMsPerMillionRows));
- }
-
- public String getCsvTitle() {
- return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS + "_ROWS\n";
- }
+ public String getCsvTitle() {
+ return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS
+ + "_ROWS\n";
+ }
}
class WriteThreadTime {
- private String tableName;
- private String threadName;
- private int rowsUpserted;
- private long timeInMsPerMillionRows;
-
- public WriteThreadTime(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) {
- this.tableName = tableName;
- this.threadName = threadName;
- this.rowsUpserted = rowsUpserted;
- this.timeInMsPerMillionRows = timeInMsPerMillionRows;
- }
-
- public String getTableName() {
- return tableName;
- }
- public String getThreadName() {
- return threadName;
- }
- public long getTimeInMsPerMillionRows() {
- return timeInMsPerMillionRows;
- }
+ private String tableName;
+ private String threadName;
+ private long rowsUpserted;
+ private long timeInMsPerMillionRows;
+
+ public WriteThreadTime(String tableName, String threadName, long rowsUpserted,
+ long timeInMsPerMillionRows) {
+ this.tableName = tableName;
+ this.threadName = threadName;
+ this.rowsUpserted = rowsUpserted;
+ this.timeInMsPerMillionRows = timeInMsPerMillionRows;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getThreadName() {
+ return threadName;
+ }
+
+ public long getTimeInMsPerMillionRows() {
+ return timeInMsPerMillionRows;
+ }
- public List<ResultValue> getCsvRepresentation(ResultUtil util) {
+ public List<ResultValue> getCsvRepresentation(ResultUtil util) {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(util.convertNull(getTableName())));
rowValues.add(new ResultValue(util.convertNull(getThreadName())));
rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowsUpserted()))));
- rowValues.add(new ResultValue(util.convertNull(String.valueOf(getTimeInMsPerMillionRows()))));
+ rowValues.add(new ResultValue(
+ util.convertNull(String.valueOf(getTimeInMsPerMillionRows()))));
return rowValues;
- }
+ }
- public int getRowsUpserted() {
- return rowsUpserted;
- }
+ public long getRowsUpserted() {
+ return rowsUpserted;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
index bb23f16..0ff5c59 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
@@ -22,29 +22,29 @@ import java.util.ArrayList;
import java.util.List;
public class DataLoadTimeSummary {
- private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>();
+ private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>();
- public List<TableLoadTime> getTableLoadTime() {
- return tableLoadTime;
- }
-
- public void add(String tableName, int rowCount, int durationInMs) {
- tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs));
- }
+ public List<TableLoadTime> getTableLoadTime() {
+ return tableLoadTime;
+ }
+
+ public void add(String tableName, int rowCount, int durationInMs) {
+ tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs));
+ }
}
class TableLoadTime {
- private int durationInMs;
- private String tableName;
- private int rowCount;
+ private int durationInMs;
+ private String tableName;
+ private int rowCount;
+
+ public TableLoadTime(String tableName, int rowCount, int durationInMs) {
+ this.tableName = tableName;
+ this.rowCount = rowCount;
+ this.durationInMs = durationInMs;
+ }
- public TableLoadTime(String tableName, int rowCount, int durationInMs) {
- this.tableName = tableName;
- this.rowCount = rowCount;
- this.durationInMs = durationInMs;
- }
-
- public List<ResultValue> getCsvRepresentation(ResultUtil util) {
+ public List<ResultValue> getCsvRepresentation(ResultUtil util) {
List<ResultValue> rowValues = new ArrayList<>();
rowValues.add(new ResultValue(util.convertNull(getTableName())));
rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowCount()))));
@@ -53,15 +53,15 @@ class TableLoadTime {
return rowValues;
}
- public int getDurationInMs() {
- return durationInMs;
- }
+ public int getDurationInMs() {
+ return durationInMs;
+ }
- public String getTableName() {
- return tableName;
- }
+ public String getTableName() {
+ return tableName;
+ }
- public int getRowCount() {
- return rowCount;
- }
+ public int getRowCount() {
+ return rowCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
index 72920fa..5c07ffe 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
@@ -18,61 +18,57 @@
package org.apache.phoenix.pherf.result;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.phoenix.pherf.configuration.DataModel;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
-import org.apache.phoenix.pherf.configuration.DataModel;
+@XmlRootElement(namespace = "org.apache.phoenix.pherf.result") public class DataModelResult
+ extends DataModel {
+ private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>();
+ private String zookeeper;
-@XmlRootElement(namespace = "org.apache.phoenix.pherf.result")
-public class DataModelResult extends DataModel {
- private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>();
- private String zookeeper;
+ public List<ScenarioResult> getScenarioResult() {
+ return scenarioResult;
+ }
- public List<ScenarioResult> getScenarioResult() {
- return scenarioResult;
- }
+ @SuppressWarnings("unused") public void setScenarioResult(List<ScenarioResult> scenarioResult) {
+ this.scenarioResult = scenarioResult;
+ }
- @SuppressWarnings("unused")
- public void setScenarioResult(List<ScenarioResult> scenarioResult) {
- this.scenarioResult = scenarioResult;
- }
-
- public DataModelResult() {
- }
+ public DataModelResult() {
+ }
- private DataModelResult(String name, String release, String zookeeper) {
+ private DataModelResult(String name, String zookeeper) {
this.setName(name);
- this.setRelease(release);
this.zookeeper = zookeeper;
}
/**
* Copy constructor
- *
+ *
* @param dataModelResult
*/
public DataModelResult(DataModelResult dataModelResult) {
- this(dataModelResult.getName(), dataModelResult.getRelease(), dataModelResult.getZookeeper());
+ this(dataModelResult.getName(), dataModelResult.getZookeeper());
this.scenarioResult = dataModelResult.getScenarioResult();
}
-
- public DataModelResult(DataModel dataModel, String zookeeper) {
- this(dataModel.getName(), dataModel.getRelease(), zookeeper);
- }
-
- public DataModelResult(DataModel dataModel) {
- this(dataModel, null);
- }
- @XmlAttribute()
- public String getZookeeper() {
- return zookeeper;
- }
+ public DataModelResult(DataModel dataModel, String zookeeper) {
+ this(dataModel.getName(), zookeeper);
+ }
+
+ public DataModelResult(DataModel dataModel) {
+ this(dataModel, null);
+ }
+
+ @XmlAttribute() public String getZookeeper() {
+ return zookeeper;
+ }
- public void setZookeeper(String zookeeper) {
- this.zookeeper = zookeeper;
- }
+ public void setZookeeper(String zookeeper) {
+ this.zookeeper = zookeeper;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
index b5fd082..1a682da 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
@@ -18,14 +18,14 @@
package org.apache.phoenix.pherf.result;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
import org.apache.phoenix.pherf.PherfConstants.RunMode;
import org.apache.phoenix.pherf.configuration.Query;
import org.apache.phoenix.util.DateUtil;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
public class QueryResult extends Query {
private List<ThreadTime> threadTimes = new ArrayList<ThreadTime>();
@@ -47,8 +47,7 @@ public class QueryResult extends Query {
this.setId(query.getId());
}
- @SuppressWarnings("unused")
- public QueryResult() {
+ @SuppressWarnings("unused") public QueryResult() {
}
public Date getStartTime() {
@@ -136,8 +135,8 @@ public class QueryResult extends Query {
}
private String getStartTimeText() {
- return (null == this.getStartTime())
- ? ""
- : DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime());
+ return (null == this.getStartTime()) ?
+ "" :
+ DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
index 9010c21..c2be5a3 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
@@ -18,31 +18,31 @@
package org.apache.phoenix.pherf.result;
+import org.apache.phoenix.pherf.configuration.QuerySet;
+
import java.util.ArrayList;
import java.util.List;
-import org.apache.phoenix.pherf.configuration.QuerySet;
-
public class QuerySetResult extends QuerySet {
-
- private List<QueryResult> queryResults = new ArrayList<QueryResult>();
-
- public QuerySetResult(QuerySet querySet) {
- this.setConcurrency(querySet.getConcurrency());
- this.setNumberOfExecutions(querySet.getNumberOfExecutions());
- this.setExecutionDurationInMs(querySet.getExecutionDurationInMs());
- this.setExecutionType(querySet.getExecutionType());
- }
-
- public QuerySetResult() {
- }
-
- public List<QueryResult> getQueryResults() {
- return queryResults;
- }
+
+ private List<QueryResult> queryResults = new ArrayList<>();
+
+ public QuerySetResult(QuerySet querySet) {
+ this.setConcurrency(querySet.getConcurrency());
+ this.setNumberOfExecutions(querySet.getNumberOfExecutions());
+ this.setExecutionDurationInMs(querySet.getExecutionDurationInMs());
+ this.setExecutionType(querySet.getExecutionType());
+ }
+
+ public QuerySetResult() {
+ }
+
+ public List<QueryResult> getQueryResults() {
+ return queryResults;
+ }
@SuppressWarnings("unused")
public void setQueryResults(List<QueryResult> queryResults) {
- this.queryResults = queryResults;
- }
+ this.queryResults = queryResults;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
index 4ccdd2b..158ed11 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
@@ -18,10 +18,10 @@
package org.apache.phoenix.pherf.result;
-import java.util.List;
-
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import java.util.List;
+
/**
* Common container for Pherf results.
*/
@@ -33,10 +33,9 @@ public class Result {
private final String header;
/**
- *
- * @param type {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the
- * contents of the result.
- * @param header Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields.
+ * @param type {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the
+ * contents of the result.
+ * @param header Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields.
* @param messageValues List<{@link ResultValue} All fields combined represent the data
* for a row to be written.
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
index f650cbb..5b71300 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
@@ -29,9 +29,14 @@ import java.util.List;
*/
public interface ResultHandler {
public void write(Result result) throws Exception;
+
public void flush() throws Exception;
+
public void close() throws Exception;
+
public List<Result> read() throws Exception;
+
public boolean isClosed();
+
public ResultFileDetails getResultFileDetails();
}