You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/02 23:01:20 UTC

[17/47] phoenix git commit: PHOENIX-1920 - Pherf - Add support for mixed r/w workloads

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
index 78f18ca..c9333a0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryVerifier.java
@@ -43,153 +43,160 @@ import difflib.DiffUtils;
 import difflib.Patch;
 
 public class QueryVerifier {
-	private PhoenixUtil pUtil = new PhoenixUtil();
-	private static final Logger logger = LoggerFactory
-			.getLogger(QueryVerifier.class);
-	private boolean useTemporaryOutput;
-	private String directoryLocation;
-
-	public QueryVerifier(boolean useTemporaryOutput) {
-		this.useTemporaryOutput = useTemporaryOutput;
-		this.directoryLocation = this.useTemporaryOutput ? 
-				PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
-		
-		ensureBaseDirExists();
-	}
-	
-	/***
-	 * Export query resultSet to CSV file
-	 * @param query
-	 * @throws Exception
-	 */
-	public String exportCSV(Query query) throws Exception {
-		Connection conn = null;
-		PreparedStatement statement = null;
-		ResultSet rs = null;
-		String fileName = getFileName(query);
-		FileOutputStream fos = new FileOutputStream(fileName);
-		try {
-			conn = pUtil.getConnection(query.getTenantId());
-			statement = conn.prepareStatement(query.getStatement());
-			boolean isQuery = statement.execute();
-			if (isQuery) {
-				rs = statement.executeQuery();
-				int columnCount = rs.getMetaData().getColumnCount();
-				while (rs.next()) {
-					for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
-						fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER).getBytes());
-					}
-					fos.write(PherfConstants.NEW_LINE.getBytes());
-				}
-			} else {
-				conn.commit();
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if (rs != null) rs.close();
-			if (statement != null) statement.close();
-			if (conn != null) conn.close();
-			fos.flush();
-			fos.close();
-		}
-		return fileName;
-	}
-	
-	/***
-	 * Do a diff between exported query results and temporary CSV file
-	 * @param query
-	 * @param newCSV
-	 * @return
-	 */
-	public boolean doDiff(Query query, String newCSV) {
+    private PhoenixUtil pUtil = PhoenixUtil.create();
+    private static final Logger logger = LoggerFactory.getLogger(QueryVerifier.class);
+    private boolean useTemporaryOutput;
+    private String directoryLocation;
+
+    public QueryVerifier(boolean useTemporaryOutput) {
+        this.useTemporaryOutput = useTemporaryOutput;
+        this.directoryLocation =
+                this.useTemporaryOutput ? PherfConstants.EXPORT_TMP : PherfConstants.EXPORT_DIR;
+
+        ensureBaseDirExists();
+    }
+
+    /**
+     * Export query resultSet to CSV file
+     *
+     * @param query
+     * @throws Exception
+     */
+    public String exportCSV(Query query) throws Exception {
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+        String fileName = getFileName(query);
+        FileOutputStream fos = new FileOutputStream(fileName);
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement(query.getStatement());
+            boolean isQuery = statement.execute();
+            if (isQuery) {
+                rs = statement.executeQuery();
+                int columnCount = rs.getMetaData().getColumnCount();
+                while (rs.next()) {
+                    for (int columnNum = 1; columnNum <= columnCount; columnNum++) {
+                        fos.write((rs.getString(columnNum) + PherfConstants.RESULT_FILE_DELIMETER)
+                                .getBytes());
+                    }
+                    fos.write(PherfConstants.NEW_LINE.getBytes());
+                }
+            } else {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (rs != null) rs.close();
+            if (statement != null) statement.close();
+            if (conn != null) conn.close();
+            fos.flush();
+            fos.close();
+        }
+        return fileName;
+    }
+
+    /**
+     * Do a diff between exported query results and temporary CSV file
+     *
+     * @param query
+     * @param newCSV
+     * @return
+     */
+    public boolean doDiff(Query query, String newCSV) {
         List<String> original = fileToLines(getCSVName(query, PherfConstants.EXPORT_DIR, ""));
-        List<String> newLines  = fileToLines(newCSV);
-        
+        List<String> newLines = fileToLines(newCSV);
+
         Patch patch = DiffUtils.diff(original, newLines);
         if (patch.getDeltas().isEmpty()) {
-        	logger.info("Match: " + query.getId() + " with " + newCSV);
-        	return true;
+            logger.info("Match: " + query.getId() + " with " + newCSV);
+            return true;
         } else {
-        	logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
-        	return false;
+            logger.error("DIFF FAILED: " + query.getId() + " with " + newCSV);
+            return false;
         }
-	}
-	
-	/***
-	 * Helper method to load file
-	 * @param filename
-	 * @return
-	 */
+    }
+
+    /**
+     * Helper method to load file
+     *
+     * @param filename
+     * @return
+     */
     private static List<String> fileToLines(String filename) {
-            List<String> lines = new LinkedList<String>();
-            String line = "";
-            try {
-                    BufferedReader in = new BufferedReader(new FileReader(filename));
-                    while ((line = in.readLine()) != null) {
-                            lines.add(line);
-                    }
-                    in.close();
-            } catch (IOException e) {
-                    e.printStackTrace();
+        List<String> lines = new LinkedList<String>();
+        String line = "";
+        try {
+            BufferedReader in = new BufferedReader(new FileReader(filename));
+            while ((line = in.readLine()) != null) {
+                lines.add(line);
             }
-            
-            return lines;
+            in.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return lines;
     }
 
     /**
      * Get explain plan for a query
+     *
      * @param query
      * @return
      * @throws SQLException
      */
-	public String getExplainPlan(Query query) throws SQLException {
-		Connection conn = null;
-		ResultSet rs = null;
-		PreparedStatement statement = null;
-		StringBuilder buf = new StringBuilder();
-		try {
-			conn = pUtil.getConnection(query.getTenantId());
-			statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
-			rs = statement.executeQuery();
-	        while (rs.next()) {
-	            buf.append(rs.getString(1).trim().replace(",", "-"));
-	        }
-			statement.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if (rs != null) rs.close();
-			if (statement != null) statement.close();
-			if (conn != null) conn.close();
-		}
-		return buf.toString();
-	}
-	
-    /***
+    public String getExplainPlan(Query query) throws SQLException {
+        Connection conn = null;
+        ResultSet rs = null;
+        PreparedStatement statement = null;
+        StringBuilder buf = new StringBuilder();
+        try {
+            conn = pUtil.getConnection(query.getTenantId());
+            statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
+            rs = statement.executeQuery();
+            while (rs.next()) {
+                buf.append(rs.getString(1).trim().replace(",", "-"));
+            }
+            statement.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (rs != null) rs.close();
+            if (statement != null) statement.close();
+            if (conn != null) conn.close();
+        }
+        return buf.toString();
+    }
+
+    /**
      * Helper method to generate CSV file name
+     *
      * @param query
      * @return
      * @throws FileNotFoundException
      */
-	private String getFileName(Query query) throws FileNotFoundException {
-		String tempExt = "";
-		if (this.useTemporaryOutput) {
-			tempExt = "_" + java.util.UUID.randomUUID().toString();
-		}
-		return getCSVName(query, this.directoryLocation, tempExt);
-	}
-	
-	private String getCSVName(Query query, String directory, String tempExt) {
-		String csvFile = directory + PherfConstants.PATH_SEPARATOR
-		        + query.getId() + tempExt + Extension.CSV.toString();
-				return csvFile;
-	}
-	
-	private void ensureBaseDirExists() {
-		File baseDir = new File(this.directoryLocation);
-		if (!baseDir.exists()) {
-			baseDir.mkdir();
-		}
-	}
+    private String getFileName(Query query) throws FileNotFoundException {
+        String tempExt = "";
+        if (this.useTemporaryOutput) {
+            tempExt = "_" + java.util.UUID.randomUUID().toString();
+        }
+        return getCSVName(query, this.directoryLocation, tempExt);
+    }
+
+    private String getCSVName(Query query, String directory, String tempExt) {
+        String
+                csvFile =
+                directory + PherfConstants.PATH_SEPARATOR + query.getId() + tempExt + Extension.CSV
+                        .toString();
+        return csvFile;
+    }
+
+    private void ensureBaseDirExists() {
+        File baseDir = new File(this.directoryLocation);
+        if (!baseDir.exists()) {
+            baseDir.mkdir();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
new file mode 100644
index 0000000..16a493e
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.pherf.workload;
+
+public interface Workload {
+    public Runnable execute() throws Exception;
+
+    /**
+     * Use this method to perform any cleanup or forced shutdown of the thread.
+     */
+    public void complete();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index cf2f038..a65b4aa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -19,95 +19,96 @@
 package org.apache.phoenix.pherf.workload;
 
 import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.PherfConstants.RunMode;
-import org.apache.phoenix.pherf.configuration.XMLConfigParser;
-import org.apache.phoenix.pherf.jmx.MonitorManager;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
-
-import org.apache.phoenix.pherf.util.ResourceList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 public class WorkloadExecutor {
     private static final Logger logger = LoggerFactory.getLogger(WorkloadExecutor.class);
-    private final XMLConfigParser parser;
-    private MonitorManager monitor;
-    private Future monitorThread;
     private final int poolSize;
 
-    private final ExecutorService pool;
+    // Jobs can be accessed by multiple threads
+    private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
 
+    private final ExecutorService pool;
 
     public WorkloadExecutor() throws Exception {
         this(PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES));
     }
 
-    public WorkloadExecutor(Properties properties) throws Exception{
-        this(properties,PherfConstants.DEFAULT_FILE_PATTERN);
+    public WorkloadExecutor(Properties properties) throws Exception {
+        this(properties, new ArrayList());
     }
 
-    public WorkloadExecutor(Properties properties, String filePattern) throws Exception {
-        this(properties,
-                new XMLConfigParser(filePattern),
-                true);
+    public WorkloadExecutor(Properties properties, List<Workload> workloads) throws Exception {
+        this.poolSize =
+                (properties.getProperty("pherf.default.threadpool") == null) ?
+                        PherfConstants.DEFAULT_THREAD_POOL_SIZE :
+                        Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+
+        this.pool = Executors.newFixedThreadPool(this.poolSize);
+        init(workloads);
     }
 
-    public WorkloadExecutor(Properties properties, XMLConfigParser parser, boolean monitor) throws Exception {
-        this.parser = parser;
-        this.poolSize = (properties.getProperty("pherf.default.threadpool") == null)
-                ? PherfConstants.DEFAULT_THREAD_POOL_SIZE
-                : Integer.parseInt(properties.getProperty("pherf.default.threadpool"));
+    public void add(Workload workload) throws Exception {
+        this.jobs.put(workload, pool.submit(workload.execute()));
+    }
 
-        this.pool = Executors.newFixedThreadPool(this.poolSize);
-        if (monitor) {
-            initMonitor(Integer.parseInt(properties.getProperty("pherf.default.monitorFrequency")));
+    /**
+     * Blocks on waiting for all workloads to finish. If a
+     * {@link org.apache.phoenix.pherf.workload.Workload} Requires complete() to be called, it must
+     * be called prior to using this method. Otherwise it will block infinitely.
+     */
+    public void get() {
+        for (Workload workload : jobs.keySet()) {
+            get(workload);
         }
     }
 
     /**
-     * Executes all scenarios dataload
+     * Calls the {@link java.util.concurrent.Future#get()} method pertaining to this workflow.
+     * Once the Future competes, the workflow is removed from the list.
      *
-     * @throws Exception
+     * @param workload Key entry in the HashMap
      */
-    public void executeDataLoad() throws Exception {
-        logger.info("\n\nStarting Data Loader...");
-        DataLoader dataLoader = new DataLoader(parser);
-        dataLoader.execute();
+    public void get(Workload workload) {
+        try {
+            Future future = jobs.get(workload);
+            future.get();
+            jobs.remove(workload);
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("", e);
+        }
     }
 
     /**
-     * Executes all scenario multi-threaded query sets
-     *
-     * @param queryHint
-     * @throws Exception
+     * Complete all workloads in the list.
+     * Entries in the job Map will persist until {#link WorkloadExecutorNew#get()} is called
      */
-    public void executeMultithreadedQueryExecutor(String queryHint, boolean export, RunMode runMode) throws Exception {
-        logger.info("\n\nStarting Query Executor...");
-        QueryExecutor queryExecutor = new QueryExecutor(parser);
-        queryExecutor.execute(queryHint, export, runMode);
+    public void complete() {
+        for (Workload workload : jobs.keySet()) {
+            workload.complete();
+        }
     }
 
-    public void shutdown() throws Exception {
-		if (null != monitor && monitor.isRunning()) {
-            this.monitor.stop();
-            this.monitorThread.get(60, TimeUnit.SECONDS);
-            this.pool.shutdown();
-        }
+    public void shutdown() {
+        // Make sure any Workloads still on pool have been properly shutdown
+        complete();
+        pool.shutdownNow();
     }
 
-    // Just used for testing
-    public XMLConfigParser getParser() {
-        return parser;
+    public ExecutorService getPool() {
+        return pool;
     }
 
-    private void initMonitor(int monitorFrequency) throws Exception {
-        this.monitor = new MonitorManager(monitorFrequency);
-        monitorThread = pool.submit(this.monitor);
+    private void init(List<Workload> workloads) throws Exception {
+        for (Workload workload : workloads) {
+            this.jobs.put(workload, pool.submit(workload.execute()));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
new file mode 100644
index 0000000..305521b
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload;
+
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.WriteParams;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.exception.PherfException;
+import org.apache.phoenix.pherf.result.DataLoadThreadTime;
+import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.rules.DataValue;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.util.RowCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class WriteWorkload implements Workload {
+    private static final Logger logger = LoggerFactory.getLogger(WriteWorkload.class);
+    private final PhoenixUtil pUtil;
+    private final XMLConfigParser parser;
+    private final RulesApplier rulesApplier;
+    private final ResultUtil resultUtil;
+    private final ExecutorService pool;
+    private final WriteParams writeParams;
+    private final Scenario scenario;
+    private final long threadSleepDuration;
+
+    private final int threadPoolSize;
+    private final int batchSize;
+
+    public WriteWorkload(XMLConfigParser parser) throws Exception {
+        this(PhoenixUtil.create(), parser);
+    }
+
+    public WriteWorkload(PhoenixUtil util, XMLConfigParser parser) throws Exception {
+        this(util, parser, null);
+    }
+
+    public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario)
+            throws Exception {
+        this(phoenixUtil, PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES),
+                parser, scenario);
+    }
+
+    /**
+     * Default the writers to use up all available cores for threads. If writeParams are used in
+     * the config files, they will override the defaults. writeParams are used for read/write mixed
+     * workloads.
+     * TODO extract notion of the scenario list and have 1 write workload per scenario
+     *
+     * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
+     * @param properties  {@link java.util.Properties} default properties to use
+     * @param parser      {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+     * @param scenario    {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
+     *                    it will run against all scenarios in the parsers list.
+     * @throws Exception
+     */
+    public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
+            Scenario scenario) throws Exception {
+        this.pUtil = phoenixUtil;
+        this.parser = parser;
+        this.rulesApplier = new RulesApplier(parser);
+        this.resultUtil = new ResultUtil();
+
+        // Overwrite defaults properties with those given in the configuration. This indicates the
+        // scenario is a R/W mixed workload.
+        if (scenario != null) {
+            this.scenario = scenario;
+            writeParams = scenario.getWriteParams();
+            threadSleepDuration = writeParams.getThreadSleepDuration();
+        } else {
+            writeParams = null;
+            this.scenario = null;
+            threadSleepDuration = 0;
+        }
+
+        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+
+        this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size;
+
+        // TODO Move pool management up to WorkloadExecutor
+        this.pool = Executors.newFixedThreadPool(this.threadPoolSize);
+
+        String
+                bSize =
+                (writeParams == null) || (writeParams.getBatchSize() == Long.MIN_VALUE) ?
+                        properties.getProperty("pherf.default.dataloader.batchsize") :
+                        String.valueOf(writeParams.getBatchSize());
+        this.batchSize =
+                (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize);
+    }
+
+    @Override public void complete() {
+    }
+
+    public Runnable execute() throws Exception {
+        return new Runnable() {
+            @Override public void run() {
+                try {
+                    DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
+                    DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
+
+                    if (WriteWorkload.this.scenario == null) {
+                        for (Scenario scenario : getParser().getScenarios()) {
+                            exec(dataLoadTimeSummary, dataLoadThreadTime, scenario);
+                        }
+                    } else {
+                        exec(dataLoadTimeSummary, dataLoadThreadTime, WriteWorkload.this.scenario);
+                    }
+                    resultUtil.write(dataLoadTimeSummary);
+                    resultUtil.write(dataLoadThreadTime);
+
+                } catch (Exception e) {
+                    logger.warn("", e);
+                }
+            }
+        };
+    }
+
+    private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
+            DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
+        logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
+        long start = System.currentTimeMillis();
+
+        List<Future> writeBatches = getBatches(dataLoadThreadTime, scenario);
+
+        waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
+
+        // always update stats for Phoenix base tables
+        updatePhoenixStats(scenario.getTableName());
+    }
+
+    private List<Future> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
+            throws Exception {
+        RowCalculator
+                rowCalculator =
+                new RowCalculator(getThreadPoolSize(), scenario.getRowCount());
+        List<Future> writeBatches = new ArrayList<>();
+
+        for (int i = 0; i < getThreadPoolSize(); i++) {
+            List<Column>
+                    phxMetaCols =
+                    pUtil.getColumnsFromPhoenix(scenario.getSchemaName(),
+                            scenario.getTableNameWithoutSchemaName(), pUtil.getConnection());
+            int threadRowCount = rowCalculator.getNext();
+            logger.info(
+                    "Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows.");
+            Future<Info>
+                    write =
+                    upsertData(scenario, phxMetaCols, scenario.getTableName(), threadRowCount,
+                            dataLoadThreadTime);
+            writeBatches.add(write);
+        }
+        if (writeBatches.isEmpty()) {
+            throw new PherfException(
+                    "Holy shit snacks! Throwing up hands in disbelief and exiting. Could not write data for some unknown reason.");
+        }
+
+        return writeBatches;
+    }
+
+    private void waitForBatches(DataLoadTimeSummary dataLoadTimeSummary, Scenario scenario,
+            long start, List<Future> writeBatches)
+            throws InterruptedException, java.util.concurrent.ExecutionException {
+        int sumRows = 0, sumDuration = 0;
+        // Wait for all the batch threads to complete
+        for (Future<Info> write : writeBatches) {
+            Info writeInfo = write.get();
+            sumRows += writeInfo.getRowCount();
+            sumDuration += writeInfo.getDuration();
+            logger.info("Executor (" + this.hashCode() + ") writes complete with row count ("
+                    + writeInfo.getRowCount() + ") in Ms (" + writeInfo.getDuration() + ")");
+        }
+        logger.info("Writes completed with total row count (" + sumRows + ") with total time of("
+                + sumDuration + ") Ms");
+        dataLoadTimeSummary
+                .add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * TODO Move this method to PhoenixUtil
+     * Update Phoenix table stats
+     *
+     * @param tableName
+     * @throws Exception
+     */
+    public void updatePhoenixStats(String tableName) throws Exception {
+        logger.info("Updating stats for " + tableName);
+        pUtil.executeStatement("UPDATE STATISTICS " + tableName);
+    }
+
+    public Future<Info> upsertData(final Scenario scenario, final List<Column> columns,
+            final String tableName, final int rowCount,
+            final DataLoadThreadTime dataLoadThreadTime) {
+        Future<Info> future = pool.submit(new Callable<Info>() {
+            @Override public Info call() throws Exception {
+                int rowsCreated = 0;
+                long start = 0, duration, totalDuration;
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                Connection connection = null;
+                try {
+                    connection = pUtil.getConnection();
+                    long logStartTime = System.currentTimeMillis();
+                    long
+                            maxDuration =
+                            (WriteWorkload.this.writeParams == null) ?
+                                    Long.MAX_VALUE :
+                                    WriteWorkload.this.writeParams.getExecutionDurationInMs();
+
+                    for (long i = rowCount; (i > 0) && ((System.currentTimeMillis() - logStartTime)
+                            < maxDuration); i--) {
+                        String sql = buildSql(columns, tableName);
+                        PreparedStatement stmt = connection.prepareStatement(sql);
+                        stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+                        start = System.currentTimeMillis();
+                        rowsCreated += stmt.executeUpdate();
+                        stmt.close();
+                        if ((i % getBatchSize()) == 0) {
+                            connection.commit();
+                            duration = System.currentTimeMillis() - start;
+                            logger.info("Writer (" + Thread.currentThread().getName()
+                                    + ") committed Batch. Total " + getBatchSize()
+                                    + " rows for this thread (" + this.hashCode() + ") in ("
+                                    + duration + ") Ms");
+
+                            if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+                                dataLoadThreadTime
+                                        .add(tableName, Thread.currentThread().getName(), i,
+                                                System.currentTimeMillis() - logStartTime);
+                                logStartTime = System.currentTimeMillis();
+                            }
+
+                            // Pause for throttling if configured to do so
+                            Thread.sleep(threadSleepDuration);
+                        }
+                    }
+                } finally {
+                    if (connection != null) {
+                        try {
+                            connection.commit();
+                            duration = System.currentTimeMillis() - start;
+                            logger.info("Writer ( " + Thread.currentThread().getName()
+                                    + ") committed Final Batch. Duration (" + duration + ") Ms");
+                            connection.close();
+                        } catch (SQLException e) {
+                            // Swallow since we are closing anyway
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                totalDuration = System.currentTimeMillis() - start;
+                return new Info(totalDuration, rowsCreated);
+            }
+        });
+        return future;
+    }
+
+    private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
+            PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+        int count = 1;
+        for (Column column : columns) {
+
+            DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
+            switch (column.getType()) {
+            case VARCHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARCHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case CHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.CHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case DECIMAL:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DECIMAL);
+                } else {
+                    statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+                }
+                break;
+            case INTEGER:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.INTEGER);
+                } else {
+                    statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+                }
+                break;
+            case DATE:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DATE);
+                } else {
+                    Date
+                            date =
+                            new java.sql.Date(
+                                    simpleDateFormat.parse(dataValue.getValue()).getTime());
+                    statement.setDate(count, date);
+                }
+                break;
+            default:
+                break;
+            }
+            count++;
+        }
+        return statement;
+    }
+
+    private String buildSql(final List<Column> columns, final String tableName) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("upsert into ");
+        builder.append(tableName);
+        builder.append(" (");
+        int count = 1;
+        for (Column column : columns) {
+            builder.append(column.getName());
+            if (count < columns.size()) {
+                builder.append(",");
+            } else {
+                builder.append(")");
+            }
+            count++;
+        }
+        builder.append(" VALUES (");
+        for (int i = 0; i < columns.size(); i++) {
+            if (i < columns.size() - 1) {
+                builder.append("?,");
+            } else {
+                builder.append("?)");
+            }
+        }
+        return builder.toString();
+    }
+
+    public XMLConfigParser getParser() {
+        return parser;
+    }
+
+    public RulesApplier getRulesApplier() {
+        return rulesApplier;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    private class Info {
+
+        private final int rowCount;
+        private final long duration;
+
+        public Info(long duration, int rows) {
+            this.duration = duration;
+            this.rowCount = rows;
+        }
+
+        public long getDuration() {
+            return duration;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 9514089..8f93685 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -304,6 +304,41 @@
         </column>
     </datamapping>
     <scenarios>
+        <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="100" name="readWriteScenario">
+            <!-- Scenario level rule overrides will be unsupported in V1.
+                    You can use the general datamappings in the mean time-->
+            <dataOverride>
+                <column>
+                    <type>VARCHAR</type>
+                    <userDefined>true</userDefined>
+                    <dataSequence>LIST</dataSequence>
+                    <name>TENANT_ID</name>
+                </column>
+            </dataOverride>
+            <writeParams executionDurationInMs="10000">
+                <!--
+                    Number of writer it insert into the threadpool
+                -->
+                <writerThreadCount>5</writerThreadCount>
+
+                <!--
+                    Time in Ms that each thread will sleep between batch writes. This helps to
+                    throttle writers.
+                -->
+                <threadSleepDuration>10</threadSleepDuration>
+
+                <batchSize>100</batchSize>
+            </writeParams>
+            <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
+            <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000"
+                      numberOfExecutions="100">
+                <!--  Aggregate queries on a per tenant basis -->
+                <query tenantId="00Dxx0000001gER"
+                       ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED"
+                       statement="select count(*) from PHERF.PHERF_TEST_VIEW_UNSALTED"/>
+            </querySet>
+
+        </scenario>
         <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10">
             <!-- Scenario level rule overrides will be unsupported in V1.
                     You can use the general datamappings in the mean time-->

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index f362842..6f25fbd 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.phoenix.pherf;
 
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,7 +28,6 @@ import java.util.List;
 
 import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.rules.DataValue;
-import org.apache.phoenix.pherf.workload.WorkloadExecutor;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,53 +38,55 @@ import javax.xml.bind.Marshaller;
 
 import static org.junit.Assert.*;
 
-public class ConfigurationParserTest extends ResultBaseTest{
+public class ConfigurationParserTest extends ResultBaseTest {
     private static final Logger logger = LoggerFactory.getLogger(ConfigurationParserTest.class);
 
     @Test
-    public void testConfigFilesParsing() {
-        try {
-        	WorkloadExecutor workloadExec = new WorkloadExecutor();
-            List<Scenario> scenarioList = workloadExec.getParser().getScenarios();
-            assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
-            logger.info("Number of scenarios loaded: " + scenarioList.size());
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail();
+    public void testReadWriteWorkloadReader() throws Exception {
+        String scenarioName = "testScenarioRW";
+        List<Scenario> scenarioList = getScenarios();
+        Scenario target = null;
+        for (Scenario scenario : scenarioList) {
+            if (scenarioName.equals(scenario.getName())) {
+                target = scenario;
+            }
         }
+        assertNotNull("Could not find scenario: " + scenarioName, target);
+        WriteParams params = target.getWriteParams();
+
+        assertNotNull("Could not find writeParams in scenario: " + scenarioName, params);
+        assertNotNull("Could not find batch size: ", params.getBatchSize());
+        assertNotNull("Could not find execution duration: ", params.getExecutionDurationInMs());
+        assertNotNull("Could not find sleep duration: ", params.getThreadSleepDuration());
+        assertNotNull("Could not find writer count: ", params.getWriterThreadCount());
     }
 
-	@Test
+    @Test
     // TODO Break this into multiple smaller tests.
-    public void testConfigReader(){
-		URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
-        assertNotNull("Test data XML file is missing", resourceUrl);
-
-		try {
+    public void testConfigReader() {
+        try {
 
             logger.debug("DataModel: " + writeXML());
-			Path resourcePath = Paths.get(resourceUrl.toURI());
-            DataModel data = XMLConfigParser.readDataModel(resourcePath);
-            List<Scenario> scenarioList = data.getScenarios();
-            assertTrue("Could not load the scenarios from xml.", (scenarioList != null) && (scenarioList.size() > 0));
-            List<Column> dataMappingColumns = data.getDataMappingColumns();
-            assertTrue("Could not load the data columns from xml.", (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
+            List<Scenario> scenarioList = getScenarios();
+            List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+            assertTrue("Could not load the data columns from xml.",
+                    (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
             assertTrue("Could not load the data DataValue list from xml.",
                     (dataMappingColumns.get(6).getDataValues() != null)
-                    && (dataMappingColumns.get(6).getDataValues().size() > 0));
+                            && (dataMappingColumns.get(6).getDataValues().size() > 0));
 
             assertDateValue(dataMappingColumns);
 
             // Validate column mappings
             for (Column column : dataMappingColumns) {
-                assertNotNull("Column ("+ column.getName() + ") is missing its type",column.getType());
+                assertNotNull("Column (" + column.getName() + ") is missing its type",
+                        column.getType());
             }
 
-            Scenario scenario = scenarioList.get(0);
+            Scenario scenario = scenarioList.get(1);
             assertNotNull(scenario);
             assertEquals("PHERF.TEST_TABLE", scenario.getTableName());
-            assertEquals(10, scenario.getRowCount());
+            assertEquals(30, scenario.getRowCount());
             assertEquals(1, scenario.getDataOverride().getColumn().size());
             QuerySet qs = scenario.getQuerySet().get(0);
             assertEquals(ExecutionType.SERIAL, qs.getExecutionType());
@@ -99,27 +101,50 @@ public class ConfigurationParserTest extends ResultBaseTest{
             assertEquals("select count(*) from PHERF.TEST_TABLE", firstQuery.getStatement());
             assertEquals("123456789012345", firstQuery.getTenantId());
             assertEquals(null, firstQuery.getDdl());
-            assertEquals(0, (long)firstQuery.getExpectedAggregateRowCount());
+            assertEquals(0, (long) firstQuery.getExpectedAggregateRowCount());
 
             Query secondQuery = qs.getQuery().get(1);
-            assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE", secondQuery.getStatement());
+            assertEquals("Could not get statement.", "select sum(SOME_INT) from PHERF.TEST_TABLE",
+                    secondQuery.getStatement());
             assertEquals("Could not get queryGroup.", "g1", secondQuery.getQueryGroup());
 
             // Make sure anything in the overrides matches a real column in the data mappings
             DataOverride override = scenario.getDataOverride();
             for (Column column : override.getColumn()) {
-                assertTrue("Could not lookup Column (" + column.getName() + ") in DataMapping columns: " + dataMappingColumns, dataMappingColumns.contains(column));
+                assertTrue("Could not lookup Column (" + column.getName()
+                        + ") in DataMapping columns: " + dataMappingColumns,
+                        dataMappingColumns.contains(column));
             }
 
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    private URL getResourceUrl() {
+        URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+        assertNotNull("Test data XML file is missing", resourceUrl);
+        return resourceUrl;
+    }
+
+    private List<Scenario> getScenarios() throws URISyntaxException, JAXBException{
+        DataModel data = getDataModel();
+        List<Scenario> scenarioList = data.getScenarios();
+        assertTrue("Could not load the scenarios from xml.",
+                (scenarioList != null) && (scenarioList.size() > 0));
+        return scenarioList;
+    }
+
+    private DataModel getDataModel() throws URISyntaxException, JAXBException {
+        Path resourcePath = Paths.get(getResourceUrl().toURI());
+        return XMLConfigParser.readDataModel(resourcePath);
+    }
 
     private void assertDateValue(List<Column> dataMappingColumns) {
         for (Column dataMapping : dataMappingColumns) {
-            if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName().equals("CREATED_DATE"))) {
+            if ((dataMapping.getType() == DataTypeMapping.DATE) && (dataMapping.getName()
+                    .equals("CREATED_DATE"))) {
                 // First rule should have min/max set
                 assertNotNull(dataMapping.getDataValues().get(0).getMinValue());
                 assertNotNull(dataMapping.getDataValues().get(0).getMaxValue());
@@ -139,7 +164,7 @@ public class ConfigurationParserTest extends ResultBaseTest{
     /*
         Used for debugging to dump out a simple xml filed based on the bound objects.
      */
-	private String writeXML() {
+    private String writeXML() {
         DataModel data = new DataModel();
         try {
             DataValue dataValue = new DataValue();
@@ -156,7 +181,6 @@ public class ConfigurationParserTest extends ResultBaseTest{
             List<Column> columnList = new ArrayList<>();
             columnList.add(column);
 
-            data.setRelease("192");
             data.setDataMappingColumns(columnList);
 
             Scenario scenario = new Scenario();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
index a202437..4ccf95c 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultTest.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
 import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
 import org.apache.phoenix.pherf.result.*;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.phoenix.pherf.configuration.Query;
@@ -72,7 +71,7 @@ public class ResultTest extends ResultBaseTest {
     public void testMonitorResult() throws Exception {
         ExecutorService executorService = Executors.newFixedThreadPool(1);
         MonitorManager monitor = new MonitorManager(100);
-        Future future = executorService.submit(monitor);
+        Future future = executorService.submit(monitor.execute());
         List<Result> records;
         final int TIMEOUT = 30;
 
@@ -83,7 +82,7 @@ public class ResultTest extends ResultBaseTest {
             Thread.sleep(100);
             if (ct == max) {
                 int timer = 0;
-                monitor.stop();
+                monitor.complete();
                 while (monitor.isRunning() && (timer < TIMEOUT)) {
                     System.out.println("Waiting for monitor to finish. Seconds Waited :" + timer);
                     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
index 15d4608..92604d4 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.phoenix.pherf;
 
 import org.apache.phoenix.pherf.configuration.*;
-import org.apache.phoenix.pherf.loaddata.DataLoader;
+import org.apache.phoenix.pherf.workload.WriteWorkload;
 import org.apache.phoenix.pherf.rules.DataValue;
 import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -28,20 +28,19 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.junit.Test;
 
-import java.sql.Types;
 import java.util.*;
 
 import static org.junit.Assert.*;
 
 public class RuleGeneratorTest {
-    static PhoenixUtil util = new PhoenixUtil(true);
-    static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
+    private static PhoenixUtil util = PhoenixUtil.create(true);
+    private static final String matcherScenario = PherfConstants.SCENARIO_ROOT_PATTERN + ".xml";
 
     @Test
     public void testDateGenerator() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
 
         for (Column dataMapping : model.getDataMappingColumns()) {
@@ -68,7 +67,7 @@ public class RuleGeneratorTest {
     public void testNullChance() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
         int sampleSize = 100;
         List<String> values = new ArrayList<>(sampleSize);
@@ -96,7 +95,7 @@ public class RuleGeneratorTest {
     public void testSequentialDataSequence() throws Exception {
         XMLConfigParser parser = new XMLConfigParser(matcherScenario);
         DataModel model = parser.getDataModels().get(0);
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
 
         Column targetColumn = null;
@@ -181,7 +180,7 @@ public class RuleGeneratorTest {
         expectedValues.add("cCCyYhnNbBs9kWr");
 
         XMLConfigParser parser = new XMLConfigParser(".*test_scenario.xml");
-        DataLoader loader = new DataLoader(parser);
+        WriteWorkload loader = new WriteWorkload(parser);
         RulesApplier rulesApplier = loader.getRulesApplier();
         Scenario scenario = parser.getScenarios().get(0);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7175dcbc/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 45d36d2..fddf022 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -127,10 +127,50 @@
             <name>NEWVAL_STRING</name>
             <prefix>TSTPRFX</prefix>
         </column>
-
     </datamapping>
     <scenarios>
-        <scenario tableName="PHERF.TEST_TABLE" rowCount="10" name="testScenario">
+        <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
+            <!-- Scenario level rule overrides will be unsupported in V1.
+                    You can use the general datamappings in the mean time-->
+            <dataOverride>
+                <column>
+                    <type>VARCHAR</type>
+                    <userDefined>true</userDefined>
+                    <dataSequence>RANDOM</dataSequence>
+                    <length>10</length>
+                    <name>FIELD</name>
+                </column>
+            </dataOverride>
+
+            <!--
+                This is used to add mixed R/W workloads.
+
+                If this tag exists, a writer pool will be created based on the below properties.
+                These props will override the default values in pherf.properties, but only for this
+                scenario.The write jobs will run in conjunction with the querySet below.
+            -->
+            <writeParams executionDurationInMs="10000">
+                <!--
+                    Number of writer it insert into the threadpool
+                -->
+                <writerThreadCount>2</writerThreadCount>
+
+                <!--
+                    Time in Ms that each thread will sleep between batch writes. This helps to
+                    throttle writers.
+                -->
+                <threadSleepDuration>10</threadSleepDuration>
+
+                <batchSize>1000</batchSize>
+            </writeParams>
+            <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
+                <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
+                <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+            </querySet>
+
+        </scenario>
+
+        <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
             <!-- Scenario level rule overrides will be unsupported in V1.
                     You can use the general datamappings in the mean time-->
             <dataOverride>
@@ -145,16 +185,20 @@
             <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first 
                       2. DDL included in query are executed only once on start of querySet execution.
             -->
-            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000" numberOfExecutions="100">
-                <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0" statement="select count(*) from PHERF.TEST_TABLE"/>
+            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
+                      numberOfExecutions="100">
+                <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
+                       statement="select count(*) from PHERF.TEST_TABLE"/>
                 <!-- queryGroup is a way to organize queries across tables or scenario files.
                     The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
-                <query id="q2" queryGroup="g1" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+                <query id="q2" queryGroup="g1"
+                       statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
             </querySet>
             <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
-            <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000" numberOfExecutions="10">
+            <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
+                      numberOfExecutions="10">
                 <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
-                <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+                <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
             </querySet>
         </scenario>
     </scenarios>