You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/04 22:13:34 UTC
[07/50] [abbrv] phoenix git commit: PHOENIX-3412 Used the batch JDBC
APIs in pherf.
PHOENIX-3412 Used the batch JDBC APIs in pherf.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7aea492
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7aea492
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7aea492
Branch: refs/heads/encodecolumns2
Commit: d7aea492984c72ac77be7dd1305b79e03347ea7a
Parents: 7f5d79a
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 24 21:48:30 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 27 14:35:10 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/phoenix/pherf/Pherf.java | 6 +++
.../phoenix/pherf/workload/WriteWorkload.java | 49 ++++++++++++++++++--
2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 154d6ff..43061e0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -91,6 +91,7 @@ public class Pherf {
options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time.");
options.addOption("t", "thin", false, "Use the Phoenix Thin Driver");
options.addOption("s", "server", true, "The URL for the Phoenix QueryServer");
+ options.addOption("b", "batchApi", false, "Use JDBC Batch API for writes");
}
private final String zookeeper;
@@ -166,6 +167,11 @@ public class Pherf {
queryServerUrl = null;
}
+ if (command.hasOption('b')) {
+ // If the '-b' option was provided, set the system property for WriteWorkload to pick up.
+ System.setProperty(WriteWorkload.USE_BATCH_API_PROPERTY, Boolean.TRUE.toString());
+ }
+
if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
.hasOption("listFiles")) {
hf.printHelp("Pherf", options);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index e536eb9..69d35cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -52,6 +52,9 @@ import org.slf4j.LoggerFactory;
public class WriteWorkload implements Workload {
private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+
+ public static final String USE_BATCH_API_PROPERTY = "pherf.default.dataloader.batchApi";
+
private final PhoenixUtil pUtil;
private final XMLConfigParser parser;
private final RulesApplier rulesApplier;
@@ -64,6 +67,7 @@ public class WriteWorkload implements Workload {
private final int threadPoolSize;
private final int batchSize;
private final GeneratePhoenixStats generateStatistics;
+ private final boolean useBatchApi;
public WriteWorkload(XMLConfigParser parser) throws Exception {
this(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO);
@@ -119,6 +123,9 @@ public class WriteWorkload implements Workload {
int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+ // Should addBatch/executeBatch be used? Default: false
+ this.useBatchApi = Boolean.getBoolean(USE_BATCH_API_PROPERTY);
+
this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
// TODO Move pool management up to WorkloadExecutor
@@ -201,7 +208,7 @@ public class WriteWorkload implements Workload {
Future<Info>
write =
upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
- dataLoadThreadTime);
+ dataLoadThreadTime, this.useBatchApi);
writeBatches.add(write);
}
if (writeBatches.isEmpty()) {
@@ -234,7 +241,7 @@ public class WriteWorkload implements Workload {
public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
final String tableName, final int rowCount,
- final DataLoadThreadTime dataLoadThreadTime) {
+ final DataLoadThreadTime dataLoadThreadTime, final boolean useBatchApi) {
Future<Info> future = pool.submit(new Callable<Info>() {
@Override public Info call() throws Exception {
int rowsCreated = 0;
@@ -257,8 +264,25 @@ public class WriteWorkload implements Workload {
for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
< maxDuration); i--) {
stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
- rowsCreated += stmt.executeUpdate();
+ if (useBatchApi) {
+ stmt.addBatch();
+ } else {
+ rowsCreated += stmt.executeUpdate();
+ }
if ((i % getBatchSize()) == 0) {
+ if (useBatchApi) {
+ int[] results = stmt.executeBatch();
+ for (int x = 0; x < results.length; x++) {
+ int result = results[x];
+ if (result < 1) {
+ final String msg =
+ "Failed to write update in batch (update count="
+ + result + ")";
+ throw new RuntimeException(msg);
+ }
+ rowsCreated += result;
+ }
+ }
connection.commit();
duration = System.currentTimeMillis() - last;
logger.info("Writer (" + Thread.currentThread().getName()
@@ -280,10 +304,27 @@ public class WriteWorkload implements Workload {
}
}
} finally {
- if (stmt != null) {
+ // Need to keep the statement open to send the remaining batch of updates
+ if (!useBatchApi && stmt != null) {
stmt.close();
}
if (connection != null) {
+ if (useBatchApi && stmt != null) {
+ int[] results = stmt.executeBatch();
+ for (int x = 0; x < results.length; x++) {
+ int result = results[x];
+ if (result < 1) {
+ final String msg =
+ "Failed to write update in batch (update count="
+ + result + ")";
+ throw new RuntimeException(msg);
+ }
+ rowsCreated += result;
+ }
+ // Close the statement after our last batch execution.
+ stmt.close();
+ }
+
try {
connection.commit();
duration = System.currentTimeMillis() - start;