You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/04 22:13:34 UTC

[07/50] [abbrv] phoenix git commit: PHOENIX-3412 Used the batch JDBC APIs in pherf.

PHOENIX-3412 Used the batch JDBC APIs in pherf.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7aea492
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7aea492
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7aea492

Branch: refs/heads/encodecolumns2
Commit: d7aea492984c72ac77be7dd1305b79e03347ea7a
Parents: 7f5d79a
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 24 21:48:30 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 27 14:35:10 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/phoenix/pherf/Pherf.java    |  6 +++
 .../phoenix/pherf/workload/WriteWorkload.java   | 49 ++++++++++++++++++--
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 154d6ff..43061e0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -91,6 +91,7 @@ public class Pherf {
 		options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time.");
 		    options.addOption("t", "thin", false, "Use the Phoenix Thin Driver");
 		    options.addOption("s", "server", true, "The URL for the Phoenix QueryServer");
+		    options.addOption("b", "batchApi", false, "Use JDBC Batch API for writes");
     }
 
     private final String zookeeper;
@@ -166,6 +167,11 @@ public class Pherf {
             queryServerUrl = null;
         }
 
+        if (command.hasOption('b')) {
+          // If the '-b' option was provided, set the system property for WriteWorkload to pick up.
+          System.setProperty(WriteWorkload.USE_BATCH_API_PROPERTY, Boolean.TRUE.toString());
+        }
+
         if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
                 .hasOption("listFiles")) {
             hf.printHelp("Pherf", options);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7aea492/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index e536eb9..69d35cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -52,6 +52,9 @@ import org.slf4j.LoggerFactory;
 
 public class WriteWorkload implements Workload {
     private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+
+    public static final String USE_BATCH_API_PROPERTY = "pherf.default.dataloader.batchApi";
+
     private final PhoenixUtil pUtil;
     private final XMLConfigParser parser;
     private final RulesApplier rulesApplier;
@@ -64,6 +67,7 @@ public class WriteWorkload implements Workload {
     private final int threadPoolSize;
     private final int batchSize;
     private final GeneratePhoenixStats generateStatistics;
+    private final boolean useBatchApi;
 
     public WriteWorkload(XMLConfigParser parser) throws Exception {
         this(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO);
@@ -119,6 +123,9 @@ public class WriteWorkload implements Workload {
 
         int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
 
+        // Should addBatch/executeBatch be used? Default: false
+        this.useBatchApi = Boolean.getBoolean(USE_BATCH_API_PROPERTY);
+
         this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
 
         // TODO Move pool management up to WorkloadExecutor
@@ -201,7 +208,7 @@ public class WriteWorkload implements Workload {
             Future<Info>
                     write =
                     upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
-                            dataLoadThreadTime);
+                            dataLoadThreadTime, this.useBatchApi);
             writeBatches.add(write);
         }
         if (writeBatches.isEmpty()) {
@@ -234,7 +241,7 @@ public class WriteWorkload implements Workload {
 
     public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
             final String tableName, final int rowCount,
-            final DataLoadThreadTime dataLoadThreadTime) {
+            final DataLoadThreadTime dataLoadThreadTime, final boolean useBatchApi) {
         Future<Info> future = pool.submit(new Callable<Info>() {
             @Override public Info call() throws Exception {
                 int rowsCreated = 0;
@@ -257,8 +264,25 @@ public class WriteWorkload implements Workload {
                     for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
                             < maxDuration); i--) {
                         stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
-                        rowsCreated += stmt.executeUpdate();
+                        if (useBatchApi) {
+                            stmt.addBatch();
+                        } else {
+                            rowsCreated += stmt.executeUpdate();
+                        }
                         if ((i % getBatchSize()) == 0) {
+                            if (useBatchApi) {
+                                int[] results = stmt.executeBatch();
+                                for (int x = 0; x < results.length; x++) {
+                                    int result = results[x];
+                                    if (result < 1) {
+                                        final String msg =
+                                            "Failed to write update in batch (update count="
+                                                + result + ")";
+                                        throw new RuntimeException(msg);
+                                    }
+                                    rowsCreated += result;
+                                }
+                            }
                             connection.commit();
                             duration = System.currentTimeMillis() - last;
                             logger.info("Writer (" + Thread.currentThread().getName()
@@ -280,10 +304,27 @@ public class WriteWorkload implements Workload {
                         }
                     }
                 } finally {
-                    if (stmt != null) {
+                    // Need to keep the statement open to send the remaining batch of updates
+                    if (!useBatchApi && stmt != null) {
                       stmt.close();
                     }
                     if (connection != null) {
+                        if (useBatchApi && stmt != null) {
+                            int[] results = stmt.executeBatch();
+                            for (int x = 0; x < results.length; x++) {
+                                int result = results[x];
+                                if (result < 1) {
+                                    final String msg =
+                                        "Failed to write update in batch (update count="
+                                            + result + ")";
+                                    throw new RuntimeException(msg);
+                                }
+                                rowsCreated += result;
+                            }
+                            // Close the statement after our last batch execution.
+                            stmt.close();
+                        }
+
                         try {
                             connection.commit();
                             duration = System.currentTimeMillis() - start;