You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by co...@apache.org on 2015/06/20 01:35:28 UTC

[1/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for mixed r/w workloads

Repository: phoenix
Updated Branches:
  refs/heads/master 466eeb35f -> 7175dcbc0


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
index 78f18ca..c9333a0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
@@ -43,153 +43,160 @@ import difflib.DiffUtils;
 import difflib.Patch;
 
 public class QueryVerifier {
-	private PhoenixUtil pUtil = new PhoenixUtil();
-	private static final Logger logger = LoggerFactory
-			.getLogger(QueryVerifier.class);
-	private boolean useTemporaryOutput;
-	private String directoryLocation;
-
-	public QueryVerifier(boolean useTemporaryOutput) {
-		this.useTemporaryOutput = useTemporaryOutput;
-		this.directoryLocation = this.useTemporaryOutput ? 
-				PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
-		
-		ensureBaseDirExists();
-	}
-	
-	/***
-	 * Export query resultSet to CSV file
-	 * @param query
-	 * @throws Exception
-	 */
-	public String exportCSV(Query query) throws Exception {
-		Connection conn = null;
-		PreparedStatement statement = null;
-		ResultSet rs = null;
-		String fileName = getFileName(query);
-		FileOutputStream fos = new FileOutputStream(fileName);
-		try {
-			conn = pUtil.getConnection(query.getTenantId());
-			statement = conn.prepareStatement(query.getStatement());
-			boolean isQuery = statement.execute();
-			if (isQuery) {
-				rs = statement.executeQuery();
-				int columnCount = rs.getMetaData().getColumnCount();
-				while (rs.next()) {
-					for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
-						fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER).getBytes());
-					}
-					fos.write(PherfConstants.NEW_LINE.getBytes());
-				}
-			} else {
-				conn.commit();
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if (rs != null) rs.close();
-			if (statement != null) statement.close();
-			if (conn != null) conn.close();
-			fos.flush();
-			fos.close();
-		}
-		return fileName;
-	}
-	
-	/***
-	 * Do a diff between exported query results and temporary CSV file
-	 * @param query
-	 * @param newCSV
-	 * @return
-	 */
-	public boolean doDiff(Query query, String newCSV) {
+    private PhoenixUtil pUtil = PhoenixUtil.create();
+    private static final Logger logger = LoggerFactory.getLogger(QueryVerifier.class);
+    private boolean useTemporaryOutput;
+    private String directoryLocation;
+
+    public QueryVerifier(boolean useTemporaryOutput) {
+        this.useTemporaryOutput = useTemporaryOutput;
+        this.directoryLocation =
+                this.useTemporaryOutput ? PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
+
+        ensureBaseDirExists();
+    }
+
+    /**
+     * Export query resultSet to CSV file
+     *
+     * @param query
+     * @throws Exception
+     */
+    public String exportCSV(Query query) throws Exception {
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+        String fileName = getFileName(query);
+        FileOutputStream fos = new FileOutputStream(fileName);
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement(query.getStatement());
+            boolean isQuery = statement.execute();
+            if (isQuery) {
+                rs = statement.executeQuery();
+                int columnCount = rs.getMetaData().getColumnCount();
+                while (rs.next()) {
+                    for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
+                        fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER)
+                                .getBytes());
+                    }
+                    fos.write(PherfConstants.NEW_LINE.getBytes());
+                }
+            } else {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (rs != null) rs.close();
+            if (statement != null) statement.close();
+            if (conn != null) conn.close();
+            fos.flush();
+            fos.close();
+        }
+        return fileName;
+    }
+
+    /**
+     * Do a diff between exported query results and temporary CSV file
+     *
+     * @param query
+     * @param newCSV
+     * @return
+     */
+    public boolean doDiff(Query query, String newCSV) {
         List<String> original = fileToLines(getCSVName(query, PherfConstants.EXPORT_DIR, ""));
-        List<String> newLines  = fileToLines(newCSV);
-        
+        List<String> newLines = fileToLines(newCSV);
+
         Patch patch = DiffUtils.diff(original, newLines);
         if (patch.getDeltas().isEmpty()) {
-        	logger.info("Match: " + query.getId() + " with " + newCSV);
-        	return true;
+            logger.info("Match: " + query.getId() + " with " + newCSV);
+            return true;
         } else {
-        	logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
-        	return false;
+            logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
+            return false;
         }
-	}
-	
-	/***
-	 * Helper method to load file
-	 * @param filename
-	 * @return
-	 */
+    }
+
+    /**
+     * Helper method to load file
+     *
+     * @param filename
+     * @return
+     */
     private static List<String> fileToLines(String filename) {
-            List<String> lines = new LinkedList<String>();
-            String line = "";
-            try {
-                    BufferedReader in = new BufferedReader(new FileReader(filename));
-                    while ((line = in.readLine()) != null) {
-                            lines.add(line);
-                    }
-                    in.close();
-            } catch (IOException e) {
-                    e.printStackTrace();
+        List<String> lines = new LinkedList<String>();
+        String line = "";
+        try {
+            BufferedReader in = new BufferedReader(new FileReader(filename));
+            while ((line = in.readLine()) != null) {
+                lines.add(line);
             }
-            
-            return lines;
+            in.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return lines;
     }
 
     /**
      * Get explain plan for a query
+     *
      * @param query
      * @return
      * @throws SQLException
      */
-	public String getExplainPlan(Query query) throws SQLException {
-		Connection conn = null;
-		ResultSet rs = null;
-		PreparedStatement statement = null;
-		StringBuilder buf = new StringBuilder();
-		try {
-			conn = pUtil.getConnection(query.getTenantId());
-			statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
-			rs = statement.executeQuery();
-	        while (rs.next()) {
-	            buf.append(rs.getString(1).trim().replace(",", "-"));
-	        }
-			statement.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if (rs != null) rs.close();
-			if (statement != null) statement.close();
-			if (conn != null) conn.close();
-		}
-		return buf.toString();
-	}
-	
-    /***
+    public String getExplainPlan(Query query) throws SQLException {
+        Connection conn = null;
+        ResultSet rs = null;
+        PreparedStatement statement = null;
+        StringBuilder buf = new StringBuilder();
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
+            rs = statement.executeQuery();
+            while (rs.next()) {
+                buf.append(rs.getString(1).trim().replace(",", "-"));
+            }
+            statement.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (rs != null) rs.close();
+            if (statement != null) statement.close();
+            if (conn != null) conn.close();
+        }
+        return buf.toString();
+    }
+
+    /**
      * Helper method to generate CSV file name
+     *
      * @param query
      * @return
      * @throws FileNotFoundException
      */
-	private String getFileName(Query query) throws FileNotFoundException {
-		String tempExt = "";
-		if (this.useTemporaryOutput) {
-			tempExt = "_" + java.util.UUID.randomUUID().toString();
-		}
-		return getCSVName(query, this.directoryLocation, tempExt);
-	}
-	
-	private String getCSVName(Query query, String directory, String tempExt) {
-		String csvFile = directory + PherfConstants.PATH_SEPARATOR
-		        + query.getId() + tempExt + Extension.CSV.toString();
-				return csvFile;
-	}
-	
-	private void ensureBaseDirExists() {
-		File baseDir = new File(this.directoryLocation);
-		if (!baseDir.exists()) {
-			baseDir.mkdir();
-		}
-	}
+    private String getFileName(Query query) throws FileNotFoundException {
+        String tempExt = "";
+        if (this.useTemporaryOutput) {
+            tempExt = "_" + java.util.UUID.randomUUID().toString();
+        }
+        return getCSVName(query, this.directoryLocation, tempExt);
+    }
+
+    private String getCSVName(Query query, String directory, String tempExt) {
+        String
+                csvFile =
+                directory + PherfConstants.PATH_SEPARATOR + query.getId() + tempExt + Extension.CSV
+                        .toString();
+        return csvFile;
+    }
+
+    private void ensureBaseDirExists() {
+        File baseDir = new File(this.directoryLocation);
+        if (!baseDir.exists()) {
+            baseDir.mkdir();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
new file mode 100644
index 0000000..16a493e
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.pherf.workload;
+
+public interface Workload {
+    public Runnable execute() throws Exception;
+
+    /**
+     * Use this method to perform any cleanup or forced shutdown of the thread.
+     */
+    public void complete();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index cf2f038..a65b4aa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -19,95 +19,96 @@
 package org.apache.phoenix.pherf.workload;
 
 import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.jmx.MonitorManager;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
-
-import org.apache.phoenix.pherf.util.ResourceList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 public class WorkloadExecutor {
     private static final Logger logger = LoggerFactory.getLogger(WorkloadExecutor.class);
-    private final XMLConfigParser parser;
-    private MonitorManager monitor;
-    private Future monitorThread;
     private final int poolSize;
 
-    private final ExecutorService pool;
+    // Jobs can be accessed by multiple threads
+    private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
 
+    private final ExecutorService pool;
 
     public WorkloadExecutor() throws Exception {
         this(PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES));
     }
 
-    public WorkloadExecutor(Properties properties) throws Exception{
-        this(properties,PherfConstants.DEFAULT_FILE_PATTERN);
+    public WorkloadExecutor(Properties properties) throws Exception {
+        this(properties, new ArrayList());
     }
 
-    public WorkloadExecutor(Properties properties, String filePattern) throws Exception {
-        this(properties,
-                new XMLConfigParser(filePattern),
-                true);
+    public WorkloadExecutor(Properties properties, List<Workload> workloads) throws Exception {
+        this.poolSize =
+                (properties.getProperty("pherf.default.threadpool") == null) ?
+                        PherfConstants.DEFAULT_THREAD_POOL_SIZE :
+                        Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+
+        this.pool = Executors.newFixedThreadPool(this.poolSize);
+        init(workloads);
     }
 
-    public WorkloadExecutor(Properties properties, XMLConfigParser parser, boolean monitor) throws Exception {
-        this.parser = parser;
-        this.poolSize = (properties.getProperty("pherf.default.threadpool") == null)
-                ? PherfConstants.DEFAULT_THREAD_POOL_SIZE
-                : Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+    public void add(Workload workload) throws Exception {
+        this.jobs.put(workload, pool.submit(workload.execute()));
+    }
 
-        this.pool = Executors.newFixedThreadPool(this.poolSize);
-        if (monitor) {
-            initMonitor(Integer.parseInt(properties.getProperty("pherf.default.monitorFrequency")));
+    /**
+     * Blocks on waiting for all workloads to finish. If a
+     * {@link org.apache.phoenix.pherf.workload.Workload} Requires complete() to be called, it must
+     * be called prior to using this method. Otherwise it will block infinitely.
+     */
+    public void get() {
+        for (Workload workload : jobs.keySet()) {
+            get(workload);
         }
     }
 
     /**
-     * Executes all scenarios dataload
+     * Calls the {@link java.util.concurrent.Future#get()} method pertaining to this workflow.
+     * Once the Future competes, the workflow is removed from the list.
      *
-     * @throws Exception
+     * @param workload Key entry in the HashMap
      */
-    public void executeDataLoad() throws Exception {
-        logger.info("\n\nStarting Data Loader...");
-        DataLoader dataLoader = new DataLoader(parser);
-        dataLoader.execute();
+    public void get(Workload workload) {
+        try {
+            Future future = jobs.get(workload);
+            future.get();
+            jobs.remove(workload);
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("", e);
+        }
     }
 
     /**
-     * Executes all scenario multi-threaded query sets
-     *
-     * @param queryHint
-     * @throws Exception
+     * Complete all workloads in the list.
+     * Entries in the job Map will persist until {#link WorkloadExecutorNew#get()} is called
      */
-    public void executeMultithreadedQueryExecutor(String queryHint, boolean export, RunMode runMode) throws Exception {
-        logger.info("\n\nStarting Query Executor...");
-        QueryExecutor queryExecutor = new QueryExecutor(parser);
-        queryExecutor.execute(queryHint, export, runMode);
+    public void complete() {
+        for (Workload workload : jobs.keySet()) {
+            workload.complete();
+        }
     }
 
-    public void shutdown() throws Exception {
-		if (null != monitor && monitor.isRunning()) {
-            this.monitor.stop();
-            this.monitorThread.get(60, TimeUnit.SECONDS);
-            this.pool.shutdown();
-        }
+    public void shutdown() {
+        // Make sure any Workloads still on pool have been properly shutdown
+        complete();
+        pool.shutdownNow();
     }
 
-    // Just used for testing
-    public XMLConfigParser getParser() {
-        return parser;
+    public ExecutorService getPool() {
+        return pool;
     }
 
-    private void initMonitor(int monitorFrequency) throws Exception {
-        this.monitor = new MonitorManager(monitorFrequency);
-        monitorThread = pool.submit(this.monitor);
+    private void init(List<Workload> workloads) throws Exception {
+        for (Workload workload : workloads) {
+            this.jobs.put(workload, pool.submit(workload.execute()));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
new file mode 100644
index 0000000..305521b
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.WriteParams;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.exception.PherfException;
+import org.apache.phoenix.pherf.result.DataLoadThreadTime;
+import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.rules.DataValue;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.util.RowCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class WriteWorkload implements Workload {
+    private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+    private final PhoenixUtil pUtil;
+    private final XMLConfigParser parser;
+    private final RulesApplier rulesApplier;
+    private final ResultUtil resultUtil;
+    private final ExecutorService pool;
+    private final WriteParams writeParams;
+    private final Scenario scenario;
+    private final long threadSleepDuration;
+
+    private final int threadPoolSize;
+    private final int batchSize;
+
+    public WriteWorkload(XMLConfigParser parser) throws Exception {
+        this(PhoenixUtil.create(), parser);
+    }
+
+    public WriteWorkload(PhoenixUtil util, XMLConfigParser parser) throws Exception {
+        this(util, parser, null);
+    }
+
+    public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario)
+            throws Exception {
+        this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES),
+                parser, scenario);
+    }
+
+    /**
+     * Default the writers to use up all available cores for threads. If writeParams are used in
+     * the config files, they will override the defaults. writeParams are used for read/write mixed
+     * workloads.
+     * TODO extract notion of the scenario list and have 1 write workload per scenario
+     *
+     * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
+     * @param properties  {@link java.util.Properties} default properties to use
+     * @param parser      {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+     * @param scenario    {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
+     *                    it will run against all scenarios in the parsers list.
+     * @throws Exception
+     */
+    public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
+            Scenario scenario) throws Exception {
+        this.pUtil = phoenixUtil;
+        this.parser = parser;
+        this.rulesApplier = new RulesApplier(parser);
+        this.resultUtil = new ResultUtil();
+
+        // Overwrite defaults properties with those given in the configuration. This indicates the
+        // scenario is a R/W mixed workload.
+        if (scenario != null) {
+            this.scenario = scenario;
+            writeParams = scenario.getWriteParams();
+            threadSleepDuration = writeParams.getThreadSleepDuration();
+        } else {
+            writeParams = null;
+            this.scenario = null;
+            threadSleepDuration = 0;
+        }
+
+        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+
+        this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
+
+        // TODO Move pool management up to WorkloadExecutor
+        this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
+
+        String
+                bSize =
+                (writeParams == null) || (writeParams.getBatchSize() == Long.MIN_VALUE) ?
+                        properties.getProperty("pherf.default.dataloader.batchsize") :
+                        String.valueOf(writeParams.getBatchSize());
+        this.batchSize =
+                (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
+    }
+
+    @Override public void complete() {
+    }
+
+    public Runnable execute() throws Exception {
+        return new Runnable() {
+            @Override public void run() {
+                try {
+                    DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
+                    DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
+
+                    if (WriteWorkload.this.scenario == null) {
+                        for (Scenario scenario : getParser().getScenarios()) {
+                            exec(dataLoadTimeSummary, dataLoadThreadTime, scenario);
+                        }
+                    } else {
+                        exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario);
+                    }
+                    resultUtil.write(dataLoadTimeSummary);
+                    resultUtil.write(dataLoadThreadTime);
+
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
+
+    private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
+            DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
+        logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
+        long start = System.currentTimeMillis();
+
+        List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+
+        waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
+
+        // always update stats for Phoenix base tables
+        updatePhoenixStats(scenario.getTableName());
+    }
+
+    private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+            throws Exception {
+        RowCalculator
+                rowCalculator =
+                new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
+        List<Future> writeBatches = new ArrayList<>();
+
+        for (int i = 0; i < getThreadPoolSize(); i++) {
+            List<Column>
+                    phxMetaCols =
+                    pUtil.getColumnsFromPhoenix(scenario.getSchemaName(),
+                            scenario.getTableNameWithoutSchemaName(), pUtil.getConnection());
+            int threadRowCount = rowCalculator.getNext();
+            logger.info(
+                    "Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
+            Future<Info>
+                    write =
+                    upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
+                            dataLoadThreadTime);
+            writeBatches.add(write);
+        }
+        if (writeBatches.isEmpty()) {
+            throw new PherfException(
+                    "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
+        }
+
+        return writeBatches;
+    }
+
+    private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
+            long start, List<Future> writeBatches)
+            throws InterruptedException, java.util.concurrent.ExecutionException {
+        int sumRows = 0, sumDuration = 0;
+        // Wait for all the batch threads to complete
+        for (Future<Info> write : writeBatches) {
+            Info writeInfo = write.get();
+            sumRows += writeInfo.getRowCount();
+            sumDuration += writeInfo.getDuration();
+            logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+                    + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
+        }
+        logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
+                + sumDuration + ") Ms");
+        dataLoadTimeSummary
+                .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * TODO Move this method to PhoenixUtil
+     * Update Phoenix table stats
+     *
+     * @param tableName
+     * @throws Exception
+     */
+    public void updatePhoenixStats(String tableName) throws Exception {
+        logger.info("Updating stats for " + tableName);
+        pUtil.executeStatement("UPDATE STATISTICS " + tableName);
+    }
+
+    public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
+            final String tableName, final int rowCount,
+            final DataLoadThreadTime dataLoadThreadTime) {
+        Future<Info> future = pool.submit(new Callable<Info>() {
+            @Override public Info call() throws Exception {
+                int rowsCreated = 0;
+                long start = 0, duration, totalDuration;
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                Connection connection = null;
+                try {
+                    connection = pUtil.getConnection();
+                    long logStartTime = System.currentTimeMillis();
+                    long
+                            maxDuration =
+                            (WriteWorkload.this.writeParams == null) ?
+                                    Long.MAX_VALUE :
+                                    WriteWorkload.this.writeParams.getExecutionDurationInMs();
+
+                    for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
+                            < maxDuration); i--) {
+                        String sql = buildSql(columns, tableName);
+                        PreparedStatement stmt = connection.prepareStatement(sql);
+                        stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+                        start = System.currentTimeMillis();
+                        rowsCreated += stmt.executeUpdate();
+                        stmt.close();
+                        if ((i % getBatchSize()) == 0) {
+                            connection.commit();
+                            duration = System.currentTimeMillis() - start;
+                            logger.info("Writer (" + Thread.currentThread().getName()
+                                    + ") committed Batch. Total " + getBatchSize()
+                                    + " rows for this thread (" + this.hashCode() + ") in ("
+                                    + duration + ") Ms");
+
+                            if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+                                dataLoadThreadTime
+                                        .add(tableName, Thread.currentThread().getName(), i,
+                                                System.currentTimeMillis() - logStartTime);
+                                logStartTime = System.currentTimeMillis();
+                            }
+
+                            // Pause for throttling if configured to do so
+                            Thread.sleep(threadSleepDuration);
+                        }
+                    }
+                } finally {
+                    if (connection != null) {
+                        try {
+                            connection.commit();
+                            duration = System.currentTimeMillis() - start;
+                            logger.info("Writer ( " + Thread.currentThread().getName()
+                                    + ") committed Final Batch. Duration (" + duration + ") Ms");
+                            connection.close();
+                        } catch (SQLException e) {
+                            // Swallow since we are closing anyway
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                totalDuration = System.currentTimeMillis() - start;
+                return new Info(totalDuration, rowsCreated);
+            }
+        });
+        return future;
+    }
+
+    private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
+            PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+        int count = 1;
+        for (Column column : columns) {
+
+            DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
+            switch (column.getType()) {
+            case VARCHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARCHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case CHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.CHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case DECIMAL:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DECIMAL);
+                } else {
+                    statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+                }
+                break;
+            case INTEGER:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.INTEGER);
+                } else {
+                    statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+                }
+                break;
+            case DATE:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DATE);
+                } else {
+                    Date
+                            date =
+                            new java.sql.Date(
+                                    simpleDateFormat.parse(dataValue.getValue()).getTime());
+                    statement.setDate(count, date);
+                }
+                break;
+            default:
+                break;
+            }
+            count++;
+        }
+        return statement;
+    }
+
+    private String buildSql(final List<Column> columns, final String tableName) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("upsert into ");
+        builder.append(tableName);
+        builder.append(" (");
+        int count = 1;
+        for (Column column : columns) {
+            builder.append(column.getName());
+            if (count < columns.size()) {
+                builder.append(",");
+            } else {
+                builder.append(")");
+            }
+            count++;
+        }
+        builder.append(" VALUES (");
+        for (int i = 0; i < columns.size(); i++) {
+            if (i < columns.size() - 1) {
+                builder.append("?,");
+            } else {
+                builder.append("?)");
+            }
+        }
+        return builder.toString();
+    }
+
+    public XMLConfigParser getParser() {
+        return parser;
+    }
+
+    public RulesApplier getRulesApplier() {
+        return rulesApplier;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    private class Info {
+
+        private final int rowCount;
+        private final long duration;
+
+        public Info(long duration, int rows) {
+            this.duration = duration;
+            this.rowCount = rows;
+        }
+
+        public long getDuration() {
+            return duration;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 9514089..8f93685 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -304,6 +304,41 @@
         </column>
     </datamapping>
     <scenarios>
+        <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="100" name="readWriteScenario">
+            <!-- Scenario level rule overrides will be unsupported in V1.
+                    You can use the general datamappings in the mean time-->
+            <dataOverride>
+                <column>
+                    <type>VARCHAR</type>
+                    <userDefined>true</userDefined>
+                    <dataSequence>LIST</dataSequence>
+                    <name>TENANT_ID</name>
+                </column>
+            </dataOverride>
+            <writeParams executionDurationInMs="10000">
+                <!--
+                    Number of writer it insert into the threadpool
+                -->
+                <writerThreadCount>5</writerThreadCount>
+
+                <!--
+                    Time in Ms that each thread will sleep between batch writes. This helps to
+                    throttle writers.
+                -->
+                <threadSleepDuration>10</threadSleepDuration>
+
+                <batchSize>100</batchSize>
+            </writeParams>
+            <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
+            <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000"
+                      numberOfExecutions="100">
+                <!--  Aggregate queries on a per tenant basis -->
+                <query tenantId="00Dxx0000001gER"
+                       ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED"
+                       statement="select count(*) from PHERF.PHERF_TEST_VIEW_UNSALTED"/>
+            </querySet>
+
+        </scenario>
         <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10">
             <!-- Scenario level rule overrides will be unsupported in V1.
                     You can use the general datamappings in the mean time-->

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index f362842..6f25fbd 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.phoenix.pherf;
 
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,7 +28,6 @@ import java.util.List;
 
 import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.workload.WorkloadExecutor;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,53 +38,55 @@ import javax.xml.bind.Marshaller;
 
 import static org.junit.Assert.*;
 
-public class ConfigurationParserTest extends ResultBaseTest{
+public class ConfigurationParserTest extends ResultBaseTest {
     private static final Logger logger = LoggerFactory.getLogger(ConfigurationParserTest.class);
 
     @Test
-    public void testConfigFilesParsing() {
-        try {
-        	WorkloadExecutor workloadExec = new WorkloadExecutor();
-            List<Scenario> scenarioList = workloadExec.getParser().getScenarios();
-            assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
-            logger.info("Number of scenarios loaded: " + scenarioList.size());
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail();
+    public void testReadWriteWorkloadReader() throws Exception {
+        String scenarioName = "testScenarioRW";
+        List<Scenario> scenarioList = getScenarios();
+        Scenario target = null;
+        for (Scenario scenario : scenarioList) {
+            if (scenarioName.equals(scenario.getName())) {
+                target = scenario;
+            }
         }
+        assertNotNull("Could not find scenario: " + scenarioName, target);
+        WriteParams params = target.getWriteParams();
+
+        assertNotNull("Could not find writeParams in scenario: " + scenarioName, params);
+        assertNotNull("Could not find batch size: ", params.getBatchSize());
+        assertNotNull("Could not find execution duration: ", params.getExecutionDurationInMs());
+        assertNotNull("Could not find sleep duration: ", params.getThreadSleepDuration());
+        assertNotNull("Could not find writer count: ", params.getWriterThreadCount());
     }
 
-	@Test
+    @Test
     // TODO Break this into multiple smaller tests.
-    public void testConfigReader(){
-		URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
-        assertNotNull("Test data XML file is missing", resourceUrl);
-
-		try {
+    public void testConfigReader() {
+        try {
 
             logger.debug("DataModel: " + writeXML());
-			Path resourcePath = Paths.get(resourceUrl.toURI());
-            DataModel data = XMLConfigParser.readDataModel(resourcePath);
-            List<Scenario> scenarioList = data.getScenarios();
-            assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
-            List<Column> dataMappingColumns = data.getDataMappingColumns();
-            assertTrue("Could not load the data columns from xml.", (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
+            List<Scenario> scenarioList = getScenarios();
+            List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+            assertTrue("Could not load the data columns from xml.",
+                    (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
             assertTrue("Could not load the data DataValue list from xml.",
                     (dataMappingColumns.get(6).getDataValues() != null)
-                    && (dataMappingColumns.get(6).getDataValues().size() > 0));
+                            && (dataMappingColumns.get(6).getDataValues().size() > 0));
 
             assertDateValue(dataMappingColumns);
 
             // Validate column mappings
             for (Column column : dataMappingColumns) {
-                assertNotNull("Column ("+ column.getName() + ") is missing its type",column.getType());
+                assertNotNull("Column (" + column.getName() + ") is missing its type",
+                        column.getType());
             }
 
-            Scenario scenario = scenarioList.get(0);
+            Scenario scenario = scenarioList.get(1);
             assertNotNull(scenario);
             assertEquals("PHERF.TEST_TABLE", scenario.getTableName());
-            assertEquals(10, scenario.getRowCount());
+            assertEquals(30, scenario.getRowCount());
             assertEquals(1, scenario.getDataOverride().getColumn().size());
             QuerySet qs = scenario.getQuerySet().get(0);
             assertEquals(ExecutionType.SERIAL, qs.getExecutionType());
@@ -99,27 +101,50 @@ public class ConfigurationParserTest extends ResultBaseTest{
             assertEquals("select count(*) from PHERF.TEST_TABLE", firstQuery.getStatement());
             assertEquals("123456789012345", firstQuery.getTenantId());
             assertEquals(null, firstQuery.getDdl());
-            assertEquals(0, (long)firstQuery.getExpectedAggregateRowCount());
+            assertEquals(0, (long) firstQuery.getExpectedAggregateRowCount());
 
             Query secondQuery = qs.getQuery().get(1);
-            assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", secondQuery.getStatement());
+            assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE",
+                    secondQuery.getStatement());
             assertEquals("Could not get queryGroup.", "g1", secondQuery.getQueryGroup());
 
             // Make sure anything in the overrides matches a real column in the data mappings
             DataOverride override = scenario.getDataOverride();
             for (Column column : override.getColumn()) {
-                assertTrue("Could not lookup Column (" + column.getName() + ") in DataMapping columns: " + dataMappingColumns, dataMappingColumns.contains(column));
+                assertTrue("Could not lookup Column (" + column.getName()
+                        + ") in DataMapping columns: " + dataMappingColumns,
+                        dataMappingColumns.contains(column));
             }
 
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    private URL getResourceUrl() {
+        URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+        assertNotNull("Test data XML file is missing", resourceUrl);
+        return resourceUrl;
+    }
+
+    private List<Scenario> getScenarios() throws URISyntaxException, JAXBException{
+        DataModel data = getDataModel();
+        List<Scenario> scenarioList = data.getScenarios();
+        assertTrue("Could not load the scenarios from xml.",
+                (scenarioList != null) && (scenarioList.size() > 0));
+        return scenarioList;
+    }
+
+    private DataModel getDataModel() throws URISyntaxException, JAXBException {
+        Path resourcePath = Paths.get(getResourceUrl().toURI());
+        return XMLConfigParser.readDataModel(resourcePath);
+    }
 
     private void assertDateValue(List<Column> dataMappingColumns) {
         for (Column dataMapping : dataMappingColumns) {
-            if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName().equals("CREATED_DATE"))) {
+            if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName()
+                    .equals("CREATED_DATE"))) {
                 // First rule should have min/max set
                 assertNotNull(dataMapping.getDataValues().get(0).getMinValue());
                 assertNotNull(dataMapping.getDataValues().get(0).getMaxValue());
@@ -139,7 +164,7 @@ public class ConfigurationParserTest extends ResultBaseTest{
     /*
         Used for debugging to dump out a simple xml filed based on the bound objects.
      */
-	private String writeXML() {
+    private String writeXML() {
         DataModel data = new DataModel();
         try {
             DataValue dataValue = new DataValue();
@@ -156,7 +181,6 @@ public class ConfigurationParserTest extends ResultBaseTest{
             List<Column> columnList = new ArrayList<>();
             columnList.add(column);
 
-            data.setRelease("192");
             data.setDataMappingColumns(columnList);
 
             Scenario scenario = new Scenario();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index a202437..4ccf95c 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
 import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
 import org.apache.phoenix.pherf.result.*;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.phoenix.pherf.configuration.Query;
@@ -72,7 +71,7 @@ public class ResultTest extends ResultBaseTest {
     public void testMonitorResult() throws Exception {
         ExecutorService executorService = Executors.newFixedThreadPool(1);
         MonitorManager monitor = new MonitorManager(100);
-        Future future = executorService.submit(monitor);
+        Future future = executorService.submit(monitor.execute());
         List<Result> records;
         final int TIMEOUT = 30;
 
@@ -83,7 +82,7 @@ public class ResultTest extends ResultBaseTest {
             Thread.sleep(100);
             if (ct == max) {
                 int timer = 0;
-                monitor.stop();
+                monitor.complete();
                 while (monitor.isRunning() && (timer < TIMEOUT)) {
                     System.out.println("Waiting for monitor to finish. Seconds Waited :" + timer);
                     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
index 15d4608..92604d4 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.phoenix.pherf;
 
 import org.apache.phoenix.pherf.configuration.*;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
 import org.apache.phoenix.pherf.rules.DataValue;
 import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -28,20 +28,19 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.junit.Test;
 
-import java.sql.Types;
 import java.util.*;
 
 import static org.junit.Assert.*;
 
 public class RuleGeneratorTest {
-    static PhoenixUtil util = new PhoenixUtil(true);
-    static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
+    private static PhoenixUtil util = PhoenixUtil.create(true);
+    private static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
 
     @Test
     public void testDateGenerator() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
 
         for (Column dataMapping : model.getDataMappingColumns()) {
@@ -68,7 +67,7 @@ public class RuleGeneratorTest {
     public void testNullChance() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
         int sampleSize = 100;
         List<String> values = new ArrayList<>(sampleSize);
@@ -96,7 +95,7 @@ public class RuleGeneratorTest {
     public void testSequentialDataSequence() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
 
         Column targetColumn = null;
@@ -181,7 +180,7 @@ public class RuleGeneratorTest {
         expectedValues.add("cCCyYhnNbBs9kWr");
 
         XMLConfigParser parser = new XMLConfigParser(".*test_scenario.xml");
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
         Scenario scenario = parser.getScenarios().get(0);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 45d36d2..fddf022 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -127,10 +127,50 @@
             <name>NEWVAL_STRING</name>
             <prefix>TSTPRFX</prefix>
         </column>
-
     </datamapping>
     <scenarios>
-        <scenario tableName="PHERF.TEST_TABLE" rowCount="10" name="testScenario">
+        <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
+            <!-- Scenario level rule overrides will be unsupported in V1.
+                    You can use the general datamappings in the mean time-->
+            <dataOverride>
+                <column>
+                    <type>VARCHAR</type>
+                    <userDefined>true</userDefined>
+                    <dataSequence>RANDOM</dataSequence>
+                    <length>10</length>
+                    <name>FIELD</name>
+                </column>
+            </dataOverride>
+
+            <!--
+                This is used to add mixed R/W workloads.
+
+                If this tag exists, a writer pool will be created based on the below properties.
+                These props will override the default values in pherf.properties, but only for this
+                scenario.The write jobs will run in conjunction with the querySet below.
+            -->
+            <writeParams executionDurationInMs="10000">
+                <!--
+                    Number of writer it insert into the threadpool
+                -->
+                <writerThreadCount>2</writerThreadCount>
+
+                <!--
+                    Time in Ms that each thread will sleep between batch writes. This helps to
+                    throttle writers.
+                -->
+                <threadSleepDuration>10</threadSleepDuration>
+
+                <batchSize>1000</batchSize>
+            </writeParams>
+            <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
+                <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
+                <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+            </querySet>
+
+        </scenario>
+
+        <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
             <!-- Scenario level rule overrides will be unsupported in V1.
                     You can use the general datamappings in the mean time-->
             <dataOverride>
@@ -145,16 +185,20 @@
             <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first 
                       2. DDL included in query are executed only once on start of querySet execution.
             -->
-            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="100">
-                <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" statement="select count(*) from PHERF.TEST_TABLE"/>
+            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
+                      numberOfExecutions="100">
+                <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
+                       statement="select count(*) from PHERF.TEST_TABLE"/>
                 <!-- queryGroup is a way to organize queries across tables or scenario files.
                     The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
-                <query id="q2" queryGroup="g1" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+                <query id="q2" queryGroup="g1"
+                       statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
             </querySet>
             <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
-            <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" numberOfExecutions="10">
+            <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
+                      numberOfExecutions="10">
                 <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
-                <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+                <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
             </querySet>
         </scenario>
     </scenarios>


[2/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for mixed r/w workloads

Posted by co...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
index 523feb4..39d6a9c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
@@ -33,17 +33,13 @@ public class ResultManager {
     private final ResultUtil util;
     private final PherfConstants.RunMode runMode;
 
-
     public ResultManager(String fileNameSeed, PherfConstants.RunMode runMode) {
-        this(runMode, Arrays.asList(
-                new XMLResultHandler(fileNameSeed, ResultFileDetails.XML),
+        this(runMode, Arrays.asList(new XMLResultHandler(fileNameSeed, ResultFileDetails.XML),
                 new ImageResultHandler(fileNameSeed, ResultFileDetails.IMAGE),
-						new CSVResultHandler(
-								fileNameSeed,
-								runMode == RunMode.PERFORMANCE ? ResultFileDetails.CSV_DETAILED_PERFORMANCE
-										: ResultFileDetails.CSV_DETAILED_FUNCTIONAL),
-                new CSVResultHandler(fileNameSeed, ResultFileDetails.CSV_AGGREGATE_PERFORMANCE)
-        ));
+                new CSVResultHandler(fileNameSeed, runMode == RunMode.PERFORMANCE ?
+                        ResultFileDetails.CSV_DETAILED_PERFORMANCE :
+                        ResultFileDetails.CSV_DETAILED_FUNCTIONAL),
+                new CSVResultHandler(fileNameSeed, ResultFileDetails.CSV_AGGREGATE_PERFORMANCE)));
     }
 
     public ResultManager(PherfConstants.RunMode runMode, List<ResultHandler> resultHandlers) {
@@ -81,6 +77,7 @@ public class ResultManager {
 
     /**
      * Write a combined set of results for each result in the list.
+     *
      * @param dataModelResults List<{@link DataModelResult > </>}
      * @throws Exception
      */
@@ -89,7 +86,9 @@ public class ResultManager {
 
         CSVResultHandler detailsCSVWriter = null;
         try {
-            detailsCSVWriter = new CSVResultHandler(PherfConstants.COMBINED_FILE_NAME, ResultFileDetails.CSV_DETAILED_PERFORMANCE);
+            detailsCSVWriter =
+                    new CSVResultHandler(PherfConstants.COMBINED_FILE_NAME,
+                            ResultFileDetails.CSV_DETAILED_PERFORMANCE);
             for (DataModelResult dataModelResult : dataModelResults) {
                 util.write(detailsCSVWriter, dataModelResult, runMode);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index fd960d1..07dfa86 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -22,15 +22,16 @@ import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.PherfConstants.RunMode;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
-import org.apache.phoenix.pherf.result.impl.ImageResultHandler;
-import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.text.Format;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 public class ResultUtil {
 
@@ -54,7 +55,10 @@ public class ResultUtil {
                     List<ResultValue> rowValues = new ArrayList<>();
                     rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
                     rowValues.addAll(writeThreadTime.getCsvRepresentation(this));
-                    Result result = new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE, "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
+                    Result
+                            result =
+                            new Result(ResultFileDetails.CSV_DETAILED_PERFORMANCE,
+                                    "ZK," + dataLoadThreadTime.getCsvTitle(), rowValues);
                     writer.write(result);
                 }
             }
@@ -83,7 +87,10 @@ public class ResultUtil {
                 List<ResultValue> rowValues = new ArrayList<>();
                 rowValues.add(new ResultValue(PhoenixUtil.getZookeeper()));
                 rowValues.addAll(loadTime.getCsvRepresentation(this));
-                Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), rowValues);
+                Result
+                        result =
+                        new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+                                rowValues);
                 writer.write(result);
             }
         } finally {
@@ -94,23 +101,29 @@ public class ResultUtil {
         }
     }
 
-    public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult, RunMode runMode) throws Exception {
+    public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult,
+            RunMode runMode) throws Exception {
         ResultFileDetails resultFileDetails = resultHandler.getResultFileDetails();
         switch (resultFileDetails) {
-            case CSV_AGGREGATE_PERFORMANCE:
-            case CSV_DETAILED_PERFORMANCE:
-            case CSV_DETAILED_FUNCTIONAL:
-                List<List<ResultValue>> rowDetails = getCSVResults(dataModelResult, resultFileDetails, runMode);
-                for (List<ResultValue> row : rowDetails) {
-                    Result result = new Result(resultFileDetails, resultFileDetails.getHeader().toString(), row);
-                    resultHandler.write(result);
-                }
-                break;
-            default:
-                List<ResultValue> resultValue = new ArrayList();
-                resultValue.add(new ResultValue<>(dataModelResult));
-                resultHandler.write(new Result(resultFileDetails, null, resultValue));
-                break;
+        case CSV_AGGREGATE_PERFORMANCE:
+        case CSV_DETAILED_PERFORMANCE:
+        case CSV_DETAILED_FUNCTIONAL:
+            List<List<ResultValue>>
+                    rowDetails =
+                    getCSVResults(dataModelResult, resultFileDetails, runMode);
+            for (List<ResultValue> row : rowDetails) {
+                Result
+                        result =
+                        new Result(resultFileDetails, resultFileDetails.getHeader().toString(),
+                                row);
+                resultHandler.write(result);
+            }
+            break;
+        default:
+            List<ResultValue> resultValue = new ArrayList();
+            resultValue.add(new ResultValue<>(dataModelResult));
+            resultHandler.write(new Result(resultFileDetails, null, resultValue));
+            break;
         }
     }
 
@@ -146,40 +159,47 @@ public class ResultUtil {
         return str;
     }
 
-    private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult, ResultFileDetails resultFileDetails, RunMode runMode) {
+    private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult,
+            ResultFileDetails resultFileDetails, RunMode runMode) {
         List<List<ResultValue>> rowList = new ArrayList<>();
 
         for (ScenarioResult result : dataModelResult.getScenarioResult()) {
             for (QuerySetResult querySetResult : result.getQuerySetResult()) {
                 for (QueryResult queryResult : querySetResult.getQueryResults()) {
                     switch (resultFileDetails) {
-                        case CSV_AGGREGATE_PERFORMANCE:
-                            List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
-                            rowList.add(csvResult);
-                            break;
-                        case CSV_DETAILED_PERFORMANCE:
-                        case CSV_DETAILED_FUNCTIONAL:
-                            List<List<ResultValue>> detailedRows = queryResult.getCsvDetailedRepresentation(this, runMode);
-                            for (List<ResultValue> detailedRowList : detailedRows) {
-                                List<ResultValue> valueList = new ArrayList<>();
-                                valueList.add(new ResultValue(convertNull(result.getTableName())));
-                                valueList.add(new ResultValue(convertNull(result.getName())));
-                                valueList.add(new ResultValue(convertNull(dataModelResult.getZookeeper())));
-                                valueList.add(new ResultValue(convertNull(String.valueOf(result.getRowCount()))));
-                                valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getNumberOfExecutions()))));
-                                valueList.add(new ResultValue(convertNull(String.valueOf(querySetResult.getExecutionType()))));
-                                if (result.getPhoenixProperties() != null) {
-                                    String props = buildProperty(result);
-                                    valueList.add(new ResultValue(convertNull(props)));
-                                } else {
-                                    valueList.add(new ResultValue("null"));
-                                }
-                                valueList.addAll(detailedRowList);
-                                rowList.add(valueList);
+                    case CSV_AGGREGATE_PERFORMANCE:
+                        List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
+                        rowList.add(csvResult);
+                        break;
+                    case CSV_DETAILED_PERFORMANCE:
+                    case CSV_DETAILED_FUNCTIONAL:
+                        List<List<ResultValue>>
+                                detailedRows =
+                                queryResult.getCsvDetailedRepresentation(this, runMode);
+                        for (List<ResultValue> detailedRowList : detailedRows) {
+                            List<ResultValue> valueList = new ArrayList<>();
+                            valueList.add(new ResultValue(convertNull(result.getTableName())));
+                            valueList.add(new ResultValue(convertNull(result.getName())));
+                            valueList.add(new ResultValue(
+                                    convertNull(dataModelResult.getZookeeper())));
+                            valueList.add(new ResultValue(
+                                    convertNull(String.valueOf(result.getRowCount()))));
+                            valueList.add(new ResultValue(convertNull(
+                                    String.valueOf(querySetResult.getNumberOfExecutions()))));
+                            valueList.add(new ResultValue(convertNull(
+                                    String.valueOf(querySetResult.getExecutionType()))));
+                            if (result.getPhoenixProperties() != null) {
+                                String props = buildProperty(result);
+                                valueList.add(new ResultValue(convertNull(props)));
+                            } else {
+                                valueList.add(new ResultValue("null"));
                             }
-                            break;
-                        default:
-                            break;
+                            valueList.addAll(detailedRowList);
+                            rowList.add(valueList);
+                        }
+                        break;
+                    default:
+                        break;
                     }
                 }
             }
@@ -192,8 +212,7 @@ public class ResultUtil {
         boolean firstPartialSeparator = true;
 
         for (Map.Entry<String, String> entry : result.getPhoenixProperties().entrySet()) {
-            if (!firstPartialSeparator)
-                sb.append("|");
+            if (!firstPartialSeparator) sb.append("|");
             firstPartialSeparator = false;
             sb.append(entry.getKey() + "=" + entry.getValue());
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
index 38abd65..78364d9 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultValue.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.pherf.result;
 
 /**
  * Generic box container for a result value. This class allows for writing results of any type easily
+ *
  * @param <T>
  */
 public class ResultValue<T> {
@@ -33,8 +34,7 @@ public class ResultValue<T> {
         return resultValue;
     }
 
-    @Override
-    public String toString() {
+    @Override public String toString() {
         return resultValue.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
index 690f7e6..3aa45fa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/RunTime.java
@@ -18,104 +18,91 @@
 
 package org.apache.phoenix.pherf.result;
 
+import javax.xml.bind.annotation.XmlAttribute;
 import java.util.Comparator;
 import java.util.Date;
 
-import javax.xml.bind.annotation.XmlAttribute;
-
 public class RunTime implements Comparator<RunTime>, Comparable<RunTime> {
-	private Date startTime;
-	private Integer elapsedDurationInMs;
-	private String message;
-	private Long resultRowCount;
-	private String explainPlan;
-
-    @SuppressWarnings("unused")
-    public RunTime() {
-    }
-
-    @SuppressWarnings("unused")
-    public RunTime(Integer elapsedDurationInMs) {
-		this(null, elapsedDurationInMs);
-	}
-	
-	public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
-		this(null, resultRowCount, elapsedDurationInMs);
-	}
-	
-	public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
-		this(null, null, startTime, resultRowCount, elapsedDurationInMs);
-	}
-	
-	public RunTime(String message, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
-		this(message, null, startTime, resultRowCount, elapsedDurationInMs);
-	}
-	
-	public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
-		this.elapsedDurationInMs = elapsedDurationInMs;
-		this.startTime = startTime;
-		this.resultRowCount = resultRowCount;
-		this.message = message;
-		this.explainPlan = explainPlan;
-	}
-
-	@XmlAttribute()
-	public Date getStartTime() {
-		return startTime;
-	}
-
-    @SuppressWarnings("unused")
-	public void setStartTime(Date startTime) {
-		this.startTime = startTime;
-	}
-	
-	@XmlAttribute()
-	public Integer getElapsedDurationInMs() {
-		return elapsedDurationInMs;
-	}
-
-    @SuppressWarnings("unused")
-    public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
-		this.elapsedDurationInMs = elapsedDurationInMs;
-	}
-
-	@Override
-	public int compare(RunTime r1, RunTime r2) {
-		return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
-	}
-
-	@Override
-	public int compareTo(RunTime o) {
-		return compare(this, o);
-	}
-
-	@XmlAttribute()
-	public String getMessage() {
-		return message;
-	}
-
-    @SuppressWarnings("unused")
-    public void setMessage(String message) {
-		this.message = message;
-	}
-	
-	@XmlAttribute()
-	public String getExplainPlan() {
-		return explainPlan;
-	}
-
-    @SuppressWarnings("unused")
-    public void setExplainPlan(String explainPlan) {
-		this.explainPlan = explainPlan;
-	}
-
-	@XmlAttribute()
-	public Long getResultRowCount() {
-		return resultRowCount;
-	}
-
-    @SuppressWarnings("unused")
-	public void setResultRowCount(Long resultRowCount) {
-		this.resultRowCount = resultRowCount;
-	}
+    private Date startTime;
+    private Integer elapsedDurationInMs;
+    private String message;
+    private Long resultRowCount;
+    private String explainPlan;
+
+    @SuppressWarnings("unused") public RunTime() {
+    }
+
+    @SuppressWarnings("unused") public RunTime(Integer elapsedDurationInMs) {
+        this(null, elapsedDurationInMs);
+    }
+
+    public RunTime(Long resultRowCount, Integer elapsedDurationInMs) {
+        this(null, resultRowCount, elapsedDurationInMs);
+    }
+
+    public RunTime(Date startTime, Long resultRowCount, Integer elapsedDurationInMs) {
+        this(null, null, startTime, resultRowCount, elapsedDurationInMs);
+    }
+
+    public RunTime(String message, Date startTime, Long resultRowCount,
+            Integer elapsedDurationInMs) {
+        this(message, null, startTime, resultRowCount, elapsedDurationInMs);
+    }
+
+    public RunTime(String message, String explainPlan, Date startTime, Long resultRowCount,
+            Integer elapsedDurationInMs) {
+        this.elapsedDurationInMs = elapsedDurationInMs;
+        this.startTime = startTime;
+        this.resultRowCount = resultRowCount;
+        this.message = message;
+        this.explainPlan = explainPlan;
+    }
+
+    @XmlAttribute() public Date getStartTime() {
+        return startTime;
+    }
+
+    @SuppressWarnings("unused") public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    @XmlAttribute() public Integer getElapsedDurationInMs() {
+        return elapsedDurationInMs;
+    }
+
+    @SuppressWarnings("unused") public void setElapsedDurationInMs(Integer elapsedDurationInMs) {
+        this.elapsedDurationInMs = elapsedDurationInMs;
+    }
+
+    @Override public int compare(RunTime r1, RunTime r2) {
+        return r1.getElapsedDurationInMs().compareTo(r2.getElapsedDurationInMs());
+    }
+
+    @Override public int compareTo(RunTime o) {
+        return compare(this, o);
+    }
+
+    @XmlAttribute() public String getMessage() {
+        return message;
+    }
+
+    @SuppressWarnings("unused") public void setMessage(String message) {
+        this.message = message;
+    }
+
+    @XmlAttribute() public String getExplainPlan() {
+        return explainPlan;
+    }
+
+    @SuppressWarnings("unused") public void setExplainPlan(String explainPlan) {
+        this.explainPlan = explainPlan;
+    }
+
+    @XmlAttribute() public Long getResultRowCount() {
+        return resultRowCount;
+    }
+
+    @SuppressWarnings("unused") public void setResultRowCount(Long resultRowCount) {
+        this.resultRowCount = resultRowCount;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
index b57e424..9cac1c7 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ScenarioResult.java
@@ -18,31 +18,31 @@
 
 package org.apache.phoenix.pherf.result;
 
+import org.apache.phoenix.pherf.configuration.Scenario;
+
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.phoenix.pherf.configuration.Scenario;
 
 public class ScenarioResult extends Scenario {
 
-	private List<QuerySetResult> querySetResult = new ArrayList<QuerySetResult>();
-	
-	public List<QuerySetResult> getQuerySetResult() {
-		return querySetResult;
-	}
-
-    @SuppressWarnings("unused")
-	public void setQuerySetResult(List<QuerySetResult> querySetResult) {
-		this.querySetResult = querySetResult;
-	}
-	
-	public ScenarioResult() {
-	}
-	
-	public ScenarioResult(Scenario scenario) {
-		this.setDataOverride(scenario.getDataOverride());
-		this.setPhoenixProperties(scenario.getPhoenixProperties());
-		this.setRowCount(scenario.getRowCount());
-		this.setTableName(scenario.getTableName());
-		this.setName(scenario.getName());
-	}
+    private List<QuerySetResult> querySetResult = new ArrayList<>();
+
+    public List<QuerySetResult> getQuerySetResult() {
+        return querySetResult;
+    }
+
+    @SuppressWarnings("unused") public void setQuerySetResult(List<QuerySetResult> querySetResult) {
+        this.querySetResult = querySetResult;
+    }
+
+    public ScenarioResult() {
+    }
+
+    public ScenarioResult(Scenario scenario) {
+        this.setDataOverride(scenario.getDataOverride());
+        this.setPhoenixProperties(scenario.getPhoenixProperties());
+        this.setRowCount(scenario.getRowCount());
+        this.setTableName(scenario.getTableName());
+        this.setName(scenario.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
index f043bec..03b5664 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ThreadTime.java
@@ -18,13 +18,12 @@
 
 package org.apache.phoenix.pherf.result;
 
+import javax.xml.bind.annotation.XmlAttribute;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
-import javax.xml.bind.annotation.XmlAttribute;
-
 public class ThreadTime {
     private List<RunTime> runTimesInMs = Collections.synchronizedList(new ArrayList<RunTime>());
     private String threadName;
@@ -84,23 +83,22 @@ public class ThreadTime {
         return Collections.max(getRunTimesInMs());
     }
 
-    @XmlAttribute()
-    public String getThreadName() {
+    @XmlAttribute() public String getThreadName() {
         return threadName;
     }
 
     public void setThreadName(String threadName) {
         this.threadName = threadName;
     }
-    
+
     private String parseThreadName(boolean getConcurrency) {
-    	if (getThreadName() == null || !getThreadName().contains(",")) return null;
-    	String[] threadNameSet = getThreadName().split(",");
-    	if (getConcurrency) {
-    		return threadNameSet[1];}
-    	else {
-    		return threadNameSet[0];
-    	}
+        if (getThreadName() == null || !getThreadName().contains(",")) return null;
+        String[] threadNameSet = getThreadName().split(",");
+        if (getConcurrency) {
+            return threadNameSet[1];
+        } else {
+            return threadNameSet[0];
+        }
     }
 
     public List<List<ResultValue>> getCsvPerformanceRepresentation(ResultUtil util) {
@@ -110,11 +108,14 @@ public class ThreadTime {
             List<ResultValue> rowValues = new ArrayList(getRunTimesInMs().size());
             rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
             rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
-            rowValues.add(new ResultValue(String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
+            rowValues.add(new ResultValue(
+                    String.valueOf(getRunTimesInMs().get(i).getResultRowCount())));
             if (getRunTimesInMs().get(i).getMessage() == null) {
-                rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
+                rowValues.add(new ResultValue(util.convertNull(
+                        String.valueOf(getRunTimesInMs().get(i).getElapsedDurationInMs()))));
             } else {
-                rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
+                rowValues.add(new ResultValue(
+                        util.convertNull(getRunTimesInMs().get(i).getMessage())));
             }
             rows.add(rowValues);
         }
@@ -129,7 +130,8 @@ public class ThreadTime {
             rowValues.add(new ResultValue(util.convertNull(parseThreadName(false))));
             rowValues.add(new ResultValue(util.convertNull(parseThreadName(true))));
             rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getMessage())));
-            rowValues.add(new ResultValue(util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
+            rowValues.add(new ResultValue(
+                    util.convertNull(getRunTimesInMs().get(i).getExplainPlan())));
             rows.add(rowValues);
         }
         return rows;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
index 0df383c..e6a7308 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Extension.java
@@ -31,8 +31,7 @@ public enum Extension {
         this.extension = extension;
     }
 
-    @Override
-    public String toString() {
+    @Override public String toString() {
         return extension;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
index 98e7b30..15e2b9a 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/file/Header.java
@@ -20,9 +20,11 @@ package org.apache.phoenix.pherf.result.file;
 
 public enum Header {
     EMPTY(""),
-    AGGREGATE_PERFORMANCE("START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
-    DETAILED_BASE("BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
-            + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
+    AGGREGATE_PERFORMANCE(
+            "START_TIME,QUERY_GROUP,QUERY,TENANT_ID,AVG_MAX_TIME_MS,AVG_TIME_MS,AVG_MIN_TIME_MS,RUN_COUNT"),
+    DETAILED_BASE(
+            "BASE_TABLE_NAME,SCENARIO_NAME,ZOOKEEPER,ROW_COUNT,EXECUTION_COUNT,EXECUTION_TYPE,PHOENIX_PROPERTIES"
+                    + ",START_TIME,QUERY_GROUP,QUERY,TENANT_ID,THREAD_NUMBER,CONCURRENCY_LEVEL"),
     DETAILED_PERFORMANCE(DETAILED_BASE + ",RESULT_ROW_COUNT,RUN_TIME_MS"),
     DETAILED_FUNCTIONAL(DETAILED_BASE + ",DIFF_STATUS,EXPLAIN_PLAN"),
     AGGREGATE_DATA_LOAD("ZK,TABLE_NAME,ROW_COUNT,LOAD_DURATION_IN_MS"),
@@ -34,8 +36,7 @@ public enum Header {
         this.header = header;
     }
 
-    @Override
-    public String toString() {
+    @Override public String toString() {
         return header;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
index e7fbb48..e69f600 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/CSVResultHandler.java
@@ -18,13 +18,6 @@
 
 package org.apache.phoenix.pherf.result.impl;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVPrinter;
@@ -36,6 +29,13 @@ import org.apache.phoenix.pherf.result.ResultUtil;
 import org.apache.phoenix.pherf.result.ResultValue;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * TODO Doc this class. Note that each instance that has a non unique file name will overwrite the last
  */
@@ -51,22 +51,22 @@ public class CSVResultHandler implements ResultHandler {
         this(resultFileName, resultFileDetails, true);
     }
 
-    public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+    public CSVResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+            boolean generateFullFileName) {
         this.util = new ResultUtil();
         PherfConstants constants = PherfConstants.create();
         String resultDir = constants.getProperty("pherf.default.results.dir");
 
-        this.resultFileName = generateFullFileName ?
-                resultDir + PherfConstants.PATH_SEPARATOR
-                        + PherfConstants.RESULT_PREFIX
-                        + resultFileName + util.getSuffix()
-                        + resultFileDetails.getExtension().toString()
-            : resultFileName;
+        this.resultFileName =
+                generateFullFileName ?
+                        resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+                                + resultFileName + util.getSuffix() + resultFileDetails
+                                .getExtension().toString() :
+                        resultFileName;
         this.resultFileDetails = resultFileDetails;
     }
 
-    @Override
-    public synchronized void write(Result result) throws IOException {
+    @Override public synchronized void write(Result result) throws IOException {
         util.ensureBaseResultDirExists();
 
         open(result);
@@ -74,15 +74,13 @@ public class CSVResultHandler implements ResultHandler {
         flush();
     }
 
-    @Override
-    public synchronized void flush() throws IOException {
+    @Override public synchronized void flush() throws IOException {
         if (csvPrinter != null) {
             csvPrinter.flush();
         }
     }
 
-    @Override
-    public synchronized void close() throws IOException {
+    @Override public synchronized void close() throws IOException {
         if (csvPrinter != null) {
             csvPrinter.flush();
             csvPrinter.close();
@@ -90,8 +88,7 @@ public class CSVResultHandler implements ResultHandler {
         }
     }
 
-    @Override
-    public synchronized List<Result> read() throws IOException {
+    @Override public synchronized List<Result> read() throws IOException {
         CSVParser parser = null;
         util.ensureBaseResultDirExists();
         try {
@@ -131,13 +128,11 @@ public class CSVResultHandler implements ResultHandler {
         isClosed = false;
     }
 
-    @Override
-    public synchronized boolean isClosed() {
+    @Override public synchronized boolean isClosed() {
         return isClosed;
     }
 
-    @Override
-    public ResultFileDetails getResultFileDetails() {
+    @Override public ResultFileDetails getResultFileDetails() {
         return resultFileDetails;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
index ad3c8fb..5c3eac1 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/ImageResultHandler.java
@@ -19,8 +19,8 @@
 package org.apache.phoenix.pherf.result.impl;
 
 import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.jfree.chart.ChartFactory;
 import org.jfree.chart.ChartUtilities;
 import org.jfree.chart.JFreeChart;
@@ -42,22 +42,22 @@ public class ImageResultHandler implements ResultHandler {
         this(resultFileName, resultFileDetails, true);
     }
 
-    public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+    public ImageResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+            boolean generateFullFileName) {
         ResultUtil util = new ResultUtil();
         PherfConstants constants = PherfConstants.create();
         String resultDir = constants.getProperty("pherf.default.results.dir");
 
-        this.resultFileName = generateFullFileName ?
-                resultDir + PherfConstants.PATH_SEPARATOR
-                        + PherfConstants.RESULT_PREFIX
-                        + resultFileName + util.getSuffix()
-                        + resultFileDetails.getExtension().toString()
-                : resultFileName;
+        this.resultFileName =
+                generateFullFileName ?
+                        resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+                                + resultFileName + util.getSuffix() + resultFileDetails
+                                .getExtension().toString() :
+                        resultFileName;
         this.resultFileDetails = resultFileDetails;
     }
 
-    @Override
-    public synchronized void write(Result result) throws Exception {
+    @Override public synchronized void write(Result result) throws Exception {
         TimeSeriesCollection timeSeriesCollection = new TimeSeriesCollection();
         int rowCount = 0;
         int maxLegendCount = 20;
@@ -70,12 +70,16 @@ public class ImageResultHandler implements ResultHandler {
             for (QuerySetResult querySetResult : scenarioResult.getQuerySetResult()) {
                 for (QueryResult queryResult : querySetResult.getQueryResults()) {
                     for (ThreadTime tt : queryResult.getThreadTimes()) {
-                        TimeSeries timeSeries = new TimeSeries(queryResult.getStatement() + " :: " + tt.getThreadName());
+                        TimeSeries
+                                timeSeries =
+                                new TimeSeries(
+                                        queryResult.getStatement() + " :: " + tt.getThreadName());
                         rowCount++;
                         synchronized (tt.getRunTimesInMs()) {
                             for (RunTime rt : tt.getRunTimesInMs()) {
                                 if (rt.getStartTime() != null) {
-                                    timeSeries.add(new Millisecond(rt.getStartTime()), rt.getElapsedDurationInMs());
+                                    timeSeries.add(new Millisecond(rt.getStartTime()),
+                                            rt.getElapsedDurationInMs());
                                 }
                             }
                         }
@@ -85,10 +89,14 @@ public class ImageResultHandler implements ResultHandler {
             }
         }
         boolean legend = rowCount > maxLegendCount ? false : true;
-        JFreeChart chart = ChartFactory.createTimeSeriesChart(dataModelResult.getName()
-                , "Time", "Query Time (ms)", timeSeriesCollection,
-                legend, true, false);
-        StandardXYItemRenderer renderer = new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
+        JFreeChart
+                chart =
+                ChartFactory
+                        .createTimeSeriesChart(dataModelResult.getName(), "Time", "Query Time (ms)",
+                                timeSeriesCollection, legend, true, false);
+        StandardXYItemRenderer
+                renderer =
+                new StandardXYItemRenderer(StandardXYItemRenderer.SHAPES_AND_LINES);
         chart.getXYPlot().setRenderer(renderer);
         chart.getXYPlot().setBackgroundPaint(Color.WHITE);
         chart.getXYPlot().setRangeGridlinePaint(Color.BLACK);
@@ -96,35 +104,31 @@ public class ImageResultHandler implements ResultHandler {
             chart.getXYPlot().getRenderer().setSeriesStroke(i, new BasicStroke(3f));
         }
         try {
-            ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension, chartDimension);
+            ChartUtilities.saveChartAsJPEG(new File(resultFileName), chart, chartDimension,
+                    chartDimension);
         } catch (IOException e) {
             e.printStackTrace();
         }
 
     }
 
-    @Override
-    public synchronized void flush() throws Exception {
+    @Override public synchronized void flush() throws Exception {
 
     }
 
-    @Override
-    public synchronized void close() throws Exception {
+    @Override public synchronized void close() throws Exception {
 
     }
 
-    @Override
-    public List<Result> read() throws Exception {
+    @Override public List<Result> read() throws Exception {
         return null;
     }
 
-    @Override
-    public boolean isClosed() {
+    @Override public boolean isClosed() {
         return false;
     }
 
-    @Override
-    public ResultFileDetails getResultFileDetails() {
+    @Override public ResultFileDetails getResultFileDetails() {
         return resultFileDetails;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
index 8a913ed..009ae21 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/impl/XMLResultHandler.java
@@ -19,8 +19,8 @@
 package org.apache.phoenix.pherf.result.impl;
 
 import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
@@ -30,7 +30,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 public class XMLResultHandler implements ResultHandler {
     private final String resultFileName;
@@ -40,22 +39,22 @@ public class XMLResultHandler implements ResultHandler {
         this(resultFileName, resultFileDetails, true);
     }
 
-    public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails, boolean generateFullFileName) {
+    public XMLResultHandler(String resultFileName, ResultFileDetails resultFileDetails,
+            boolean generateFullFileName) {
         ResultUtil util = new ResultUtil();
         PherfConstants constants = PherfConstants.create();
         String resultDir = constants.getProperty("pherf.default.results.dir");
 
-        this.resultFileName = generateFullFileName ?
-                resultDir + PherfConstants.PATH_SEPARATOR
-                        + PherfConstants.RESULT_PREFIX
-                        + resultFileName + util.getSuffix()
-                        + resultFileDetails.getExtension().toString()
-                : resultFileName;
+        this.resultFileName =
+                generateFullFileName ?
+                        resultDir + PherfConstants.PATH_SEPARATOR + PherfConstants.RESULT_PREFIX
+                                + resultFileName + util.getSuffix() + resultFileDetails
+                                .getExtension().toString() :
+                        resultFileName;
         this.resultFileDetails = resultFileDetails;
     }
 
-    @Override
-    public synchronized void write(Result result) throws Exception {
+    @Override public synchronized void write(Result result) throws Exception {
         FileOutputStream os = null;
         JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
         Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
@@ -72,18 +71,15 @@ public class XMLResultHandler implements ResultHandler {
         }
     }
 
-    @Override
-    public synchronized void flush() throws IOException {
+    @Override public synchronized void flush() throws IOException {
         return;
     }
 
-    @Override
-    public synchronized void close() throws IOException {
+    @Override public synchronized void close() throws IOException {
         return;
     }
 
-    @Override
-    public synchronized List<Result> read() throws Exception {
+    @Override public synchronized List<Result> read() throws Exception {
 
         JAXBContext jaxbContext = JAXBContext.newInstance(DataModelResult.class);
         Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
@@ -95,13 +91,11 @@ public class XMLResultHandler implements ResultHandler {
         return results;
     }
 
-    @Override
-    public boolean isClosed() {
+    @Override public boolean isClosed() {
         return true;
     }
 
-    @Override
-    public ResultFileDetails getResultFileDetails() {
+    @Override public ResultFileDetails getResultFileDetails() {
         return resultFileDetails;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
index 4761211..439f87e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
@@ -45,7 +45,7 @@ public class SchemaReader {
      * @throws Exception
      */
     public SchemaReader(final String searchPattern) throws Exception {
-        this(new PhoenixUtil(), searchPattern);
+        this(PhoenixUtil.create(), searchPattern);
     }
 
     public SchemaReader(PhoenixUtil util, final String searchPattern) throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 83e324d..0156149 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -30,6 +30,8 @@ import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,15 +41,25 @@ public class PhoenixUtil {
 	private static String zookeeper;
 	private static int rowCountOverride = 0;
     private boolean testEnabled;
+    private static PhoenixUtil instance;
 
-    public PhoenixUtil() {
+    private PhoenixUtil() {
         this(false);
     }
 
-    public PhoenixUtil(final boolean testEnabled) {
+    private PhoenixUtil(final boolean testEnabled) {
         this.testEnabled = testEnabled;
     }
 
+    public static PhoenixUtil create() {
+        return create(false);
+    }
+
+    public static PhoenixUtil create(final boolean testEnabled) {
+        instance = instance != null ? instance : new PhoenixUtil(testEnabled);
+        return instance;
+    }
+
     public Connection getConnection() throws Exception{
     	return getConnection(null);
     }
@@ -56,7 +68,7 @@ public class PhoenixUtil {
         return getConnection(tenantId, testEnabled);
     }
 
-    public Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
+    private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
         if (null == zookeeper) {
             throw new IllegalArgumentException(
                     "Zookeeper must be set before initializing connection!");
@@ -115,17 +127,6 @@ public class PhoenixUtil {
         return result;
     }
 
-    @SuppressWarnings("unused")
-    public ResultSet executeQuery(PreparedStatement preparedStatement, Connection connection) {
-        ResultSet resultSet = null;
-        try {
-            resultSet = preparedStatement.executeQuery();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-        return resultSet;
-    }
-    
     /**
      * Delete existing tables with schema name set as {@link PherfConstants#PHERF_SCHEMA_NAME} with regex comparison 
      * 
@@ -133,14 +134,14 @@ public class PhoenixUtil {
      * @throws SQLException
      * @throws Exception
      */
-    public void deleteTables(String regexMatch) throws SQLException, Exception {
+    public void deleteTables(String regexMatch) throws Exception {
     	regexMatch = regexMatch.toUpperCase().replace("ALL", ".*");
     	Connection conn = getConnection();
     	try {
         	ResultSet resultSet = getTableMetaData(PherfConstants.PHERF_SCHEMA_NAME, null, conn);
 	    	while (resultSet.next()) {
-	    		String tableName = resultSet.getString("TABLE_SCHEM") == null ? resultSet.getString("TABLE_NAME") : 
-	    						   resultSet.getString("TABLE_SCHEM") + "." + resultSet.getString("TABLE_NAME");
+	    		String tableName = resultSet.getString("TABLE_SCHEMA") == null ? resultSet.getString("TABLE_NAME") :
+	    						   resultSet.getString("TABLE_SCHEMA") + "." + resultSet.getString("TABLE_NAME");
 	    		if (tableName.matches(regexMatch)) {
 		    		logger.info("\nDropping " + tableName);
 		    		executeStatement("DROP TABLE " + tableName + " CASCADE", conn);
@@ -183,8 +184,33 @@ public class PhoenixUtil {
     	
     	return Collections.unmodifiableList(columnList);
     }
-    
-	public static String getZookeeper() {
+
+    /**
+     * Execute all querySet DDLs first based on tenantId if specified. This is executed
+     * first since we don't want to run DDLs in parallel to executing queries.
+     *
+     * @param querySet
+     * @throws Exception
+     */
+    public void executeQuerySetDdls(QuerySet querySet) throws Exception {
+        for (Query query : querySet.getQuery()) {
+            if (null != query.getDdl()) {
+                Connection conn = null;
+                try {
+                    logger.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query
+                            .getTenantId());
+                    executeStatement(query.getDdl(),
+                            conn = getConnection(query.getTenantId()));
+                } finally {
+                    if (null != conn) {
+                        conn.close();
+                    }
+                }
+            }
+        }
+    }
+
+    public static String getZookeeper() {
 		return zookeeper;
 	}
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
new file mode 100644
index 0000000..efb3da9
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+
+import org.apache.phoenix.pherf.result.DataModelResult;
+import org.apache.phoenix.pherf.result.ResultManager;
+import org.apache.phoenix.pherf.result.RunTime;
+import org.apache.phoenix.pherf.result.ThreadTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+class MultiThreadedRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
+    private Query query;
+    private ThreadTime threadTime;
+    private PhoenixUtil pUtil = PhoenixUtil.create();
+    private String threadName;
+    private DataModelResult dataModelResult;
+    private long numberOfExecutions;
+    private long executionDurationInMs;
+    private static long lastResultWritten = System.currentTimeMillis() - 1000;
+    private final ResultManager resultManager;
+
+    /**
+     * MultiThreadedRunner
+     *
+     * @param threadName
+     * @param query
+     * @param dataModelResult
+     * @param threadTime
+     * @param numberOfExecutions
+     * @param executionDurationInMs
+     */
+    MultiThreadedRunner(String threadName, Query query, DataModelResult dataModelResult,
+            ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs) {
+        this.query = query;
+        this.threadName = threadName;
+        this.threadTime = threadTime;
+        this.dataModelResult = dataModelResult;
+        this.numberOfExecutions = numberOfExecutions;
+        this.executionDurationInMs = executionDurationInMs;
+        this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
+    }
+
+    /**
+     * Executes run for a minimum of number of execution or execution duration
+     */
+    public void run() {
+        logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+                + numberOfExecutions + "times\n\n");
+        Long start = System.currentTimeMillis();
+        for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
+                < executionDurationInMs)); i--) {
+            try {
+                synchronized (resultManager) {
+                    timedQuery();
+                    if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
+                        resultManager.write(dataModelResult);
+                        lastResultWritten = System.currentTimeMillis();
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        logger.info("\n\nThread exiting." + threadName + "\n\n");
+    }
+
+    private synchronized ThreadTime getThreadTime() {
+        return threadTime;
+    }
+
+    /**
+     * Timed query execution
+     *
+     * @throws Exception
+     */
+    private void timedQuery() throws Exception {
+        boolean
+                isSelectCountStatement =
+                query.getStatement().toUpperCase().trim().contains("COUNT(*)") ? true : false;
+
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+        Long start = System.currentTimeMillis();
+        Date startDate = Calendar.getInstance().getTime();
+        String exception = null;
+        long resultRowCount = 0;
+
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement(query.getStatement());
+            boolean isQuery = statement.execute();
+            if (isQuery) {
+                rs = statement.getResultSet();
+                while (rs.next()) {
+                    if (null != query.getExpectedAggregateRowCount()) {
+                        if (rs.getLong(1) != query.getExpectedAggregateRowCount())
+                            throw new RuntimeException(
+                                    "Aggregate count " + rs.getLong(1) + " does not match expected "
+                                            + query.getExpectedAggregateRowCount());
+                    }
+
+                    if (isSelectCountStatement) {
+                        resultRowCount = rs.getLong(1);
+                    } else {
+                        resultRowCount++;
+                    }
+                }
+            } else {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            exception = e.getMessage();
+        } finally {
+            getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
+                    (int) (System.currentTimeMillis() - start)));
+
+            if (rs != null) rs.close();
+            if (statement != null) statement.close();
+            if (conn != null) conn.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index c78db90..1735754 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -30,84 +30,69 @@ import org.apache.phoenix.pherf.result.RunTime;
 import org.apache.phoenix.pherf.result.ThreadTime;
 
 class MultithreadedDiffer implements Runnable {
-	private static final Logger logger = LoggerFactory
-			.getLogger(MultithreadedRunner.class);
-	private Thread t;
-	private Query query;
-	private ThreadTime threadTime;
-	private String threadName;
-	private long numberOfExecutions;
-	private long executionDurationInMs;
-	private QueryVerifier queryVerifier = new QueryVerifier(true);
+    private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
+    private Thread t;
+    private Query query;
+    private ThreadTime threadTime;
+    private String threadName;
+    private long numberOfExecutions;
+    private long executionDurationInMs;
+    private QueryVerifier queryVerifier = new QueryVerifier(true);
 
-	private synchronized ThreadTime getThreadTime() {
+    private synchronized ThreadTime getThreadTime() {
         return threadTime;
     }
 
     /**
-	 * Query Verification
-	 * @throws Exception
-	 */
-	private void diffQuery() throws Exception {
-		Long start = System.currentTimeMillis();
-		Date startDate = Calendar.getInstance().getTime();
- 		String newCSV = queryVerifier.exportCSV(query);
- 		boolean verifyResult = queryVerifier.doDiff(query, newCSV);
- 		String explainPlan = queryVerifier.getExplainPlan(query);
-        getThreadTime().getRunTimesInMs().add(
-                new RunTime(verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL, 
-                		explainPlan, startDate, -1L, 
-                		(int)(System.currentTimeMillis() - start)));
-	}
-
-	/**
-	 * Multithreaded Differ
-	 * @param threadName
-	 * @param query
-	 * @param threadName
-	 * @param threadTime
-	 * @param numberOfExecutions
-	 * @param executionDurationInMs
-	 */
-	MultithreadedDiffer(String threadName,
-			Query query, 
-			ThreadTime threadTime, 
-			long numberOfExecutions, 
-			long executionDurationInMs) {
-		this.query = query;
-		this.threadName = threadName;
-		this.threadTime = threadTime;
-		this.numberOfExecutions = numberOfExecutions;
-		this.executionDurationInMs = executionDurationInMs;
-	}
+     * Query Verification
+     *
+     * @throws Exception
+     */
+    private void diffQuery() throws Exception {
+        Long start = System.currentTimeMillis();
+        Date startDate = Calendar.getInstance().getTime();
+        String newCSV = queryVerifier.exportCSV(query);
+        boolean verifyResult = queryVerifier.doDiff(query, newCSV);
+        String explainPlan = queryVerifier.getExplainPlan(query);
+        getThreadTime().getRunTimesInMs().add(new RunTime(
+                        verifyResult == true ? PherfConstants.DIFF_PASS : PherfConstants.DIFF_FAIL,
+                        explainPlan, startDate, -1L, (int) (System.currentTimeMillis() - start)));
+    }
 
-	/**
-	 * Executes verification runs for a minimum of number of execution or execution duration
-	 */
-	public void run() {
-		logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
-				+ numberOfExecutions + "times\n\n");
-		Long start = System.currentTimeMillis();
-		for (long i = numberOfExecutions; (i > 0 && ((System
-				.currentTimeMillis() - start) < executionDurationInMs)); i--) {
-			try {
-				diffQuery();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-		logger.info("\n\nThread exiting." + t.getName() + "\n\n");
-	}
+    /**
+     * Multithreaded Differ
+     *
+     * @param threadName
+     * @param query
+     * @param threadName
+     * @param threadTime
+     * @param numberOfExecutions
+     * @param executionDurationInMs
+     */
+    MultithreadedDiffer(String threadName, Query query, ThreadTime threadTime,
+            long numberOfExecutions, long executionDurationInMs) {
+        this.query = query;
+        this.threadName = threadName;
+        this.threadTime = threadTime;
+        this.numberOfExecutions = numberOfExecutions;
+        this.executionDurationInMs = executionDurationInMs;
+    }
 
-	/**
-	 * Thread start
-	 * @return
-	 */
-	public Thread start() {
-		if (t == null) {
-			t = new Thread(this, threadName);
-			t.start();
-		}
-		return t;
-	}
+    /**
+     * Executes verification runs for a minimum of number of execution or execution duration
+     */
+    public void run() {
+        logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
+                + numberOfExecutions + "times\n\n");
+        Long start = System.currentTimeMillis();
+        for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
+                < executionDurationInMs)); i--) {
+            try {
+                diffQuery();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        logger.info("\n\nThread exiting." + t.getName() + "\n\n");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
deleted file mode 100644
index 237fc17..0000000
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedRunner.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.phoenix.pherf.workload;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-
-import org.apache.phoenix.pherf.result.DataModelResult;
-import org.apache.phoenix.pherf.result.ResultManager;
-import org.apache.phoenix.pherf.result.RunTime;
-import org.apache.phoenix.pherf.result.ThreadTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
-
-class MultithreadedRunner implements Runnable {
-	private static final Logger logger = LoggerFactory
-			.getLogger(MultithreadedRunner.class);
-	private Thread t;
-	private Query query;
-	private ThreadTime threadTime;
-	private PhoenixUtil pUtil = new PhoenixUtil();
-	private String threadName;
-	private DataModelResult dataModelResult;
-	private long numberOfExecutions;
-	private long executionDurationInMs;
-	private static long lastResultWritten = System.currentTimeMillis() - 1000;
-    private final ResultManager resultManager;
-
-    /**
-     * Multithreaded runner
-     *
-     * @param threadName
-     * @param query
-     * @param dataModelResult
-     * @param threadTime
-     * @param numberOfExecutions
-     * @param executionDurationInMs
-     */
-    MultithreadedRunner(String threadName,
-                        Query query,
-                        DataModelResult dataModelResult,
-                        ThreadTime threadTime,
-                        long numberOfExecutions,
-                        long executionDurationInMs) {
-        this.query = query;
-        this.threadName = threadName;
-        this.threadTime = threadTime;
-        this.dataModelResult = dataModelResult;
-        this.numberOfExecutions = numberOfExecutions;
-        this.executionDurationInMs = executionDurationInMs;
-        this.resultManager = new ResultManager(dataModelResult.getName(), RunMode.PERFORMANCE);
-    }
-
-	/**
-	 * Executes run for a minimum of number of execution or execution duration
-	 */
-	public void run() {
-		logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
-				+ numberOfExecutions + "times\n\n");
-		Long start = System.currentTimeMillis();
-		for (long i = numberOfExecutions; (i > 0 && ((System
-				.currentTimeMillis() - start) < executionDurationInMs)); i--) {
-			try {
-                synchronized (resultManager) {
-                    timedQuery();
-                    if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
-                        resultManager.write(dataModelResult);
-                        lastResultWritten = System.currentTimeMillis();
-                    }
-                }
-            } catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-		logger.info("\n\nThread exiting." + t.getName() + "\n\n");
-	}
-
-	/**
-	 * Thread start
-	 * @return
-	 */
-	public Thread start() {
-		if (t == null) {
-			t = new Thread(this, threadName);
-			t.start();
-		}
-		return t;
-	}
-
-    private synchronized ThreadTime getThreadTime() {
-        return threadTime;
-    }
-
-    /**
-     * Timed query execution
-     *
-     * @throws Exception
-     */
-    private void timedQuery() throws Exception {
-        boolean isSelectCountStatement = query.getStatement().toUpperCase().trim()
-                .contains("COUNT(*)") ? true : false;
-
-        Connection conn = null;
-        PreparedStatement statement = null;
-        ResultSet rs = null;
-        Long start = System.currentTimeMillis();
-        Date startDate = Calendar.getInstance().getTime();
-        String exception = null;
-        long resultRowCount = 0;
-
-        try {
-            conn = pUtil.getConnection(query.getTenantId());
-            statement = conn.prepareStatement(query.getStatement());
-            boolean isQuery = statement.execute();
-            if (isQuery) {
-                rs = statement.getResultSet();
-                while (rs.next()) {
-                    if (null != query.getExpectedAggregateRowCount()) {
-                        if (rs.getLong(1) != query.getExpectedAggregateRowCount())
-                            throw new RuntimeException("Aggregate count "
-                                    + rs.getLong(1) + " does not match expected "
-                                    + query.getExpectedAggregateRowCount());
-                    }
-
-                    if (isSelectCountStatement) {
-                        resultRowCount = rs.getLong(1);
-                    } else {
-                        resultRowCount++;
-                    }
-                }
-            } else {
-                conn.commit();
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            exception = e.getMessage();
-        } finally {
-            getThreadTime().getRunTimesInMs().add(
-                    new RunTime(exception, startDate, resultRowCount, (int) (System.currentTimeMillis() - start)));
-
-            if (rs != null) rs.close();
-            if (statement != null) statement.close();
-            if (conn != null) conn.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 6f6e000..624188c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -18,227 +18,256 @@
 
 package org.apache.phoenix.pherf.workload;
 
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.pherf.PherfConstants.RunMode;
+import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.phoenix.pherf.configuration.DataModel;
-import org.apache.phoenix.pherf.configuration.ExecutionType;
-import org.apache.phoenix.pherf.configuration.Query;
-import org.apache.phoenix.pherf.configuration.QuerySet;
-import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class QueryExecutor implements Workload {
+    private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
+    private List<DataModel> dataModels;
+    private String queryHint;
+    private final RunMode runMode;
+    private final boolean exportCSV;
+    private final ExecutorService pool;
+    private final XMLConfigParser parser;
+    private final PhoenixUtil util;
+
+    public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool) {
+        this(parser, util, pool, parser.getDataModels(), null, false, RunMode.PERFORMANCE);
+    }
+
+    public QueryExecutor(XMLConfigParser parser, PhoenixUtil util, ExecutorService pool,
+            List<DataModel> dataModels, String queryHint, boolean exportCSV, RunMode runMode) {
+        this.parser = parser;
+        this.queryHint = queryHint;
+        this.exportCSV = exportCSV;
+        this.runMode = runMode;
+        this.dataModels = dataModels;
+        this.pool = pool;
+        this.util = util;
+    }
+
+    @Override public void complete() {
+
+    }
+
+    /**
+     * Calls in Multithreaded Query Executor for all datamodels
+     *
+     * @throws Exception
+     */
+    public Runnable execute() throws Exception {
+        Runnable runnable = null;
+        for (DataModel dataModel : dataModels) {
+            if (exportCSV) {
+                runnable = exportAllScenarios(dataModel);
+            } else {
+                runnable = executeAllScenarios(dataModel);
+            }
+        }
+        return runnable;
+    }
+
+    /**
+     * Export all queries results to CSV
+     *
+     * @param dataModel
+     * @throws Exception
+     */
+    protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception {
+        return new Runnable() {
+            @Override public void run() {
+                try {
+
+                    List<Scenario> scenarios = dataModel.getScenarios();
+                    QueryVerifier exportRunner = new QueryVerifier(false);
+                    for (Scenario scenario : scenarios) {
+                        for (QuerySet querySet : scenario.getQuerySet()) {
+                            util.executeQuerySetDdls(querySet);
+                            for (Query query : querySet.getQuery()) {
+                                exportRunner.exportCSV(query);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Execute all scenarios
+     *
+     * @param dataModel
+     * @throws Exception
+     */
+    protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception {
+        return new Runnable() {
+            @Override public void run() {
+                List<DataModelResult> dataModelResults = new ArrayList<>();
+                DataModelResult
+                        dataModelResult =
+                        new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
+                ResultManager
+                        resultManager =
+                        new ResultManager(dataModelResult.getName(), QueryExecutor.this.runMode);
+
+                dataModelResults.add(dataModelResult);
+                List<Scenario> scenarios = dataModel.getScenarios();
+                Configuration conf = HBaseConfiguration.create();
+                Map<String, String> phoenixProperty = conf.getValByRegex("phoenix");
+                try {
+
+                    for (Scenario scenario : scenarios) {
+                        ScenarioResult scenarioResult = new ScenarioResult(scenario);
+                        scenarioResult.setPhoenixProperties(phoenixProperty);
+                        dataModelResult.getScenarioResult().add(scenarioResult);
+                        WriteParams writeParams = scenario.getWriteParams();
+
+                        if (writeParams != null) {
+                            int writerThreadCount = writeParams.getWriterThreadCount();
+                            for (int i = 0; i < writerThreadCount; i++) {
+                                logger.debug("Inserting write workload ( " + i + " ) of ( "
+                                        + writerThreadCount + " )");
+                                Workload writes = new WriteWorkload(PhoenixUtil.create(), parser);
+                                pool.submit(writes.execute());
+                            }
+                        }
+
+                        for (QuerySet querySet : scenario.getQuerySet()) {
+                            QuerySetResult querySetResult = new QuerySetResult(querySet);
+                            scenarioResult.getQuerySetResult().add(querySetResult);
+
+                            util.executeQuerySetDdls(querySet);
+                            if (querySet.getExecutionType() == ExecutionType.SERIAL) {
+                                executeQuerySetSerial(dataModelResult, querySet, querySetResult);
+                            } else {
+                                executeQuerySetParallel(dataModelResult, querySet, querySetResult);
+                            }
+                        }
+                        resultManager.write(dataModelResult);
+                    }
+                    resultManager.write(dataModelResults);
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
 
-public class QueryExecutor {
-	private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
-	private List<DataModel> dataModels;
-	private String queryHint;
-	private RunMode runMode;
+    /**
+     * Execute query set serially
+     *
+     * @param dataModelResult
+     * @param querySet
+     * @param querySetResult
+     * @throws InterruptedException
+     */
+    protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
+            QuerySetResult querySetResult) throws InterruptedException {
+        for (Query query : querySet.getQuery()) {
+            QueryResult queryResult = new QueryResult(query);
+            querySetResult.getQueryResults().add(queryResult);
+
+            for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+
+                List<Future> threads = new ArrayList<>();
+
+                for (int i = 0; i < cr; i++) {
+
+                    Runnable
+                            thread =
+                            executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+                                    querySetResult);
+                    threads.add(pool.submit(thread));
+                }
+
+                for (Future thread : threads) {
+                    try {
+                        thread.get();
+                    } catch (ExecutionException e) {
+                        logger.error("", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Execute query set in parallel
+     *
+     * @param dataModelResult
+     * @param querySet
+     * @param querySetResult
+     * @throws InterruptedException
+     */
+    protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
+            QuerySetResult querySetResult) throws InterruptedException {
+        for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
+            List<Future> threads = new ArrayList<>();
+            for (int i = 0; i < cr; i++) {
+                for (Query query : querySet.getQuery()) {
+                    QueryResult queryResult = new QueryResult(query);
+                    querySetResult.getQueryResults().add(queryResult);
+
+                    Runnable
+                            thread =
+                            executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
+                                    querySetResult);
+                    threads.add(pool.submit(thread));
+                }
+
+                for (Future thread : threads) {
+                    try {
+                        thread.get();
+                    } catch (ExecutionException e) {
+                        logger.error("", e);
+                    }
+                }
+            }
+        }
+    }
 
-	public QueryExecutor(XMLConfigParser parser) {
-		this.dataModels = parser.getDataModels();
+    /**
+     * Execute multi-thread runner
+     *
+     * @param name
+     * @param dataModelResult
+     * @param queryResult
+     * @param querySet
+     * @return
+     */
+    protected Runnable executeRunner(String name, DataModelResult dataModelResult,
+            QueryResult queryResult, QuerySet querySet) {
+        ThreadTime threadTime = new ThreadTime();
+        queryResult.getThreadTimes().add(threadTime);
+        threadTime.setThreadName(name);
+        queryResult.setHint(this.queryHint);
+        logger.info("\nExecuting query " + queryResult.getStatement());
+        Runnable thread;
+        if (this.runMode == RunMode.FUNCTIONAL) {
+            thread =
+                    new MultithreadedDiffer(threadTime.getThreadName(), queryResult, threadTime,
+                            querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs());
+        } else {
+            thread =
+                    new MultiThreadedRunner(threadTime.getThreadName(), queryResult,
+                            dataModelResult, threadTime, querySet.getNumberOfExecutions(),
+                            querySet.getExecutionDurationInMs());
+        }
+        return thread;
     }
-	
-	/**
-	 * Calls in Multithreaded Query Executor for all datamodels
-	 * @throws Exception 
-	 */
-	public void execute(String queryHint, boolean exportCSV, RunMode runMode) throws Exception {
-		this.queryHint = queryHint;
-		this.runMode = runMode;
-		for (DataModel dataModel: dataModels) {
-			if (exportCSV) {
-				exportAllScenarios(dataModel);	
-			} else {
-				executeAllScenarios(dataModel);
-			}
-		}
-	}
-
-	/**
-	 * Export all queries results to CSV 
-	 * @param dataModel
-	 * @throws Exception 
-	 */
-	protected void exportAllScenarios(DataModel dataModel) throws Exception {
-		List<Scenario> scenarios = dataModel.getScenarios();
-		QueryVerifier exportRunner = new QueryVerifier(false);
-		for (Scenario scenario : scenarios) {
-			for (QuerySet querySet : scenario.getQuerySet()) {
-				executeQuerySetDdls(querySet);
-				for (Query query : querySet.getQuery()) {
-					exportRunner.exportCSV(query);
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Execute all scenarios
-	 * @param dataModel
-	 * @throws Exception 
-	 */
-	protected void executeAllScenarios(DataModel dataModel) throws Exception {
-		List<DataModelResult> dataModelResults = new ArrayList<DataModelResult>();
-		DataModelResult dataModelResult = new DataModelResult(dataModel, PhoenixUtil.getZookeeper());
-        ResultManager resultManager = new ResultManager(dataModelResult.getName(), this.runMode);
-
-
-		dataModelResults.add(dataModelResult);
-		List<Scenario> scenarios = dataModel.getScenarios();
-		Configuration conf = HBaseConfiguration.create();
-		Map<String, String> phoenixProperty = conf.getValByRegex("phoenix");
-		phoenixProperty.putAll(conf.getValByRegex("sfdc"));
-
-		for (Scenario scenario : scenarios) {
-			ScenarioResult scenarioResult = new ScenarioResult(scenario);
-			scenarioResult.setPhoenixProperties(phoenixProperty);
-			dataModelResult.getScenarioResult().add(scenarioResult);
-
-			for (QuerySet querySet : scenario.getQuerySet()) {
-				QuerySetResult querySetResult = new QuerySetResult(querySet);
-				scenarioResult.getQuerySetResult().add(querySetResult);
-				
-				executeQuerySetDdls(querySet);
-				
-				if (querySet.getExecutionType() == ExecutionType.SERIAL) {
-					execcuteQuerySetSerial(dataModelResult, querySet, querySetResult, scenarioResult);
-				} else {
-					execcuteQuerySetParallel(dataModelResult, querySet, querySetResult, scenarioResult);					
-				}
-			}
-            resultManager.write(dataModelResult);
-		}
-        resultManager.write(dataModelResults);
-	}
-
-	/**
-	 * Execute all querySet DDLs first based on tenantId if specified. This is executed
-	 * first since we don't want to run DDLs in parallel to executing queries.
-	 * 
-	 * @param querySet
-	 * @throws Exception 
-	 */
-	protected void executeQuerySetDdls(QuerySet querySet) throws Exception {
-		PhoenixUtil pUtil = new PhoenixUtil();
-		for (Query query : querySet.getQuery()) {
-			if (null != query.getDdl()) {
-				Connection conn = null;
-				try {
-					logger.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query.getTenantId());
-					pUtil.executeStatement(query.getDdl(), conn = pUtil.getConnection(query.getTenantId()));
-				} finally {
-					if (null != conn) {
-						conn.close();
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Execute query set serially
-	 * @param dataModelResult
-	 * @param querySet
-	 * @param querySetResult
-	 * @param scenario
-	 * @throws InterruptedException
-	 */
-	protected void execcuteQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
-		for (Query query : querySet.getQuery()) {
-			QueryResult queryResult = new QueryResult(query);
-			querySetResult.getQueryResults().add(queryResult);
-
-			for (int cr = querySet.getMinConcurrency(); cr <= querySet
-					.getMaxConcurrency(); cr++) {
-				
-				List<Thread> threads = new ArrayList<Thread>();
-				
-				for (int i = 0; i < cr; i++) {
-
-					Thread thread = executeRunner((i + 1) + ","
-							+ cr, dataModelResult, queryResult,
-							querySetResult);
-					threads.add(thread);
-				}
-
-				for (Thread thread : threads) {
-					thread.join();
-				}
-			}
-		}
-	}
-
-	/**
-	 * Execute query set in parallel
-	 * @param dataModelResult
-	 * @param querySet
-	 * @param querySetResult
-	 * @param scenario
-	 * @throws InterruptedException
-	 */
-	protected void execcuteQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet, QuerySetResult querySetResult, Scenario scenario)
-			throws InterruptedException {
-		for (int cr = querySet.getMinConcurrency(); cr <= querySet
-				.getMaxConcurrency(); cr++) {
-			List<Thread> threads = new ArrayList<Thread>();
-			for (int i = 0; i < cr; i++) {
-				for (Query query : querySet.getQuery()) {
-					QueryResult queryResult = new QueryResult(query);
-					querySetResult.getQueryResults().add(queryResult);
-
-					Thread thread = executeRunner((i + 1) + ","
-							+ cr, dataModelResult, queryResult,
-							querySetResult);
-					threads.add(thread);
-				}
-			}
-			for (Thread thread : threads) {
-				thread.join();
-			}
-		}
-	}
-	
-	/**
-	 * Execute multi-thread runner
-	 * @param name
-	 * @param dataModelResult
-	 * @param queryResult
-	 * @param querySet
-	 * @return
-	 */
-	protected Thread executeRunner(String name, DataModelResult dataModelResult, QueryResult queryResult, QuerySet querySet) {
-		ThreadTime threadTime = new ThreadTime();
-		queryResult.getThreadTimes().add(threadTime);
-		threadTime.setThreadName(name);
-		queryResult.setHint(this.queryHint);
-		logger.info("\nExecuting query "
-				+ queryResult.getStatement());
-		Thread thread;
-		if (this.runMode == RunMode.FUNCTIONAL) {
-			thread = new MultithreadedDiffer(
-					threadTime.getThreadName(),
-					queryResult,
-					threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
-					.start();
-		} else {
-			thread = new MultithreadedRunner(
-					threadTime.getThreadName(),
-					queryResult,
-					dataModelResult,
-					threadTime, querySet.getNumberOfExecutions(), querySet.getExecutionDurationInMs())
-					.start();
-		}
-		return thread;
-	}
-}
+}
\ No newline at end of file


[3/3] phoenix git commit: PHOENIX-1920 - Pherf - Add support for mixed r/w workloads

Posted by co...@apache.org.
PHOENIX-1920 - Pherf - Add support for mixed r/w workloads


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7175dcbc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7175dcbc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7175dcbc

Branch: refs/heads/master
Commit: 7175dcbc011dff48f6d041697ec84da98f80f729
Parents: 466eeb3
Author: cmarcel <cm...@salesforce.com>
Authored: Fri Jun 19 16:34:41 2015 -0700
Committer: cmarcel <cm...@salesforce.com>
Committed: Fri Jun 19 16:34:41 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 phoenix-pherf/pom.xml                           |  10 +-
 .../org/apache/phoenix/pherf/DataIngestIT.java  | 134 ++++--
 .../org/apache/phoenix/pherf/PherfMainIT.java   |  36 ++
 .../apache/phoenix/pherf/ResultBaseTestIT.java  |  31 +-
 .../apache/phoenix/pherf/SchemaReaderIT.java    |  17 +-
 .../java/org/apache/phoenix/pherf/Pherf.java    | 179 +++++---
 .../apache/phoenix/pherf/PherfConstants.java    |   8 +-
 .../phoenix/pherf/configuration/DataModel.java  |  10 -
 .../phoenix/pherf/configuration/Scenario.java   |  12 +-
 .../pherf/configuration/WriteParams.java        |  72 +++
 .../pherf/configuration/XMLConfigParser.java    |  25 +-
 .../phoenix/pherf/jmx/MonitorManager.java       | 153 ++++---
 .../phoenix/pherf/loaddata/DataLoader.java      | 332 --------------
 .../pherf/result/DataLoadThreadTime.java        |  87 ++--
 .../pherf/result/DataLoadTimeSummary.java       |  54 +--
 .../phoenix/pherf/result/DataModelResult.java   |  68 ++-
 .../phoenix/pherf/result/QueryResult.java       |  17 +-
 .../phoenix/pherf/result/QuerySetResult.java    |  40 +-
 .../org/apache/phoenix/pherf/result/Result.java |  11 +-
 .../phoenix/pherf/result/ResultHandler.java     |   5 +
 .../phoenix/pherf/result/ResultManager.java     |  19 +-
 .../apache/phoenix/pherf/result/ResultUtil.java | 119 +++--
 .../phoenix/pherf/result/ResultValue.java       |   4 +-
 .../apache/phoenix/pherf/result/RunTime.java    | 179 ++++----
 .../phoenix/pherf/result/ScenarioResult.java    |  44 +-
 .../apache/phoenix/pherf/result/ThreadTime.java |  34 +-
 .../phoenix/pherf/result/file/Extension.java    |   3 +-
 .../phoenix/pherf/result/file/Header.java       |  11 +-
 .../pherf/result/impl/CSVResultHandler.java     |  47 +-
 .../pherf/result/impl/ImageResultHandler.java   |  58 +--
 .../pherf/result/impl/XMLResultHandler.java     |  36 +-
 .../phoenix/pherf/schema/SchemaReader.java      |   2 +-
 .../apache/phoenix/pherf/util/PhoenixUtil.java  |  64 ++-
 .../pherf/workload/MultiThreadedRunner.java     | 153 +++++++
 .../pherf/workload/MultithreadedDiffer.java     | 131 +++---
 .../pherf/workload/MultithreadedRunner.java     | 170 -------
 .../phoenix/pherf/workload/QueryExecutor.java   | 459 ++++++++++---------
 .../phoenix/pherf/workload/QueryVerifier.java   | 265 +++++------
 .../apache/phoenix/pherf/workload/Workload.java |  10 +
 .../pherf/workload/WorkloadExecutor.java        | 109 ++---
 .../phoenix/pherf/workload/WriteWorkload.java   | 403 ++++++++++++++++
 .../scenario/prod_test_unsalted_scenario.xml    |  35 ++
 .../phoenix/pherf/ConfigurationParserTest.java  | 102 +++--
 .../org/apache/phoenix/pherf/ResultTest.java    |   5 +-
 .../apache/phoenix/pherf/RuleGeneratorTest.java |  15 +-
 .../test/resources/scenario/test_scenario.xml   |  58 ++-
 47 files changed, 2171 insertions(+), 1667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index fc0e4af..b918d76 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,3 +22,5 @@
 target/
 release/
 RESULTS/
+CSV_EXPORT/
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 1667c66..0facbde 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -16,7 +16,8 @@
   ~   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
@@ -30,7 +31,7 @@
     <name>Phoenix - Pherf</name>
 
     <properties>
-      <top.dir>${project.basedir}/..</top.dir>
+        <top.dir>${project.basedir}/..</top.dir>
     </properties>
 
     <profiles>
@@ -233,6 +234,11 @@
 
         <!-- Test Dependencies -->
         <dependency>
+            <groupId>com.jcabi</groupId>
+            <artifactId>jcabi-jdbc</artifactId>
+            <version>0.15</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
index 2b56f43..828ac38 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
@@ -18,70 +18,122 @@
 
 package org.apache.phoenix.pherf;
 
+import com.jcabi.jdbc.JdbcSession;
+import com.jcabi.jdbc.Outcome;
 import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataModel;
 import org.apache.phoenix.pherf.configuration.DataTypeMapping;
 import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
 import org.apache.phoenix.pherf.rules.DataValue;
 import org.apache.phoenix.pherf.rules.RulesApplier;
-import org.apache.phoenix.pherf.schema.SchemaReader;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.QueryExecutor;
+import org.apache.phoenix.pherf.workload.WorkloadExecutor;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class DataIngestIT extends ResultBaseTestIT {
-    protected static PhoenixUtil util = new PhoenixUtil(true);
-    static final String matcherScenario = ".*scenario/.*test.*xml";
-    static final String matcherSchema = ".*datamodel/.*test.*sql";
 
-    @Test
-    public void generateData() throws Exception {
-        util.setZookeeper("localhost");
-        SchemaReader reader = new SchemaReader(util, matcherSchema);
-        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+    @Before
+    public void applySchema() throws Exception {
+        reader.applySchema();
+        resources = new ArrayList<>(reader.getResourceList());
 
-        // 1. Generate table schema from file
-        List<Path> resources = new ArrayList<>(reader.getResourceList());
         assertTrue("Could not pull list of schema files.", resources.size() > 0);
         assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
-        reader.applySchema();
+    }
+
+    @Test
+    public void testColumnRulesApplied() {
+
+        Scenario scenario = null;
+        try {
+            scenario = parser.getScenarioByName("testScenario");
+            List<Column>
+                    columnListFromPhoenix =
+                    util.getColumnsFromPhoenix(scenario.getSchemaName(),
+                            scenario.getTableNameWithoutSchemaName(), util.getConnection());
+            assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
+
+            WriteWorkload loader = new WriteWorkload(util, parser, scenario);
+            WorkloadExecutor executor = new WorkloadExecutor();
+            executor.add(loader);
+
+            RulesApplier rulesApplier = loader.getRulesApplier();
+            List<Map> modelList = rulesApplier.getModelList();
+            assertTrue("Could not generate the modelList", modelList.size() > 0);
+
+            for (Column column : columnListFromPhoenix) {
+                DataValue data = rulesApplier.getDataForRule(scenario, column);
 
-        // 2. Load the metadata of for the test tables
-        Scenario scenario = parser.getScenarios().get(0);
-        List<Column> columnListFromPhoenix = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), util.getConnection());
-        assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
-        DataLoader loader = new DataLoader(util,parser);
-        RulesApplier rulesApplier = loader.getRulesApplier();
-        List<Map> modelList = rulesApplier.getModelList();
-        assertTrue("Could not generate the modelList", modelList.size() > 0);
-
-        for (Column column : columnListFromPhoenix) {
-            DataValue data = rulesApplier.getDataForRule(scenario, column);
-
-            // We are generating data values so the value should have been specified by this point.
-            assertTrue("Failed to retrieve data for column type: " + column.getType(), data != null);
-
-            // Test that we still retrieve the GENERAL_CHAR rule even after an override is applied to another CHAR type.
-            // NEWVAL_STRING Column does not  specify an override so we should get the default rule.
-            if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName().equals("NEWVAL_STRING"))) {
-                assertTrue("Failed to retrieve data for column type: ", data.getDistribution() == Integer.MIN_VALUE);
+                // We are generating data values
+                // so the value should have been specified by this point.
+                assertTrue("Failed to retrieve data for column type: " + column.getType(),
+                        data != null);
+
+                // Test that we still retrieve the GENERAL_CHAR rule even after an override is
+                // applied to another CHAR type. NEWVAL_STRING Column does not specify an override
+                // so we should get the default rule.
+                if ((column.getType() == DataTypeMapping.VARCHAR) && (column.getName()
+                        .equals("NEWVAL_STRING"))) {
+                    assertTrue("Failed to retrieve data for column type: ",
+                            data.getDistribution() == Integer.MIN_VALUE);
+                }
             }
+        } catch (Exception e) {
+            fail("We had an exception: " + e.getMessage());
         }
+    }
+
+    @Test
+    public void testRWWorkload() throws Exception {
+
+        Connection connection = util.getConnection();
+
+        WorkloadExecutor executor = new WorkloadExecutor();
+        DataModel dataModel = parser.getDataModelByName("test_scenario");
+        List<DataModel> dataModels = new ArrayList<>();
+        dataModels.add(dataModel);
+        QueryExecutor
+                qe =
+                new QueryExecutor(parser, util, executor.getPool(), dataModels, null, false,
+                        PherfConstants.RunMode.PERFORMANCE);
+        executor.add(qe);
+        Scenario scenario = parser.getScenarioByName("testScenarioRW");
+
+        String sql = "select count(*) from " + scenario.getTableName();
 
-        // Load up the data.
         try {
-            loader.execute();
+            // Wait for data to load up.
+            executor.get();
+            executor.shutdown();
+
+            // Verify data has been loaded
+            Integer count = new JdbcSession(connection).sql(sql).select(new Outcome<Integer>() {
+                @Override public Integer handle(ResultSet resultSet, Statement statement)
+                        throws SQLException {
+                    while (resultSet.next()) {
+                        return resultSet.getInt(1);
+                    }
+                    return null;
+                }
+            });
+            assertNotNull("Could not retrieve count. " + count);
+
+            // It would be better to sum up all the rowcounts for the scenarios, but this is fine
+            assertTrue("Could not query any rows for in " + scenario.getTableName(), count > 0);
         } catch (Exception e) {
-            fail("Failed to lead data. An exception was thrown: " + e.getMessage());
+            fail("Failed to load data. An exception was thrown: " + e.getMessage());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
new file mode 100644
index 0000000..2407ef4
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.ExpectedSystemExit;
+
+public class PherfMainIT extends ResultBaseTestIT {
+    @Rule
+    public final ExpectedSystemExit exit = ExpectedSystemExit.none();
+
+    @Test
+    public void testPherfMain() {
+        String[] args = { "-q",
+                "--scenarioFile", ".*prod_test_unsalted_scenario.*",
+                "-m", "--monitorFrequency", "10" };
+        Pherf.main(args);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
index 6e103b8..d2c5173 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
@@ -19,27 +19,38 @@
 package org.apache.phoenix.pherf;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
 import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.schema.SchemaReader;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.junit.BeforeClass;
 
+import java.nio.file.Path;
+import java.util.List;
 import java.util.Properties;
 
 public class ResultBaseTestIT extends BaseHBaseManagedTimeIT {
-    private static boolean isSetUpDone = false;
+    protected static final String matcherScenario = ".*scenario/.*test.*xml";
+    protected static final String matcherSchema = ".*datamodel/.*test.*sql";
 
-    @BeforeClass
-    public static void setUp() throws Exception {
-        if (isSetUpDone) {
-            return;
-        }
+    protected static PhoenixUtil util = PhoenixUtil.create(true);
+    protected static Properties properties;
+    protected static SchemaReader reader;
+    protected static XMLConfigParser parser;
+    protected static List<Path> resources;
+    protected static ResultUtil resultUtil = new ResultUtil();
+
+    @BeforeClass public static void setUp() throws Exception {
 
-        ResultUtil util = new ResultUtil();
         PherfConstants constants = PherfConstants.create();
-        Properties properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES);
+        properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES);
         String dir = properties.getProperty("pherf.default.results.dir");
         String targetDir = "target/" + dir;
         properties.setProperty("pherf.default.results.dir", targetDir);
-        util.ensureBaseDirExists(targetDir);
-        isSetUpDone = true;
+        resultUtil.ensureBaseDirExists(targetDir);
+
+        util.setZookeeper("localhost");
+        reader = new SchemaReader(util, matcherSchema);
+        parser = new XMLConfigParser(matcherScenario);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
index 2cb7c13..bce1e91 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
@@ -34,15 +34,12 @@ import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
-    protected static PhoenixUtil util = new PhoenixUtil(true);
+    protected static PhoenixUtil util = PhoenixUtil.create(true);
 
-	@Test
-    public void testSchemaReader() {
+    @Test public void testSchemaReader() {
         // Test for the unit test version of the schema files.
         assertApplySchemaTest();
     }
@@ -55,7 +52,8 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
             List<Path> resources = new ArrayList<>(reader.getResourceList());
             assertTrue("Could not pull list of schema files.", resources.size() > 0);
             assertNotNull("Could not read schema file.", this.getClass().getResourceAsStream(
-                    PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName().toString()));
+                    PherfConstants.RESOURCE_DATAMODEL + "/" + resources.get(0).getFileName()
+                            .toString()));
             assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
             reader.applySchema();
 
@@ -67,7 +65,10 @@ public class SchemaReaderIT extends BaseHBaseManagedTimeIT {
             DataModel data = XMLConfigParser.readDataModel(resourcePath);
             List<Scenario> scenarioList = data.getScenarios();
             Scenario scenario = scenarioList.get(0);
-            List<Column> columnList = util.getColumnsFromPhoenix(scenario.getSchemaName(), scenario.getTableNameWithoutSchemaName(), connection);
+            List<Column>
+                    columnList =
+                    util.getColumnsFromPhoenix(scenario.getSchemaName(),
+                            scenario.getTableNameWithoutSchemaName(), connection);
             assertTrue("Could not retrieve Metadata from Phoenix", columnList.size() > 0);
         } catch (Exception e) {
             fail("Could not initialize SchemaReader");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 073c661..5a9f45f 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -18,44 +18,61 @@
 
 package org.apache.phoenix.pherf;
 
+import org.apache.commons.cli.*;
 import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.jmx.MonitorManager;
 import org.apache.phoenix.pherf.schema.SchemaReader;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.apache.phoenix.pherf.util.ResourceList;
+import org.apache.phoenix.pherf.workload.QueryExecutor;
+import org.apache.phoenix.pherf.workload.Workload;
 import org.apache.phoenix.pherf.workload.WorkloadExecutor;
-
-import org.apache.commons.cli.*;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
 
 public class Pherf {
     private static final Logger logger = LoggerFactory.getLogger(Pherf.class);
     private static final Options options = new Options();
+    private final PhoenixUtil phoenixUtil = PhoenixUtil.create();
 
     static {
+        options.addOption("disableSchemaApply", false, "Set to disable schema from being applied.");
+        options.addOption("z", "zookeeper", true,
+                "HBase Zookeeper address for connection. Default: localhost");
+        options.addOption("q", "query", false, "Executes multi-threaded query sets");
+        options.addOption("listFiles", false, "List available resource files");
+        options.addOption("l", "load", false,
+                "Pre-loads data according to specified configuration values.");
+        options.addOption("scenarioFile", true,
+                "Regex or file name for the Test Scenario configuration .xml file to use.");
+        options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. "
+                + "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all");
+        options.addOption("schemaFile", true,
+                "Regex or file name for the Test phoenix table schema .sql to use.");
         options.addOption("m", "monitor", false, "Launch the stats profilers");
-        options.addOption("monitorFrequency", true, "Override for frequency in Ms for which monitor should log stats. " +
-                "\n See pherf.default.monitorFrequency in pherf.properties");
-        options.addOption("d", "debug", false, "Put tool in debug mode");
-        options.addOption("z", "zookeeper", true, "HBase Zookeeper address for connection. Default: localhost");
-        options.addOption("l", "load", false, "Loads data according to specified configuration values.");
-        options.addOption("scenarioFile", true, "Regex or file name for the Test Scenario configuration .xml file to use.");
-        options.addOption("drop", true, "Regex drop all tables with schema name as PHERF. " +
-                "\nExample drop Event tables: -drop .*(EVENT).* Drop all: -drop .* or -drop all");
-        options.addOption("schemaFile", true, "Regex or file name for the Test phoenix table schema .sql to use.");
-        options.addOption("rowCountOverride", true, "Row count override to use instead of one specified in scenario.");
+        options.addOption("monitorFrequency", true,
+                "Override for frequency in Ms for which monitor should log stats. "
+                        + "\n See pherf.default.monitorFrequency in pherf.properties");
+        options.addOption("rowCountOverride", true,
+                "Row count override to use instead of one specified in scenario.");
         options.addOption("hint", true, "Executes all queries with specified hint. Example SMALL");
-        options.addOption("diff", false, "Run pherf in verification mode and diff with exported results");
-        options.addOption("export", false, "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR + " directory");
-        options.addOption("listFiles", false, "List available resource files");
-        options.addOption("writerThreadSize", true, "Override the default number of writer threads. " +
-                "See pherf.default.dataloader.threadpool in Pherf.properties.");
-        options.addOption("q", "query", false, "Executes multi-threaded query sets");
+        options.addOption("diff", false,
+                "Run pherf in verification mode and diff with exported results");
+        options.addOption("export", false,
+                "Exports query results to CSV files in " + PherfConstants.EXPORT_DIR
+                        + " directory");
+        options.addOption("writerThreadSize", true,
+                "Override the default number of writer threads. "
+                        + "See pherf.default.dataloader.threadpool in Pherf.properties.");
         options.addOption("h", "help", false, "Get help on using this utility.");
+        options.addOption("d", "debug", false, "Put tool in debug mode");
     }
 
     private final String zookeeper;
@@ -63,14 +80,15 @@ public class Pherf {
     private final String schemaFile;
     private final String queryHint;
     private final Properties properties;
-    private final boolean loadData;
+    private final boolean preLoadData;
     private final String dropPherfTablesRegEx;
     private final boolean executeQuerySets;
     private final boolean exportCSV;
     private final boolean diff;
     private final boolean monitor;
     private final int rowCountOverride;
-    private  final boolean listFiles;
+    private final boolean listFiles;
+    private final boolean applySchema;
 
     public Pherf(String[] args) throws Exception {
         CommandLineParser parser = new PosixParser();
@@ -87,30 +105,35 @@ public class Pherf {
         properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES);
         dropPherfTablesRegEx = command.getOptionValue("drop", null);
         monitor = command.hasOption("m");
-        String monitorFrequency = (command.hasOption("m") && command.hasOption("monitorFrequency"))
-                ? command.getOptionValue("monitorFrequency")
-                : properties.getProperty("pherf.default.monitorFrequency");
+        String
+                monitorFrequency =
+                (command.hasOption("m") && command.hasOption("monitorFrequency")) ?
+                        command.getOptionValue("monitorFrequency") :
+                        properties.getProperty("pherf.default.monitorFrequency");
         properties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
 
         logger.debug("Using Monitor: " + monitor);
         logger.debug("Monitor Frequency Ms:" + monitorFrequency);
-        loadData = command.hasOption("l");
+        preLoadData = command.hasOption("l");
         executeQuerySets = command.hasOption("q");
         zookeeper = command.getOptionValue("z", "localhost");
         queryHint = command.getOptionValue("hint", null);
         exportCSV = command.hasOption("export");
         diff = command.hasOption("diff");
         listFiles = command.hasOption("listFiles");
-        scenarioFile = command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null;
+        applySchema = !command.hasOption("disableSchemaApply");
+        scenarioFile =
+                command.hasOption("scenarioFile") ? command.getOptionValue("scenarioFile") : null;
         schemaFile = command.hasOption("schemaFile") ? command.getOptionValue("schemaFile") : null;
         rowCountOverride = Integer.parseInt(command.getOptionValue("rowCountOverride", "0"));
-        String writerThreadPoolSize = command.getOptionValue("writerThreadSize",
-                properties.getProperty("pherf.default.dataloader.threadpool"));
+        String
+                writerThreadPoolSize =
+                command.getOptionValue("writerThreadSize",
+                        properties.getProperty("pherf.default.dataloader.threadpool"));
         properties.setProperty("pherf. default.dataloader.threadpool", writerThreadPoolSize);
 
-
-        if ((command.hasOption("h") || (args == null || args.length == 0))
-                && !command.hasOption("listFiles")) {
+        if ((command.hasOption("h") || (args == null || args.length == 0)) && !command
+                .hasOption("listFiles")) {
             hf.printHelp("Pherf", options);
             System.exit(1);
         }
@@ -128,17 +151,22 @@ public class Pherf {
     }
 
     public void run() throws Exception {
-        WorkloadExecutor workloadExec = null;
+        MonitorManager monitorManager = null;
+        List<Workload> workloads = new ArrayList<>();
+        WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads);
         try {
             if (listFiles) {
                 ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL);
-                Collection<Path> schemaFiles = list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql");
+                Collection<Path>
+                        schemaFiles =
+                        list.getResourceList(PherfConstants.SCHEMA_ROOT_PATTERN + ".sql");
                 System.out.println("Schema Files:");
                 for (Path path : schemaFiles) {
                     System.out.println(path);
                 }
                 list = new ResourceList(PherfConstants.RESOURCE_SCENARIO);
-                Collection<Path> scenarioFiles =
+                Collection<Path>
+                        scenarioFiles =
                         list.getResourceList(PherfConstants.SCENARIO_ROOT_PATTERN + ".xml");
                 System.out.println("Scenario Files:");
                 for (Path path : scenarioFiles) {
@@ -146,49 +174,86 @@ public class Pherf {
                 }
                 return;
             }
-            workloadExec = (scenarioFile == null)
-                    ? new WorkloadExecutor(properties,
-                    new XMLConfigParser(PherfConstants.DEFAULT_FILE_PATTERN),
-                    monitor)
-                    : new WorkloadExecutor(properties,
-                    new XMLConfigParser(scenarioFile),
-                    monitor);
+            XMLConfigParser parser = new XMLConfigParser(scenarioFile);
 
             // Drop tables with PHERF schema and regex comparison
             if (null != dropPherfTablesRegEx) {
-                logger.info("\nDropping existing table with PHERF namename and "
-                        + dropPherfTablesRegEx + " regex expression.");
-                new PhoenixUtil().deleteTables(dropPherfTablesRegEx);
+                logger.info(
+                        "\nDropping existing table with PHERF namename and " + dropPherfTablesRegEx
+                                + " regex expression.");
+                phoenixUtil.deleteTables(dropPherfTablesRegEx);
             }
 
-            // Schema and Data Load
-            if (loadData) {
+            if (monitor) {
+                monitorManager =
+                        new MonitorManager(Integer.parseInt(
+                                properties.getProperty("pherf.default.monitorFrequency")));
+                workloadExecutor.add(monitorManager);
+            }
+
+            if (applySchema) {
                 logger.info("\nStarting to apply schema...");
-                SchemaReader reader = (schemaFile == null)
-                        ? new SchemaReader(".*.sql")
-                        : new SchemaReader(schemaFile);
+                SchemaReader
+                        reader =
+                        (schemaFile == null) ?
+                                new SchemaReader(".*.sql") :
+                                new SchemaReader(schemaFile);
                 reader.applySchema();
+            }
 
+            // Schema and Data Load
+            if (preLoadData) {
                 logger.info("\nStarting Data Load...");
-                workloadExec.executeDataLoad();
+                WriteWorkload workload = new WriteWorkload(parser);
+                workloadExecutor.add(workload);
+
+                // Wait for dataLoad to complete
+                workloadExecutor.get(workload);
 
                 logger.info("\nGenerate query gold files after data load");
-                workloadExec.executeMultithreadedQueryExecutor(queryHint, true, PherfConstants.RunMode.FUNCTIONAL);
+                QueryExecutor
+                        goldFileGenerator =
+                        new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(),
+                                parser.getDataModels(), queryHint, true,
+                                PherfConstants.RunMode.FUNCTIONAL);
+                workloadExecutor
+                        .add(goldFileGenerator);
+
+                // Wait for dataLoad to complete
+                workloadExecutor.get(goldFileGenerator);
             } else {
-                logger.info("\nSKIPPED: Data Load and schema creation as -l argument not specified");
+                logger.info(
+                        "\nSKIPPED: Data Load and schema creation as -l argument not specified");
             }
 
             // Execute multi-threaded query sets
             if (executeQuerySets) {
-                logger.info("\nStarting to apply schema...");
-                workloadExec.executeMultithreadedQueryExecutor(queryHint, exportCSV, diff ? PherfConstants.RunMode.FUNCTIONAL : PherfConstants.RunMode.PERFORMANCE);
+                logger.info("\nStarting to apply Execute Queries...");
+
+                workloadExecutor
+                        .add(new QueryExecutor(parser, phoenixUtil, workloadExecutor.getPool(),
+                                parser.getDataModels(), queryHint, exportCSV, diff ?
+                                PherfConstants.RunMode.FUNCTIONAL :
+                                PherfConstants.RunMode.PERFORMANCE));
+
             } else {
-                logger.info("\nSKIPPED: Multithreaded query set execution as -q argument not specified");
+                logger.info(
+                        "\nSKIPPED: Multithreaded query set execution as -q argument not specified");
+            }
+
+            // Clean up the monitor explicitly
+            if (monitorManager != null) {
+                logger.info("Run completed. Shutting down Monitor.");
+                monitorManager.complete();
             }
+
+            // Collect any final jobs
+            workloadExecutor.get();
+
         } finally {
-            if (workloadExec != null) {
-                logger.info("Run completed. Shutting down Monitor if it was running.");
-                workloadExec.shutdown();
+            if (workloadExecutor != null) {
+                logger.info("Run completed. Shutting down thread pool.");
+                workloadExecutor.shutdown();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
index 493f5a8..e060e53 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/PherfConstants.java
@@ -28,14 +28,13 @@ public class PherfConstants {
     public static final int DEFAULT_THREAD_POOL_SIZE = 10;
     public static final int DEFAULT_BATCH_SIZE = 1000;
     public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
-    public static final String DEFAULT_FILE_PATTERN = ".*scenario.xml";
     public static final String RESOURCE_SCENARIO = "/scenario";
     public static final String
             SCENARIO_ROOT_PATTERN =
             ".*" + PherfConstants.RESOURCE_SCENARIO.substring(1) + ".*";
     public static final String SCHEMA_ROOT_PATTERN = ".*";
     public static final String PHERF_PROPERTIES = "pherf.properties";
-//    public static final String RESULT_DIR = "RESULTS";
+
     public static final String EXPORT_DIR = "CSV_EXPORT";
     public static final String RESULT_PREFIX = "RESULT_";
     public static final String PATH_SEPARATOR = "/";
@@ -51,6 +50,7 @@ public class PherfConstants {
 
     public static final String PHERF_SCHEMA_NAME = "PHERF";
 
+    // TODO MOve to properties
     // log out data load per n rows
     public static final int LOG_PER_NROWS = 1000000;
     public static final String COMBINED_FILE_NAME = "COMBINED";
@@ -86,7 +86,9 @@ public class PherfConstants {
         InputStream is = null;
         try {
             is = getClass().getClassLoader().getResourceAsStream(fileName);
-            properties.load(is);
+            if (is != null) {
+                properties.load(is);
+            }
         } finally {
             if (is != null) {
                 is.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
index 25c0df1..8eb42ff 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java
@@ -26,7 +26,6 @@ import java.util.List;
 
 @XmlRootElement(name = "datamodel")
 public class DataModel {
-    private String release;
     private String name;
     private List<Scenario> scenarios;
     private List<Column> dataMappingColumns;
@@ -34,15 +33,6 @@ public class DataModel {
     public DataModel() {
     }
 
-    public String getRelease() {
-        return this.release;
-    }
-
-    @XmlAttribute()
-    public void setRelease(String release) {
-        this.release = release;
-    }
-
     public List<Scenario> getScenarios() {
         return scenarios;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index d2f113a..7de96cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -34,10 +34,12 @@ public class Scenario {
     private int rowCount;
     private Map<String, String> phoenixProperties;
     private DataOverride dataOverride;
-    private List<QuerySet> querySet = new ArrayList<QuerySet>();
+    private List<QuerySet> querySet = new ArrayList<>();
+    private WriteParams writeParams;
     private String name;
 
     public Scenario() {
+        writeParams = new WriteParams();
     }
 
     /**
@@ -161,6 +163,14 @@ public class Scenario {
         this.name = name;
     }
 
+    public WriteParams getWriteParams() {
+        return writeParams;
+    }
+
+    public void setWriteParams(WriteParams writeParams) {
+        this.writeParams = writeParams;
+    }
+
     @Override
     public String toString() {
         StringBuilder stringBuilder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
new file mode 100644
index 0000000..04be239
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/WriteParams.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class WriteParams {
+    private int writerThreadCount;
+    private long threadSleepDuration;
+    private long batchSize;
+    private long executionDurationInMs;
+
+    public WriteParams() {
+        this.batchSize = Long.MIN_VALUE;
+        this.writerThreadCount = Integer.MIN_VALUE;
+        this.threadSleepDuration = Long.MIN_VALUE;
+        this.executionDurationInMs = Long.MAX_VALUE;
+    }
+
+    public long getThreadSleepDuration() {
+        return threadSleepDuration;
+    }
+
+    @SuppressWarnings("unused")
+    public void setThreadSleepDuration(long threadSleepDuration) {
+        this.threadSleepDuration = threadSleepDuration;
+    }
+
+    public long getBatchSize() {
+        return batchSize;
+    }
+
+    @SuppressWarnings("unused")
+    public void setBatchSize(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public int getWriterThreadCount() {
+        return writerThreadCount;
+    }
+
+    @SuppressWarnings("unused")
+    public void setWriterThreadCount(int writerThreadCount) {
+        this.writerThreadCount = writerThreadCount;
+    }
+
+    @XmlAttribute()
+    public long getExecutionDurationInMs() {
+        return executionDurationInMs;
+    }
+
+    @SuppressWarnings("unused")
+    public void setExecutionDurationInMs(long executionDurationInMs) {
+        this.executionDurationInMs = executionDurationInMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
index 9b5a9e9..393fa7e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
@@ -52,6 +52,24 @@ public class XMLConfigParser {
         return dataModels;
     }
 
+    public DataModel getDataModelByName(String name) {
+        for (DataModel dataModel : getDataModels()) {
+            if (dataModel.getName().equals(name)) {
+                return dataModel;
+            }
+        }
+        return null;
+    }
+
+    public Scenario getScenarioByName(String name) throws Exception {
+        for (Scenario scenario : getScenarios()) {
+            if (scenario.getName().equals(name)) {
+                return scenario;
+            }
+        }
+        return null;
+    }
+
     public synchronized Collection<Path> getPaths(String strPattern) throws Exception {
         if (paths != null) {
             return paths;
@@ -87,7 +105,8 @@ public class XMLConfigParser {
      * Unmarshall an XML data file
      *
      * @param file Name of File
-     * @return
+     * @return {@link org.apache.phoenix.pherf.configuration.DataModel} Returns DataModel from
+     * XML configuration
      * @throws JAXBException
      */
     // TODO Remove static calls
@@ -151,8 +170,6 @@ public class XMLConfigParser {
     }
 
     private Collection<Path> getResources(String pattern) throws Exception {
-        Collection<Path> resourceFiles = new ArrayList<Path>();
-        resourceFiles = resourceList.getResourceList(pattern);
-        return resourceFiles;
+        return resourceList.getResourceList(pattern);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
index 6f97551..5b39b2b 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
@@ -21,48 +21,54 @@ package org.apache.phoenix.pherf.jmx;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.exception.FileLoaderRuntimeException;
 import org.apache.phoenix.pherf.jmx.monitors.Monitor;
-import org.apache.phoenix.pherf.result.file.ResultFileDetails;
-import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
 import org.apache.phoenix.pherf.result.Result;
 import org.apache.phoenix.pherf.result.ResultHandler;
+import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
+import org.apache.phoenix.pherf.workload.Workload;
 import org.apache.phoenix.util.DateUtil;
 
-import javax.management.*;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * This class starts JMX stats for the configured monitors. Monitors should be configured in MonitorDetails Enum.
+ * This class starts JMX stats for the configured monitors.
+ * Monitors should be configured in MonitorDetails Enum.
  * Each stat implements {@link org.apache.phoenix.pherf.jmx.monitors.Monitor}.
  *
- * For the duration of any Pherf run, when the configured {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY}
- * is reached a snapshot of each monitor is taken and dumped out to a log file.
+ * For the duration of any Pherf run, when the configured
+ * {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY} is reached a snapshot of
+ * each monitor is taken and dumped out to a log file.
  */
-public class MonitorManager implements Runnable {
+public class MonitorManager implements Workload {
     // List of MonitorDetails for all the running monitors.
     // TODO Move this out to config. Possible use Guice and use IOC to inject it in.
-    private static final List<MonitorDetails> MONITOR_DETAILS_LIST =
+    private static final List<MonitorDetails>
+            MONITOR_DETAILS_LIST =
             Arrays.asList(MonitorDetails.values());
     private final ResultHandler resultHandler;
-    private final long monitorFrequency;
-    private AtomicLong rowCount;
-    private volatile boolean shouldStop = false;
-    private volatile boolean isRunning = false;
+    private final AtomicLong monitorFrequency;
+    private final AtomicLong rowCount;
+    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
-    @SuppressWarnings("unused")
-    public MonitorManager() throws Exception {
+    @SuppressWarnings("unused") public MonitorManager() throws Exception {
         this(PherfConstants.MONITOR_FREQUENCY);
     }
 
     /**
-     *
      * @param monitorFrequency Frequency at which monitor stats are written to a log file.
      * @throws Exception
      */
     public MonitorManager(long monitorFrequency) throws Exception {
-        this.monitorFrequency = monitorFrequency;
+        this.monitorFrequency = new AtomicLong(monitorFrequency);
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 
         // Register all the monitors to JMX
@@ -77,74 +83,87 @@ public class MonitorManager implements Runnable {
             }
         }
         rowCount = new AtomicLong(0);
-        this.resultHandler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
+        this.resultHandler =
+                new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
     }
 
-    @Override
-    public void run() {
-        try {
-            while (!shouldStop()) {
-                isRunning = true;
-                List rowValues = new ArrayList<String>();
-                synchronized (resultHandler) {
-                    for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) {
-                        rowValues.clear();
-                        try {
-                            StandardMBean bean = new StandardMBean(monitorDetails.getMonitor(), Monitor.class);
-
-                            Calendar calendar = new GregorianCalendar();
-                            rowValues.add(monitorDetails);
-
-                            rowValues.add(((Monitor) bean.getImplementation()).getStat());
-                            rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER.format(calendar.getTime()));
-                            Result
-                                    result = new Result(ResultFileDetails.CSV, ResultFileDetails.CSV_MONITOR.getHeader().toString(), rowValues);
-                            resultHandler.write(result);
-                        } catch (Exception e) {
-                            throw new FileLoaderRuntimeException("Could not log monitor result.", e);
+    @Override public synchronized void complete() {
+        this.shouldStop.set(true);
+    }
+
+    @Override public Runnable execute() {
+        return new Runnable() {
+            @Override public void run() {
+                try {
+                    while (!shouldStop()) {
+                        isRunning.set(true);
+                        List rowValues = new ArrayList<String>();
+                        synchronized (resultHandler) {
+                            for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) {
+                                rowValues.clear();
+                                try {
+                                    StandardMBean
+                                            bean =
+                                            new StandardMBean(monitorDetails.getMonitor(),
+                                                    Monitor.class);
+
+                                    Calendar calendar = new GregorianCalendar();
+                                    rowValues.add(monitorDetails);
+
+                                    rowValues.add(((Monitor) bean.getImplementation()).getStat());
+                                    rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER
+                                            .format(calendar.getTime()));
+                                    Result
+                                            result =
+                                            new Result(ResultFileDetails.CSV,
+                                                    ResultFileDetails.CSV_MONITOR.getHeader()
+                                                            .toString(), rowValues);
+                                    resultHandler.write(result);
+                                } catch (Exception e) {
+                                    throw new FileLoaderRuntimeException(
+                                            "Could not log monitor result.", e);
+                                }
+                                rowCount.getAndIncrement();
+                            }
+                            try {
+                                resultHandler.flush();
+                                Thread.sleep(getMonitorFrequency());
+                            } catch (Exception e) {
+                                Thread.currentThread().interrupt();
+                                e.printStackTrace();
+                            }
                         }
-                        rowCount.getAndIncrement();
                     }
+                } finally {
                     try {
-                        resultHandler.flush();
-                        Thread.sleep(getMonitorFrequency());
+                        isRunning.set(false);
+                        if (resultHandler != null) {
+                            resultHandler.close();
+                        }
                     } catch (Exception e) {
-                        Thread.currentThread().interrupt();
-                        e.printStackTrace();
+                        throw new FileLoaderRuntimeException("Could not close monitor results.", e);
                     }
                 }
             }
-        } finally {
-            try {
-                isRunning = false;
-                if (resultHandler != null) {
-                    resultHandler.close();
-                }
-            } catch (Exception e) {
-                throw new FileLoaderRuntimeException("Could not close monitor results.", e);
-            }
-        }
-
+        };
     }
 
     public long getMonitorFrequency() {
-        return monitorFrequency;
-    }
-
-    public synchronized boolean shouldStop() {
-        return shouldStop;
+        return monitorFrequency.get();
     }
 
-    public synchronized void stop() {
-        this.shouldStop = true;
+    public boolean shouldStop() {
+        return shouldStop.get();
     }
 
-    public synchronized long getRowCount() {
+    // Convenience method for testing.
+    @SuppressWarnings("unused")
+    public long getRowCount() {
         return rowCount.get();
     }
 
-    public synchronized boolean isRunning() {
-        return isRunning;
+    public boolean isRunning() {
+        return isRunning.get();
     }
 
     /**
@@ -157,7 +176,9 @@ public class MonitorManager implements Runnable {
         ResultHandler handler = null;
         try {
             if (resultHandler.isClosed()) {
-                handler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV);
+                handler =
+                        new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME,
+                                ResultFileDetails.CSV);
                 return handler.read();
             } else {
                 return resultHandler.read();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
deleted file mode 100644
index c521822..0000000
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.phoenix.pherf.loaddata;
-
-import java.math.BigDecimal;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.phoenix.pherf.result.ResultUtil;
-import org.apache.phoenix.pherf.util.ResourceList;
-import org.apache.phoenix.pherf.util.RowCalculator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.configuration.Column;
-import org.apache.phoenix.pherf.configuration.DataModel;
-import org.apache.phoenix.pherf.configuration.Scenario;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.exception.PherfException;
-import org.apache.phoenix.pherf.result.DataLoadThreadTime;
-import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
-import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.rules.RulesApplier;
-import org.apache.phoenix.pherf.util.PhoenixUtil;
-
-public class DataLoader {
-    private static final Logger logger = LoggerFactory.getLogger(DataLoader.class);
-    private final PhoenixUtil pUtil;
-    private final XMLConfigParser parser;
-    private final RulesApplier rulesApplier;
-    private final ResultUtil resultUtil;
-    private final ExecutorService pool;
-
-    private final int threadPoolSize;
-    private final int batchSize;
-
-    public DataLoader(XMLConfigParser parser) throws Exception {
-        this(new PhoenixUtil(), parser);
-    }
-
-    public DataLoader(PhoenixUtil phoenixUtil, XMLConfigParser parser) throws Exception{
-        this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES), parser);
-    }
-
-    /**
-     * Default the writers to use up all available cores for threads.
-     *
-     * @param parser
-     * @throws Exception
-     */
-    public DataLoader(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser) throws Exception {
-        this.pUtil = phoenixUtil;
-        this.parser = parser;
-        this.rulesApplier = new RulesApplier(parser);
-        this.resultUtil = new ResultUtil();
-        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
-        this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
-        this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
-        String bSize = properties.getProperty("pherf.default.dataloader.batchsize");
-        this.batchSize = (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
-    }
-
-    public void execute() throws Exception {
-        try {
-            DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
-            DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
-
-            for (Scenario scenario : getParser().getScenarios()) {
-                List<Future> writeBatches = new ArrayList<Future>();
-                logger.info("\nLoading " + scenario.getRowCount()
-                        + " rows for " + scenario.getTableName());
-                long start = System.currentTimeMillis();
-
-                RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
-                for (int i = 0; i < getThreadPoolSize(); i++) {
-                    List<Column> phxMetaCols = pUtil.getColumnsFromPhoenix(
-                            scenario.getSchemaName(),
-                            scenario.getTableNameWithoutSchemaName(),
-                            pUtil.getConnection());
-                    int threadRowCount = rowCalculator.getNext();
-                    logger.info("Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
-                    Future<Info> write = upsertData(scenario, phxMetaCols,
-                            scenario.getTableName(), threadRowCount, dataLoadThreadTime);
-                    writeBatches.add(write);
-                }
-
-                if (writeBatches.isEmpty()) {
-                    throw new PherfException(
-                            "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
-                }
-
-                int sumRows = 0, sumDuration = 0;
-                // Wait for all the batch threads to complete
-                for (Future<Info> write : writeBatches) {
-                    Info writeInfo = write.get();
-                    sumRows += writeInfo.getRowCount();
-                    sumDuration += writeInfo.getDuration();
-                    logger.info("Executor writes complete with row count ("
-                                    + writeInfo.getRowCount()
-                                    + ") in Ms ("
-                                    + writeInfo.getDuration() + ")");
-                }
-                logger.info("Writes completed with total row count (" + sumRows
-                        + ") with total time of(" + sumDuration + ") Ms");
-                dataLoadTimeSummary.add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
-
-
-                // always update stats for Phoenix base tables
-                updatePhoenixStats(scenario.getTableName());
-            }
-            resultUtil.write(dataLoadTimeSummary);
-            resultUtil.write(dataLoadThreadTime);
-
-        } finally {
-            pool.shutdown();
-        }
-    }
-
-    /**
-     * TODO Move this method to PhoenixUtil
-     * Update Phoenix table stats
-     *
-     * @param tableName
-     * @throws Exception
-     */
-    public void updatePhoenixStats(String tableName) throws Exception {
-        logger.info("Updating stats for " + tableName);
-        pUtil.executeStatement("UPDATE STATISTICS " + tableName);
-    }
-
-    public Future<Info> upsertData(final Scenario scenario,
-                                   final List<Column> columns, final String tableName,
-                                   final int rowCount, final DataLoadThreadTime dataLoadThreadTime) {
-        Future<Info> future = pool.submit(new Callable<Info>() {
-            @Override
-            public Info call() throws Exception {
-                int rowsCreated = 0;
-                Info info = null;
-                long start = 0, duration = 0, totalDuration = 0;
-                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-                Connection connection = null;
-                try {
-                    connection = pUtil.getConnection();
-                    long logStartTime = System.currentTimeMillis();
-                    for (int i = 0; i < rowCount; i++) {
-                        String sql = buildSql(columns, tableName);
-                        PreparedStatement stmt = connection
-                                .prepareStatement(sql);
-                        stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
-                        start = System.currentTimeMillis();
-                        rowsCreated += stmt.executeUpdate();
-                        stmt.close();
-                        if ((i % getBatchSize()) == 0) {
-                            connection.commit();
-                            duration = System.currentTimeMillis() - start;
-                            logger.info("Committed Batch. Total " + tableName + " rows for this thread (" + this.hashCode() + ") in ("
-                                    + duration + ") Ms");
-
-                            if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
-                                dataLoadThreadTime.add(tableName, Thread.currentThread().getName(), i, System.currentTimeMillis() - logStartTime);
-                                logStartTime = System.currentTimeMillis();
-                            }
-                        }
-                    }
-                } finally {
-                    if (connection != null) {
-                        try {
-                            connection.commit();
-                            duration = System.currentTimeMillis() - start;
-                            logger.info("Committed Final Batch. Duration (" + duration + ") Ms");
-                            connection.close();
-                        } catch (SQLException e) {
-                            // Swallow since we are closing anyway
-                            e.printStackTrace();
-                        }
-                    }
-                }
-                totalDuration = System.currentTimeMillis() - start;
-                return new Info(totalDuration, rowsCreated);
-            }
-        });
-        return future;
-    }
-
-    private PreparedStatement buildStatement(Scenario scenario,
-                                             List<Column> columns, PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
-        int count = 1;
-        for (Column column : columns) {
-
-            DataValue dataValue = getRulesApplier().getDataForRule(scenario,
-                    column);
-            switch (column.getType()) {
-                case VARCHAR:
-                    if (dataValue.getValue().equals("")) {
-                        statement.setNull(count, Types.VARCHAR);
-                    } else {
-                        statement.setString(count, dataValue.getValue());
-                    }
-                    break;
-                case CHAR:
-                    if (dataValue.getValue().equals("")) {
-                        statement.setNull(count, Types.CHAR);
-                    } else {
-                        statement.setString(count, dataValue.getValue());
-                    }
-                    break;
-                case DECIMAL:
-                    if (dataValue.getValue().equals("")) {
-                        statement.setNull(count, Types.DECIMAL);
-                    } else {
-                        statement.setBigDecimal(count,
-                                new BigDecimal(dataValue.getValue()));
-                    }
-                    break;
-                case INTEGER:
-                    if (dataValue.getValue().equals("")) {
-                        statement.setNull(count, Types.INTEGER);
-                    } else {
-                        statement.setInt(count,
-                                Integer.parseInt(dataValue.getValue()));
-                    }
-                    break;
-                case DATE:
-                    if (dataValue.getValue().equals("")) {
-                        statement.setNull(count, Types.DATE);
-                    } else {
-                        Date date = new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
-                        statement.setDate(count, date);
-                    }
-                    break;
-                default:
-                    break;
-            }
-            count++;
-        }
-        return statement;
-    }
-
-    private String buildSql(final List<Column> columns, final String tableName) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("upsert into ");
-        builder.append(tableName);
-        builder.append(" (");
-        int count = 1;
-        for (Column column : columns) {
-            builder.append(column.getName());
-            if (count < columns.size()) {
-                builder.append(",");
-            } else {
-                builder.append(")");
-            }
-            count++;
-        }
-        builder.append(" VALUES (");
-        for (int i = 0; i < columns.size(); i++) {
-            if (i < columns.size() - 1) {
-                builder.append("?,");
-            } else {
-                builder.append("?)");
-            }
-        }
-        return builder.toString();
-    }
-
-    public XMLConfigParser getParser() {
-        return parser;
-    }
-
-    public RulesApplier getRulesApplier() {
-        return rulesApplier;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public int getThreadPoolSize() {
-        return threadPoolSize;
-    }
-
-    private class Info {
-
-        private final int rowCount;
-        private final long duration;
-
-        public Info(long duration, int rows) {
-            this(0, 0, 0, duration, rows);
-        }
-
-        public Info(int regionSize, int completedIterations, int timesSeen,
-                    long duration, int rows) {
-            this.duration = duration;
-            this.rowCount = rows;
-        }
-
-        public long getDuration() {
-            return duration;
-        }
-
-        public int getRowCount() {
-            return rowCount;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
index 23dcdd5..e5553cc 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java
@@ -18,61 +18,68 @@
 
 package org.apache.phoenix.pherf.result;
 
+import org.apache.phoenix.pherf.PherfConstants;
+
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.phoenix.pherf.PherfConstants;
-
 public class DataLoadThreadTime {
-	private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>();
+    private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>();
+
+    public List<WriteThreadTime> getThreadTime() {
+        return threadTime;
+    }
 
-	public List<WriteThreadTime> getThreadTime() {
-		return threadTime;
-	}
+    public void add(String tableName, String threadName, long rowsUpserted,
+            long timeInMsPerMillionRows) {
+        threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted,
+                timeInMsPerMillionRows));
+    }
 
-	public void add(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) {
-		threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted, timeInMsPerMillionRows));	
-	}
-	
-	public String getCsvTitle() {
-		return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS + "_ROWS\n";
-	}
+    public String getCsvTitle() {
+        return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS
+                + "_ROWS\n";
+    }
 }
 
 class WriteThreadTime {
-	private String tableName;
-	private String threadName;
-	private int rowsUpserted;
-	private long timeInMsPerMillionRows;
-	
-	public WriteThreadTime(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) {
-		this.tableName = tableName;
-		this.threadName = threadName;
-		this.rowsUpserted = rowsUpserted;
-		this.timeInMsPerMillionRows = timeInMsPerMillionRows;
-	}
-	
-	public String getTableName() {
-		return tableName;
-	}
-	public String getThreadName() {
-		return threadName;
-	}
-	public long getTimeInMsPerMillionRows() {
-		return timeInMsPerMillionRows;
-	}
+    private String tableName;
+    private String threadName;
+    private long rowsUpserted;
+    private long timeInMsPerMillionRows;
+
+    public WriteThreadTime(String tableName, String threadName, long rowsUpserted,
+            long timeInMsPerMillionRows) {
+        this.tableName = tableName;
+        this.threadName = threadName;
+        this.rowsUpserted = rowsUpserted;
+        this.timeInMsPerMillionRows = timeInMsPerMillionRows;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getThreadName() {
+        return threadName;
+    }
+
+    public long getTimeInMsPerMillionRows() {
+        return timeInMsPerMillionRows;
+    }
 
-	public List<ResultValue> getCsvRepresentation(ResultUtil util) {
+    public List<ResultValue> getCsvRepresentation(ResultUtil util) {
         List<ResultValue> rowValues = new ArrayList<>();
         rowValues.add(new ResultValue(util.convertNull(getTableName())));
         rowValues.add(new ResultValue(util.convertNull(getThreadName())));
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowsUpserted()))));
-        rowValues.add(new ResultValue(util.convertNull(String.valueOf(getTimeInMsPerMillionRows()))));
+        rowValues.add(new ResultValue(
+                util.convertNull(String.valueOf(getTimeInMsPerMillionRows()))));
 
         return rowValues;
-	}
+    }
 
-	public int getRowsUpserted() {
-		return rowsUpserted;
-	}
+    public long getRowsUpserted() {
+        return rowsUpserted;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
index bb23f16..0ff5c59 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java
@@ -22,29 +22,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class DataLoadTimeSummary {
-	private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>();
+    private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>();
 
-	public List<TableLoadTime> getTableLoadTime() {
-		return tableLoadTime;
-	}
-	
-	public void add(String tableName, int rowCount, int durationInMs) {
-		tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs));
-	}
+    public List<TableLoadTime> getTableLoadTime() {
+        return tableLoadTime;
+    }
+
+    public void add(String tableName, int rowCount, int durationInMs) {
+        tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs));
+    }
 }
 
 class TableLoadTime {
-	private int durationInMs;
-	private String tableName;
-	private int rowCount;
+    private int durationInMs;
+    private String tableName;
+    private int rowCount;
+
+    public TableLoadTime(String tableName, int rowCount, int durationInMs) {
+        this.tableName = tableName;
+        this.rowCount = rowCount;
+        this.durationInMs = durationInMs;
+    }
 
-	public TableLoadTime(String tableName, int rowCount, int durationInMs) {
-		this.tableName = tableName;
-		this.rowCount = rowCount;
-		this.durationInMs = durationInMs;
-	}
-	
-	public List<ResultValue> getCsvRepresentation(ResultUtil util) {
+    public List<ResultValue> getCsvRepresentation(ResultUtil util) {
         List<ResultValue> rowValues = new ArrayList<>();
         rowValues.add(new ResultValue(util.convertNull(getTableName())));
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowCount()))));
@@ -53,15 +53,15 @@ class TableLoadTime {
         return rowValues;
     }
 
-	public int getDurationInMs() {
-		return durationInMs;
-	}
+    public int getDurationInMs() {
+        return durationInMs;
+    }
 
-	public String getTableName() {
-		return tableName;
-	}
+    public String getTableName() {
+        return tableName;
+    }
 
-	public int getRowCount() {
-		return rowCount;
-	}
+    public int getRowCount() {
+        return rowCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
index 72920fa..5c07ffe 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java
@@ -18,61 +18,57 @@
 
 package org.apache.phoenix.pherf.result;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.phoenix.pherf.configuration.DataModel;
 
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.phoenix.pherf.configuration.DataModel;
+@XmlRootElement(namespace = "org.apache.phoenix.pherf.result") public class DataModelResult
+        extends DataModel {
+    private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>();
+    private String zookeeper;
 
-@XmlRootElement(namespace = "org.apache.phoenix.pherf.result")
-public class DataModelResult extends DataModel {
-	private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>();
-	private String zookeeper;
+    public List<ScenarioResult> getScenarioResult() {
+        return scenarioResult;
+    }
 
-	public List<ScenarioResult> getScenarioResult() {
-		return scenarioResult;
-	}
+    @SuppressWarnings("unused") public void setScenarioResult(List<ScenarioResult> scenarioResult) {
+        this.scenarioResult = scenarioResult;
+    }
 
-    @SuppressWarnings("unused")
-	public void setScenarioResult(List<ScenarioResult> scenarioResult) {
-		this.scenarioResult = scenarioResult;
-	}
-	
-	public DataModelResult() {
-	}
+    public DataModelResult() {
+    }
 
-    private DataModelResult(String name, String release, String zookeeper) {
+    private DataModelResult(String name, String zookeeper) {
         this.setName(name);
-        this.setRelease(release);
         this.zookeeper = zookeeper;
     }
 
     /**
      * Copy constructor
-     * 
+     *
      * @param dataModelResult
      */
     public DataModelResult(DataModelResult dataModelResult) {
-        this(dataModelResult.getName(), dataModelResult.getRelease(), dataModelResult.getZookeeper());
+        this(dataModelResult.getName(), dataModelResult.getZookeeper());
         this.scenarioResult = dataModelResult.getScenarioResult();
     }
-	
-	public DataModelResult(DataModel dataModel, String zookeeper) {
-	    this(dataModel.getName(), dataModel.getRelease(), zookeeper);
-	}
-	
-	public DataModelResult(DataModel dataModel) {
-		this(dataModel, null);
-	}
 
-	@XmlAttribute()
-	public String getZookeeper() {
-		return zookeeper;
-	}
+    public DataModelResult(DataModel dataModel, String zookeeper) {
+        this(dataModel.getName(), zookeeper);
+    }
+
+    public DataModelResult(DataModel dataModel) {
+        this(dataModel, null);
+    }
+
+    @XmlAttribute() public String getZookeeper() {
+        return zookeeper;
+    }
 
-	public void setZookeeper(String zookeeper) {
-		this.zookeeper = zookeeper;
-	}
+    public void setZookeeper(String zookeeper) {
+        this.zookeeper = zookeeper;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
index b5fd082..1a682da 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
@@ -18,14 +18,14 @@
 
 package org.apache.phoenix.pherf.result;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
 import org.apache.phoenix.pherf.PherfConstants.RunMode;
 import org.apache.phoenix.pherf.configuration.Query;
 import org.apache.phoenix.util.DateUtil;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
 public class QueryResult extends Query {
     private List<ThreadTime> threadTimes = new ArrayList<ThreadTime>();
 
@@ -47,8 +47,7 @@ public class QueryResult extends Query {
         this.setId(query.getId());
     }
 
-    @SuppressWarnings("unused")
-    public QueryResult() {
+    @SuppressWarnings("unused") public QueryResult() {
     }
 
     public Date getStartTime() {
@@ -136,8 +135,8 @@ public class QueryResult extends Query {
     }
 
     private String getStartTimeText() {
-        return (null == this.getStartTime())
-                ? ""
-                : DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime());
+        return (null == this.getStartTime()) ?
+                "" :
+                DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
index 9010c21..c2be5a3 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java
@@ -18,31 +18,31 @@
 
 package org.apache.phoenix.pherf.result;
 
+import org.apache.phoenix.pherf.configuration.QuerySet;
+
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.phoenix.pherf.configuration.QuerySet;
-
 public class QuerySetResult extends QuerySet {
-	
-	private List<QueryResult> queryResults = new ArrayList<QueryResult>();
-
-	public QuerySetResult(QuerySet querySet) {
-		this.setConcurrency(querySet.getConcurrency());
-		this.setNumberOfExecutions(querySet.getNumberOfExecutions());
-		this.setExecutionDurationInMs(querySet.getExecutionDurationInMs());
-		this.setExecutionType(querySet.getExecutionType());
-	}
-	
-	public QuerySetResult() {
-	}
-
-	public List<QueryResult> getQueryResults() {
-		return queryResults;
-	}
+
+    private List<QueryResult> queryResults = new ArrayList<>();
+
+    public QuerySetResult(QuerySet querySet) {
+        this.setConcurrency(querySet.getConcurrency());
+        this.setNumberOfExecutions(querySet.getNumberOfExecutions());
+        this.setExecutionDurationInMs(querySet.getExecutionDurationInMs());
+        this.setExecutionType(querySet.getExecutionType());
+    }
+
+    public QuerySetResult() {
+    }
+
+    public List<QueryResult> getQueryResults() {
+        return queryResults;
+    }
 
     @SuppressWarnings("unused")
     public void setQueryResults(List<QueryResult> queryResults) {
-		this.queryResults = queryResults;
-	}	
+        this.queryResults = queryResults;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
index 4ccdd2b..158ed11 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java
@@ -18,10 +18,10 @@
 
 package org.apache.phoenix.pherf.result;
 
-import java.util.List;
-
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 
+import java.util.List;
+
 /**
  * Common container for Pherf results.
  */
@@ -33,10 +33,9 @@ public class Result {
     private final String header;
 
     /**
-     *
-     * @param type {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the
-     *                                                           contents of the result.
-     * @param header Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields.
+     * @param type          {@link org.apache.phoenix.pherf.result.file.ResultFileDetails} Currently unused, but gives metadata about the
+     *                      contents of the result.
+     * @param header        Used for CSV, otherwise pass null. For CSV pass comma separated string of header fields.
      * @param messageValues List<{@link ResultValue} All fields combined represent the data
      *                      for a row to be written.
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
index f650cbb..5b71300 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java
@@ -29,9 +29,14 @@ import java.util.List;
  */
 public interface ResultHandler {
     public void write(Result result) throws Exception;
+
     public void flush() throws Exception;
+
     public void close() throws Exception;
+
     public List<Result> read() throws Exception;
+
     public boolean isClosed();
+
     public ResultFileDetails getResultFileDetails();
 }