You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mu...@apache.org on 2015/12/18 23:29:16 UTC
phoenix git commit: PHOENIX-2524 Fixes for pherf to run with queryserver (elserj)
Repository: phoenix
Updated Branches:
refs/heads/master b38989dc7 -> 135b978ae
PHOENIX-2524 Fixes for pherf to run with queryserver (elserj)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/135b978a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/135b978a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/135b978a
Branch: refs/heads/master
Commit: 135b978aea1bb5cf0053698583a2002e4452e5b0
Parents: b38989d
Author: Mujtaba <mu...@apache.org>
Authored: Fri Dec 18 14:29:00 2015 -0800
Committer: Mujtaba <mu...@apache.org>
Committed: Fri Dec 18 14:29:00 2015 -0800
----------------------------------------------------------------------
.../src/build/components/all-common-jars.xml | 8 +++
.../java/org/apache/phoenix/pherf/Pherf.java | 19 ++++++-
.../apache/phoenix/pherf/result/ResultUtil.java | 13 ++++-
.../phoenix/pherf/result/file/Header.java | 1 +
.../pherf/result/file/ResultFileDetails.java | 1 +
.../apache/phoenix/pherf/util/PhoenixUtil.java | 57 ++++++++++++++++----
.../phoenix/pherf/workload/WriteWorkload.java | 33 +++++++-----
7 files changed, 105 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index bed9f25..960c3c9 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -104,5 +104,13 @@
</excludes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../phoenix-pherf/target/</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>phoenix-*.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
</component>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index eaf199a..b84183b 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -89,6 +89,8 @@ public class Pherf {
options.addOption("label", true, "Label a run. Result file name will be suffixed with specified label");
options.addOption("compare", true, "Specify labeled run(s) to compare");
options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time.");
+ options.addOption("t", "thin", false, "Use the Phoenix Thin Driver");
+ options.addOption("s", "server", true, "The URL for the Phoenix QueryServer");
}
private final String zookeeper;
@@ -109,6 +111,8 @@ public class Pherf {
private final String label;
private final String compareResults;
private final CompareType compareType;
+ private final boolean thinDriver;
+ private final String queryServerUrl;
public Pherf(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
@@ -155,14 +159,27 @@ public class Pherf {
label = command.getOptionValue("label", null);
compareResults = command.getOptionValue("compare", null);
compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM;
+ thinDriver = command.hasOption("thin");
+ if (thinDriver) {
+ queryServerUrl = command.getOptionValue("server", "http://localhost:8765");
+ } else {
+ queryServerUrl = null;
+ }
if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
.hasOption("listFiles")) {
hf.printHelp("Pherf", options);
System.exit(1);
}
- PhoenixUtil.setZookeeper(zookeeper);
PhoenixUtil.setRowCountOverride(rowCountOverride);
+ if (!thinDriver) {
+ logger.info("Using thick driver with ZooKeepers '{}'", zookeeper);
+ PhoenixUtil.setZookeeper(zookeeper);
+ } else {
+ logger.info("Using thin driver with PQS '{}'", queryServerUrl);
+ // Enables the thin-driver and sets the PQS URL
+ PhoenixUtil.useThinDriver(queryServerUrl);
+ }
ResultUtil.setFileSuffix(label);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index 92eb80a..0c2a7b8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -84,7 +84,12 @@ public class ResultUtil {
ensureBaseResultDirExists();
CSVResultHandler writer = null;
- ResultFileDetails resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
+ ResultFileDetails resultFileDetails;
+ if (PhoenixUtil.isThinDriver()) {
+ resultFileDetails = ResultFileDetails.CSV_THIN_AGGREGATE_DATA_LOAD;
+ } else {
+ resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
+ }
try {
writer = new CSVFileResultHandler();
writer.setResultFileDetails(resultFileDetails);
@@ -92,7 +97,11 @@ public class ResultUtil {
for (TableLoadTime loadTime : dataLoadTime.getTableLoadTime()) {
List<ResultValue> rowValues = new ArrayList<>();
- rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
+ if (PhoenixUtil.isThinDriver()) {
+ rowValues.add(new ResultValue(PhoenixUtil.getQueryServerUrl()));
+ } else {
+ rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
+ }
rowValues.addAll(loadTime.getCsvRepresentation(this));
Result
result =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 066fa7a..7d09f68 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -28,6 +28,7 @@ public enum Header {
DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
+ THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
MONITOR("STAT_NAME,STAT_VALUE,TIME_STAMP");
private String header;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
index a85f830..51aa407 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
@@ -24,6 +24,7 @@ public enum ResultFileDetails {
CSV_DETAILED_PERFORMANCE(Header.DETAILED_PERFORMANCE, Extension.DETAILED_CSV),
CSV_DETAILED_FUNCTIONAL(Header.DETAILED_FUNCTIONAL, Extension.DETAILED_CSV),
CSV_AGGREGATE_DATA_LOAD(Header.AGGREGATE_DATA_LOAD, Extension.CSV),
+ CSV_THIN_AGGREGATE_DATA_LOAD(Header.THIN_AGGREGATE_DATA_LOAD, Extension.CSV),
CSV_MONITOR(Header.MONITOR, Extension.CSV),
XML(Header.EMPTY, Extension.XML),
IMAGE(Header.EMPTY, Extension.VISUALIZATION);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 064c604..b778833 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -28,6 +28,7 @@ import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
@@ -39,6 +40,8 @@ public class PhoenixUtil {
private static int rowCountOverride = 0;
private boolean testEnabled;
private static PhoenixUtil instance;
+ private static boolean useThinDriver;
+ private static String queryServerUrl;
private PhoenixUtil() {
this(false);
@@ -57,6 +60,19 @@ public class PhoenixUtil {
return instance;
}
+ public static void useThinDriver(String queryServerUrl) {
+ PhoenixUtil.useThinDriver = true;
+ PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl);
+ }
+
+ public static String getQueryServerUrl() {
+ return PhoenixUtil.queryServerUrl;
+ }
+
+ public static boolean isThinDriver() {
+ return PhoenixUtil.useThinDriver;
+ }
+
public Connection getConnection() throws Exception {
return getConnection(null);
}
@@ -66,17 +82,31 @@ public class PhoenixUtil {
}
private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
- if (null == zookeeper) {
- throw new IllegalArgumentException(
- "Zookeeper must be set before initializing connection!");
- }
- Properties props = new Properties();
- if (null != tenantId) {
- props.setProperty("TenantId", tenantId);
- logger.debug("\nSetting tenantId to " + tenantId);
+ if (useThinDriver) {
+ if (null == queryServerUrl) {
+ throw new IllegalArgumentException("QueryServer URL must be set before" +
+ " initializing connection");
+ }
+ Properties props = new Properties();
+ if (null != tenantId) {
+ props.setProperty("TenantId", tenantId);
+ logger.debug("\nSetting tenantId to " + tenantId);
+ }
+ String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF";
+ return DriverManager.getConnection(url, props);
+ } else {
+ if (null == zookeeper) {
+ throw new IllegalArgumentException(
+ "Zookeeper must be set before initializing connection!");
+ }
+ Properties props = new Properties();
+ if (null != tenantId) {
+ props.setProperty("TenantId", tenantId);
+ logger.debug("\nSetting tenantId to " + tenantId);
+ }
+ String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
+ return DriverManager.getConnection(url, props);
}
- String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
- return DriverManager.getConnection(url, props);
}
public boolean executeStatement(String sql, Scenario scenario) throws Exception {
@@ -278,7 +308,12 @@ public class PhoenixUtil {
public static void setZookeeper(String zookeeper) {
logger.info("Setting zookeeper: " + zookeeper);
- PhoenixUtil.zookeeper = zookeeper;
+ useThickDriver(zookeeper);
+ }
+
+ public static void useThickDriver(String zookeeper) {
+ PhoenixUtil.useThinDriver = false;
+ PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper);
}
public static int getRowCountOverride() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 7c73e22..a35e6e8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -168,7 +168,7 @@ public class WriteWorkload implements Workload {
// Execute any Scenario DDL before running workload
pUtil.executeScenarioDdl(scenario);
- List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+ List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario);
waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
@@ -182,12 +182,12 @@ public class WriteWorkload implements Workload {
}
}
- private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+ private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
throws Exception {
RowCalculator
rowCalculator =
new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
- List<Future> writeBatches = new ArrayList<>();
+ List<Future<Info>> writeBatches = new ArrayList<>();
for (int i = 0; i < getThreadPoolSize(); i++) {
List<Column>
@@ -212,7 +212,7 @@ public class WriteWorkload implements Workload {
}
private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
- long start, List<Future> writeBatches)
+ long start, List<Future<Info>> writeBatches)
throws InterruptedException, java.util.concurrent.ExecutionException {
int sumRows = 0, sumDuration = 0;
// Wait for all the batch threads to complete
@@ -223,10 +223,12 @@ public class WriteWorkload implements Workload {
logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+ writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
}
- logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
- + sumDuration + ") Ms");
+ long testDuration = System.currentTimeMillis() - start;
+ logger.info("Writes completed with total row count (" + sumRows
+ + ") with total elapsed time of (" + testDuration
+ + ") ms and total CPU execution time of (" + sumDuration + ") ms");
dataLoadTimeSummary
- .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+ .add(scenario.getTableName(), sumRows, (int) testDuration);
}
public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
@@ -235,9 +237,10 @@ public class WriteWorkload implements Workload {
Future<Info> future = pool.submit(new Callable<Info>() {
@Override public Info call() throws Exception {
int rowsCreated = 0;
- long start = 0, duration, totalDuration;
+ long start = 0, last = 0, duration, totalDuration;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Connection connection = null;
+ PreparedStatement stmt = null;
try {
connection = pUtil.getConnection(scenario.getTenantId());
long logStartTime = System.currentTimeMillis();
@@ -247,17 +250,16 @@ public class WriteWorkload implements Workload {
Long.MAX_VALUE :
WriteWorkload.this.writeParams.getExecutionDurationInMs();
+ last = start = System.currentTimeMillis();
+ String sql = buildSql(columns, tableName);
+ stmt = connection.prepareStatement(sql);
for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
< maxDuration); i--) {
- String sql = buildSql(columns, tableName);
- PreparedStatement stmt = connection.prepareStatement(sql);
stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
- start = System.currentTimeMillis();
rowsCreated += stmt.executeUpdate();
- stmt.close();
if ((i % getBatchSize()) == 0) {
connection.commit();
- duration = System.currentTimeMillis() - start;
+ duration = System.currentTimeMillis() - last;
logger.info("Writer (" + Thread.currentThread().getName()
+ ") committed Batch. Total " + getBatchSize()
+ " rows for this thread (" + this.hashCode() + ") in ("
@@ -272,9 +274,14 @@ public class WriteWorkload implements Workload {
// Pause for throttling if configured to do so
Thread.sleep(threadSleepDuration);
+ // Re-compute the start time for the next batch
+ last = System.currentTimeMillis();
}
}
} finally {
+ if (stmt != null) {
+ stmt.close();
+ }
if (connection != null) {
try {
connection.commit();