You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mu...@apache.org on 2015/12/18 23:29:16 UTC

phoenix git commit: PHOENIX-2524 Fixes for pherf to run with queryserver (elserj)

Repository: phoenix
Updated Branches:
  refs/heads/master b38989dc7 -> 135b978ae


PHOENIX-2524 Fixes for pherf to run with queryserver (elserj)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/135b978a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/135b978a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/135b978a

Branch: refs/heads/master
Commit: 135b978aea1bb5cf0053698583a2002e4452e5b0
Parents: b38989d
Author: Mujtaba <mu...@apache.org>
Authored: Fri Dec 18 14:29:00 2015 -0800
Committer: Mujtaba <mu...@apache.org>
Committed: Fri Dec 18 14:29:00 2015 -0800

----------------------------------------------------------------------
 .../src/build/components/all-common-jars.xml    |  8 +++
 .../java/org/apache/phoenix/pherf/Pherf.java    | 19 ++++++-
 .../apache/phoenix/pherf/result/ResultUtil.java | 13 ++++-
 .../phoenix/pherf/result/file/Header.java       |  1 +
 .../pherf/result/file/ResultFileDetails.java    |  1 +
 .../apache/phoenix/pherf/util/PhoenixUtil.java  | 57 ++++++++++++++++----
 .../phoenix/pherf/workload/WriteWorkload.java   | 33 +++++++-----
 7 files changed, 105 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index bed9f25..960c3c9 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -104,5 +104,13 @@
       </excludes>
       <fileMode>0644</fileMode>
     </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../phoenix-pherf/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>phoenix-*.jar</include>
+      </includes>
+      <fileMode>0644</fileMode>
+    </fileSet>
   </fileSets>
 </component>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index eaf199a..b84183b 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -89,6 +89,8 @@ public class Pherf {
 		options.addOption("label", true, "Label a run. Result file name will be suffixed with specified label");
 		options.addOption("compare", true, "Specify labeled run(s) to compare");
 		options.addOption("useAverageCompareType", false, "Compare results with Average query time instead of default is Minimum query time.");
+		    options.addOption("t", "thin", false, "Use the Phoenix Thin Driver");
+		    options.addOption("s", "server", true, "The URL for the Phoenix QueryServer");
     }
 
     private final String zookeeper;
@@ -109,6 +111,8 @@ public class Pherf {
     private final String label;
     private final String compareResults;
     private final CompareType compareType;
+    private final boolean thinDriver;
+    private final String queryServerUrl;
 
     public Pherf(String[] args) throws Exception {
         CommandLineParser parser = new PosixParser();
@@ -155,14 +159,27 @@ public class Pherf {
         label = command.getOptionValue("label", null);
         compareResults = command.getOptionValue("compare", null);
         compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM;
+        thinDriver = command.hasOption("thin");
+        if (thinDriver) {
+            queryServerUrl = command.getOptionValue("server", "http://localhost:8765");
+        } else {
+            queryServerUrl = null;
+        }
 
         if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
                 .hasOption("listFiles")) {
             hf.printHelp("Pherf", options);
             System.exit(1);
         }
-        PhoenixUtil.setZookeeper(zookeeper);
         PhoenixUtil.setRowCountOverride(rowCountOverride);
+        if (!thinDriver) {
+            logger.info("Using thick driver with ZooKeepers '{}'", zookeeper);
+            PhoenixUtil.setZookeeper(zookeeper);
+        } else {
+            logger.info("Using thin driver with PQS '{}'", queryServerUrl);
+            // Enables the thin-driver and sets the PQS URL
+            PhoenixUtil.useThinDriver(queryServerUrl);
+        }
         ResultUtil.setFileSuffix(label);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index 92eb80a..0c2a7b8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -84,7 +84,12 @@ public class ResultUtil {
         ensureBaseResultDirExists();
 
         CSVResultHandler writer = null;
-        ResultFileDetails resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
+        ResultFileDetails resultFileDetails;
+        if (PhoenixUtil.isThinDriver()) {
+            resultFileDetails = ResultFileDetails.CSV_THIN_AGGREGATE_DATA_LOAD;
+        } else {
+            resultFileDetails = ResultFileDetails.CSV_AGGREGATE_DATA_LOAD;
+        }
         try {
             writer = new CSVFileResultHandler();
             writer.setResultFileDetails(resultFileDetails);
@@ -92,7 +97,11 @@ public class ResultUtil {
 
             for (TableLoadTime loadTime : dataLoadTime.getTableLoadTime()) {
                 List<ResultValue> rowValues = new ArrayList<>();
-                rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
+                if (PhoenixUtil.isThinDriver()) {
+                    rowValues.add(new ResultValue(PhoenixUtil.getQueryServerUrl()));
+                } else {
+                    rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
+                }
                 rowValues.addAll(loadTime.getCsvRepresentation(this));
                 Result
                         result =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 066fa7a..7d09f68 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -28,6 +28,7 @@ public enum Header {
     DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
     DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
     AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
+    THIN_AGGREGATE_DATA_LOAD("QUERYSERVER,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
     MONITOR("STAT_NAME,STAT_VALUE,TIME_STAMP");
 
     private String header;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
index a85f830..51aa407 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/ResultFileDetails.java
@@ -24,6 +24,7 @@ public enum ResultFileDetails {
     CSV_DETAILED_PERFORMANCE(Header.DETAILED_PERFORMANCE, Extension.DETAILED_CSV),
     CSV_DETAILED_FUNCTIONAL(Header.DETAILED_FUNCTIONAL, Extension.DETAILED_CSV),
     CSV_AGGREGATE_DATA_LOAD(Header.AGGREGATE_DATA_LOAD, Extension.CSV),
+    CSV_THIN_AGGREGATE_DATA_LOAD(Header.THIN_AGGREGATE_DATA_LOAD, Extension.CSV),
     CSV_MONITOR(Header.MONITOR, Extension.CSV),
     XML(Header.EMPTY, Extension.XML),
     IMAGE(Header.EMPTY, Extension.VISUALIZATION);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 064c604..b778833 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -28,6 +28,7 @@ import java.sql.*;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
@@ -39,6 +40,8 @@ public class PhoenixUtil {
     private static int rowCountOverride = 0;
     private boolean testEnabled;
     private static PhoenixUtil instance;
+    private static boolean useThinDriver;
+    private static String queryServerUrl;
 
     private PhoenixUtil() {
         this(false);
@@ -57,6 +60,19 @@ public class PhoenixUtil {
         return instance;
     }
 
+    public static void useThinDriver(String queryServerUrl) {
+        PhoenixUtil.useThinDriver = true;
+        PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl);
+    }
+
+    public static String getQueryServerUrl() {
+        return PhoenixUtil.queryServerUrl;
+    }
+
+    public static boolean isThinDriver() {
+        return PhoenixUtil.useThinDriver;
+    }
+
     public Connection getConnection() throws Exception {
         return getConnection(null);
     }
@@ -66,17 +82,31 @@ public class PhoenixUtil {
     }
 
     private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
-        if (null == zookeeper) {
-            throw new IllegalArgumentException(
-                    "Zookeeper must be set before initializing connection!");
-        }
-        Properties props = new Properties();
-        if (null != tenantId) {
-            props.setProperty("TenantId", tenantId);
-            logger.debug("\nSetting tenantId to " + tenantId);
+        if (useThinDriver) {
+            if (null == queryServerUrl) {
+                throw new IllegalArgumentException("QueryServer URL must be set before" +
+                      " initializing connection");
+            }
+            Properties props = new Properties();
+            if (null != tenantId) {
+                props.setProperty("TenantId", tenantId);
+                logger.debug("\nSetting tenantId to " + tenantId);
+            }
+            String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF";
+            return DriverManager.getConnection(url, props);
+        } else {
+            if (null == zookeeper) {
+                throw new IllegalArgumentException(
+                        "Zookeeper must be set before initializing connection!");
+            }
+            Properties props = new Properties();
+            if (null != tenantId) {
+                props.setProperty("TenantId", tenantId);
+                logger.debug("\nSetting tenantId to " + tenantId);
+            }
+            String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
+            return DriverManager.getConnection(url, props);
         }
-        String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
-        return DriverManager.getConnection(url, props);
     }
 
     public boolean executeStatement(String sql, Scenario scenario) throws Exception {
@@ -278,7 +308,12 @@ public class PhoenixUtil {
 
     public static void setZookeeper(String zookeeper) {
         logger.info("Setting zookeeper: " + zookeeper);
-        PhoenixUtil.zookeeper = zookeeper;
+        useThickDriver(zookeeper);
+    }
+
+    public static void useThickDriver(String zookeeper) {
+        PhoenixUtil.useThinDriver = false;
+        PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper);
     }
 
     public static int getRowCountOverride() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/135b978a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 7c73e22..a35e6e8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -168,7 +168,7 @@ public class WriteWorkload implements Workload {
         // Execute any Scenario DDL before running workload
         pUtil.executeScenarioDdl(scenario);
         
-        List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+        List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario);
 
         waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
 
@@ -182,12 +182,12 @@ public class WriteWorkload implements Workload {
         }
     }
 
-    private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+    private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
             throws Exception {
         RowCalculator
                 rowCalculator =
                 new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
-        List<Future> writeBatches = new ArrayList<>();
+        List<Future<Info>> writeBatches = new ArrayList<>();
 
         for (int i = 0; i < getThreadPoolSize(); i++) {
             List<Column>
@@ -212,7 +212,7 @@ public class WriteWorkload implements Workload {
     }
 
     private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
-            long start, List<Future> writeBatches)
+            long start, List<Future<Info>> writeBatches)
             throws InterruptedException, java.util.concurrent.ExecutionException {
         int sumRows = 0, sumDuration = 0;
         // Wait for all the batch threads to complete
@@ -223,10 +223,12 @@ public class WriteWorkload implements Workload {
             logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
                     + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
         }
-        logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
-                + sumDuration + ") Ms");
+        long testDuration = System.currentTimeMillis() - start;
+        logger.info("Writes completed with total row count (" + sumRows
+                + ") with total elapsed time of (" + testDuration
+                + ") ms and total CPU execution time of (" + sumDuration + ") ms");
         dataLoadTimeSummary
-                .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+                .add(scenario.getTableName(), sumRows, (int) testDuration);
     }
 
     public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
@@ -235,9 +237,10 @@ public class WriteWorkload implements Workload {
         Future<Info> future = pool.submit(new Callable<Info>() {
             @Override public Info call() throws Exception {
                 int rowsCreated = 0;
-                long start = 0, duration, totalDuration;
+                long start = 0, last = 0, duration, totalDuration;
                 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                 Connection connection = null;
+                PreparedStatement stmt = null;
                 try {
                     connection = pUtil.getConnection(scenario.getTenantId());
                     long logStartTime = System.currentTimeMillis();
@@ -247,17 +250,16 @@ public class WriteWorkload implements Workload {
                                     Long.MAX_VALUE :
                                     WriteWorkload.this.writeParams.getExecutionDurationInMs();
 
+                    last = start = System.currentTimeMillis();
+                    String sql = buildSql(columns, tableName);
+                    stmt = connection.prepareStatement(sql);
                     for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
                             < maxDuration); i--) {
-                        String sql = buildSql(columns, tableName);
-                        PreparedStatement stmt = connection.prepareStatement(sql);
                         stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
-                        start = System.currentTimeMillis();
                         rowsCreated += stmt.executeUpdate();
-                        stmt.close();
                         if ((i % getBatchSize()) == 0) {
                             connection.commit();
-                            duration = System.currentTimeMillis() - start;
+                            duration = System.currentTimeMillis() - last;
                             logger.info("Writer (" + Thread.currentThread().getName()
                                     + ") committed Batch. Total " + getBatchSize()
                                     + " rows for this thread (" + this.hashCode() + ") in ("
@@ -272,9 +274,14 @@ public class WriteWorkload implements Workload {
 
                             // Pause for throttling if configured to do so
                             Thread.sleep(threadSleepDuration);
+                            // Re-compute the start time for the next batch
+                            last = System.currentTimeMillis();
                         }
                     }
                 } finally {
+                    if (stmt != null) {
+                      stmt.close();
+                    }
                     if (connection != null) {
                         try {
                             connection.commit();