Posted to commits@phoenix.apache.org by mu...@apache.org on 2018/02/07 19:48:10 UTC

phoenix git commit: PHOENIX-4549 Pherf - Column override and sequenced index creation support

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 5e3671916 -> 907e90932


PHOENIX-4549 Pherf - Column override and sequenced index creation support


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/907e9093
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/907e9093
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/907e9093

Branch: refs/heads/4.x-HBase-0.98
Commit: 907e90932223443e6f370c4ebe6cdba5d98b7699
Parents: 5e36719
Author: Mujtaba <mu...@apache.org>
Authored: Wed Feb 7 11:47:28 2018 -0800
Committer: Mujtaba <mu...@apache.org>
Committed: Wed Feb 7 11:47:28 2018 -0800

----------------------------------------------------------------------
 .../phoenix/pherf/configuration/Column.java     |  23 +--
 .../pherf/configuration/DataTypeMapping.java    |   6 +-
 .../phoenix/pherf/configuration/Query.java      |  25 ++++
 .../phoenix/pherf/configuration/Scenario.java   |   3 +-
 .../phoenix/pherf/result/QueryResult.java       |  10 +-
 .../phoenix/pherf/result/ResultManager.java     |  18 ++-
 .../apache/phoenix/pherf/result/ResultUtil.java |   9 +-
 .../phoenix/pherf/rules/RulesApplier.java       | 142 +++++++++++++++----
 .../phoenix/pherf/schema/SchemaReader.java      |   2 +-
 .../apache/phoenix/pherf/util/PhoenixUtil.java  |  44 +++++-
 .../pherf/workload/MultiThreadedRunner.java     |  35 ++++-
 .../phoenix/pherf/workload/QueryExecutor.java   |  36 ++---
 .../phoenix/pherf/workload/WriteWorkload.java   |  54 ++++++-
 .../scenario/prod_test_unsalted_scenario.xml    |  14 +-
 .../org/apache/phoenix/pherf/ColumnTest.java    |   3 +
 .../phoenix/pherf/ConfigurationParserTest.java  |   4 +-
 .../apache/phoenix/pherf/RuleGeneratorTest.java | 107 +++++++++++++-
 .../test/resources/datamodel/test_schema.sql    |   4 +
 .../test/resources/scenario/test_scenario.xml   |  48 ++++++-
 19 files changed, 484 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Column.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Column.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Column.java
index 7c9e180..0d64a39 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Column.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Column.java
@@ -28,7 +28,8 @@ public class Column {
 	private String name;
     private String prefix;
     private DataSequence dataSequence;
-    private int length, minValue, maxValue, precision;
+    private int length, precision;
+    private long minValue, maxValue;
     private int nullChance;
     private boolean userDefined;
     private List<DataValue> dataValues;
@@ -40,8 +41,8 @@ public class Column {
         // Initialize int to negative value so we can distinguish 0 in mutations
         // Object fields can be detected with null
         this.length = Integer.MIN_VALUE;
-        this.minValue = Integer.MIN_VALUE;
-        this.maxValue = Integer.MIN_VALUE;
+        this.minValue = Long.MIN_VALUE;
+        this.maxValue = Long.MIN_VALUE;
         this.precision = Integer.MIN_VALUE;
         this.nullChance = Integer.MIN_VALUE;
         this.userDefined = false;
@@ -84,6 +85,10 @@ public class Column {
 	public int getLength() {
 		return length;
 	}
+	
+	public int getLengthExcludingPrefix() {
+		return (this.getPrefix() == null) ? this.length : this.length - this.getPrefix().length();
+	}
 
 	public void setLength(int length) {
 		this.length = length;
@@ -97,19 +102,19 @@ public class Column {
 		this.type = type;
 	}
 
-    public int getMinValue() {
+    public long getMinValue() {
         return minValue;
     }
 
-    public void setMinValue(int minValue) {
+    public void setMinValue(long minValue) {
         this.minValue = minValue;
     }
 
-    public int getMaxValue() {
+    public long getMaxValue() {
         return maxValue;
     }
 
-    public void setMaxValue(int maxValue) {
+    public void setMaxValue(long maxValue) {
         this.maxValue = maxValue;
     }
 
@@ -134,11 +139,11 @@ public class Column {
      *               obj contains only the fields you want to mutate this object into.
      */
     public void mutate(Column column) {
-        if (column.getMinValue() != Integer.MIN_VALUE) {
+        if (column.getMinValue() != Long.MIN_VALUE) {
             setMinValue(column.getMinValue());
         }
 
-        if (column.getMaxValue() != Integer.MIN_VALUE) {
+        if (column.getMaxValue() != Long.MIN_VALUE) {
             setMaxValue(column.getMaxValue());
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index c266a57..0476df2 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -25,7 +25,11 @@ public enum DataTypeMapping {
     CHAR("CHAR", Types.CHAR),
     DECIMAL("DECIMAL", Types.DECIMAL),
     INTEGER("INTEGER", Types.INTEGER),
-    DATE("DATE", Types.DATE);
+    DATE("DATE", Types.DATE),
+    UNSIGNED_LONG("UNSIGNED_LONG", Types.LONGVARCHAR),
+    VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
+    VARBINARY("VARBINARY", Types.VARBINARY),
+    TIMESTAMP("TIMESTAMP", Types.TIMESTAMP);
 
     private final String sType;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index 1e5cabe..e283715 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -18,9 +18,14 @@
 
 package org.apache.phoenix.pherf.configuration;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.phoenix.pherf.rules.RulesApplier;
+
 @XmlType
 public class Query {
 
@@ -30,7 +35,12 @@ public class Query {
     private String ddl;
     private String queryGroup;
     private String id;
+    private Pattern pattern;
 
+    public Query() {
+    	pattern = Pattern.compile("\\[.*?\\]");
+    }
+    
     /**
      * SQL statement
      *
@@ -40,6 +50,21 @@ public class Query {
     public String getStatement() {
         return statement;
     }
+    
+    public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario) throws Exception {
+    	String ret = this.statement;
+    	String needQuotes = "";
+    	Matcher m = pattern.matcher(ret);
+        while(m.find()) {
+        	String dynamicField = m.group(0).replace("[", "").replace("]", "");
+        	Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
+			needQuotes = (dynamicColumn.getType() == DataTypeMapping.CHAR || dynamicColumn
+					.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+			ret = ret.replace("[" + dynamicField + "]",
+					needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue() + needQuotes);
+     }
+      	return ret;    	
+    }
 
     public void setStatement(String statement) {
         // normalize statement - merge all consecutive spaces into one
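
getDynamicStatement() is the heart of the read-path column override: each
[FIELD] token in the statement is replaced with a rule-generated value, and
CHAR/VARCHAR values are wrapped in single quotes. A usage sketch, assuming an
XMLConfigParser named parser has already been built (the statement text and
the resolved value are illustrative):

    RulesApplier ruleApplier = new RulesApplier(parser);
    Scenario scenario = parser.getScenarios().get(0);

    Query query = new Query();
    query.setStatement("SELECT COUNT(*) FROM PHERF.T WHERE TENANT_ID=[TENANT_ID]");
    // Might resolve to: SELECT COUNT(*) FROM PHERF.T WHERE TENANT_ID='00Dxx0000001gER'
    String sql = query.getDynamicStatement(ruleApplier, scenario);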

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index 02e5cc7..132207b 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -37,7 +37,7 @@ public class Scenario {
     private Map<String, String> phoenixProperties;
     private DataOverride dataOverride;
     private List<QuerySet> querySet = new ArrayList<>();
-    private WriteParams writeParams;
+    private WriteParams writeParams = null;
     private String name;
     private String tenantId;
     private List<Ddl> preScenarioDdls;
@@ -45,7 +45,6 @@ public class Scenario {
     
    
     public Scenario() {
-        writeParams = new WriteParams();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
index 669a472..cef24f4 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java
@@ -19,7 +19,9 @@
 package org.apache.phoenix.pherf.result;
 
 import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.apache.phoenix.util.DateUtil;
 
@@ -99,7 +101,7 @@ public class QueryResult extends Query {
         return totalRunTime / getThreadTimes().size();
     }
 
-    public List<ResultValue> getCsvRepresentation(ResultUtil util) {
+    public List<ResultValue> getCsvRepresentation(ResultUtil util, Scenario scenario, RulesApplier ruleApplier) {
         List<ResultValue> rowValues = new ArrayList<>();
         rowValues.add(new ResultValue(util.convertNull(getStartTimeText())));
         rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup())));
@@ -109,14 +111,14 @@ public class QueryResult extends Query {
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getAvgRunTimeInMs()))));
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getAvgMinRunTimeInMs()))));
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRunCount()))));
-        rowValues.add(new ResultValue(util.convertNull(String.valueOf(getExplainPlan()))));
+        rowValues.add(new ResultValue(util.convertNull(String.valueOf(getExplainPlan(scenario, ruleApplier)))));
         rowValues.add(new ResultValue(util.convertNull(String.valueOf(getResultRowCount()))));
         return rowValues;
     }
     
-    private String getExplainPlan() {
+    private String getExplainPlan(Scenario scenario, RulesApplier ruleApplier) {
     	try {
-			return pUtil.getExplainPlan(this);
+			return pUtil.getExplainPlan(this, scenario, ruleApplier);
 		} catch (SQLException e) {
 			e.printStackTrace();
 		}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
index 5e0f242..929f96a 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java
@@ -22,6 +22,7 @@ import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
 import org.apache.phoenix.pherf.result.impl.XMLResultHandler;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.util.InstanceResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,18 +82,23 @@ public class ResultManager {
         }
     }
 
+    
+    public synchronized void write(DataModelResult result) throws Exception {
+    	write(result, null);
+    }
+    
     /**
      * Write out the result to each writer in the pool
      *
      * @param result {@link DataModelResult}
      * @throws Exception
      */
-    public synchronized void write(DataModelResult result) throws Exception {
+    public synchronized void write(DataModelResult result, RulesApplier ruleApplier) throws Exception {
         try {
             util.ensureBaseResultDirExists();
             final DataModelResult dataModelResultCopy = new DataModelResult(result);
             for (ResultHandler handler : resultHandlers) {
-                util.write(handler, dataModelResultCopy);
+                util.write(handler, dataModelResultCopy, ruleApplier);
             }
         } finally {
             for (ResultHandler handler : resultHandlers) {
@@ -108,13 +114,17 @@ public class ResultManager {
         }
     }
 
+    public synchronized void write(List<DataModelResult> dataModelResults) throws Exception {
+    	write(dataModelResults, null);
+    }
+    
     /**
      * Write a combined set of results for each result in the list.
      *
      * @param dataModelResults List<{@link DataModelResult > </>}
      * @throws Exception
      */
-    public synchronized void write(List<DataModelResult> dataModelResults) throws Exception {
+    public synchronized void write(List<DataModelResult> dataModelResults, RulesApplier rulesApplier) throws Exception {
         util.ensureBaseResultDirExists();
 
         CSVFileResultHandler detailsCSVWriter = null;
@@ -123,7 +133,7 @@ public class ResultManager {
             detailsCSVWriter.setResultFileDetails(ResultFileDetails.CSV_DETAILED_PERFORMANCE);
             detailsCSVWriter.setResultFileName(PherfConstants.COMBINED_FILE_NAME);
             for (DataModelResult dataModelResult : dataModelResults) {
-                util.write(detailsCSVWriter, dataModelResult);
+                util.write(detailsCSVWriter, dataModelResult, rulesApplier);
             }
         } finally {
             if (detailsCSVWriter != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
index 0c2a7b8..30988ef 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultUtil.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
 import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
 import java.io.File;
@@ -117,7 +118,7 @@ public class ResultUtil {
         }
     }
 
-    public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult)
+    public synchronized void write(ResultHandler resultHandler, DataModelResult dataModelResult, RulesApplier ruleApplier)
             throws Exception {
         ResultFileDetails resultFileDetails = resultHandler.getResultFileDetails();
         switch (resultFileDetails) {
@@ -126,7 +127,7 @@ public class ResultUtil {
         case CSV_DETAILED_FUNCTIONAL:
             List<List<ResultValue>>
                     rowDetails =
-                    getCSVResults(dataModelResult, resultFileDetails);
+                    getCSVResults(dataModelResult, resultFileDetails, ruleApplier);
             for (List<ResultValue> row : rowDetails) {
                 Result
                         result =
@@ -199,7 +200,7 @@ public class ResultUtil {
     }
 
     private List<List<ResultValue>> getCSVResults(DataModelResult dataModelResult,
-            ResultFileDetails resultFileDetails) {
+            ResultFileDetails resultFileDetails, RulesApplier ruleApplier) {
         List<List<ResultValue>> rowList = new ArrayList<>();
 
         for (ScenarioResult result : dataModelResult.getScenarioResult()) {
@@ -207,7 +208,7 @@ public class ResultUtil {
                 for (QueryResult queryResult : querySetResult.getQueryResults()) {
                     switch (resultFileDetails) {
                     case CSV_AGGREGATE_PERFORMANCE:
-                        List<ResultValue> csvResult = queryResult.getCsvRepresentation(this);
+                        List<ResultValue> csvResult = queryResult.getCsvRepresentation(this, result, ruleApplier);
                         rowList.add(csvResult);
                         break;
                     case CSV_DETAILED_PERFORMANCE:

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index 454050b..2afc29a 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class RulesApplier {
     private static final Logger logger = LoggerFactory.getLogger(RulesApplier.class);
-    private static final AtomicLong COUNTER = new AtomicLong(100);
+    private static final AtomicLong COUNTER = new AtomicLong(0);
 
     // Used to bail out of random distribution if it takes too long
     // This should never happen when distributions add up to 100
@@ -51,6 +51,9 @@ public class RulesApplier {
 
     private final XMLConfigParser parser;
     private final List<Map> modelList;
+    private final Map<String, Column> columnMap;
+    private String cachedScenarioOverrideName;
+    private Map<DataTypeMapping, List> scenarioOverrideMap;
 
 
     public RulesApplier(XMLConfigParser parser) {
@@ -60,15 +63,39 @@ public class RulesApplier {
     public RulesApplier(XMLConfigParser parser, long seed) {
         this.parser = parser;
         this.modelList = new ArrayList<Map>();
+        this.columnMap = new HashMap<String, Column>();
         this.rndNull = new Random(seed);
         this.rndVal = new Random(seed);
         this.randomDataGenerator = new RandomDataGenerator();
+        this.cachedScenarioOverrideName = null;
         populateModelList();
     }
 
     public List<Map> getModelList() {
         return Collections.unmodifiableList(this.modelList);
     }
+    
+    private Map<DataTypeMapping, List> getCachedScenarioOverrides(Scenario scenario) {
+    	if (this.cachedScenarioOverrideName == null || this.cachedScenarioOverrideName != scenario.getName()) {
+    		this.cachedScenarioOverrideName = scenario.getName();
+    		this.scenarioOverrideMap = new HashMap<DataTypeMapping, List>();
+
+    	       if (scenario.getDataOverride() != null) {
+				for (Column column : scenario.getDataOverride().getColumn()) {
+					List<Column> cols;
+					DataTypeMapping type = column.getType();
+					if (this.scenarioOverrideMap.containsKey(type)) {
+						this.scenarioOverrideMap.get(type).add(column);
+					} else {
+						cols = new LinkedList<Column>();
+						cols.add(column);
+						this.scenarioOverrideMap.put(type, cols);
+					}
+				}
+			}
+    	}
+		return scenarioOverrideMap;
+    }
 
 
     /**
@@ -84,11 +111,26 @@ public class RulesApplier {
      */
     public DataValue getDataForRule(Scenario scenario, Column phxMetaColumn) throws Exception {
         // TODO Make a Set of Rules that have already been applied so that so we don't generate for every value
-
+    	
         List<Scenario> scenarios = parser.getScenarios();
         DataValue value = null;
         if (scenarios.contains(scenario)) {
             logger.debug("We found a correct Scenario");
+            
+            Map<DataTypeMapping, List> overrideRuleMap = this.getCachedScenarioOverrides(scenario);
+            
+            if (overrideRuleMap != null) {
+	            List<Column> overrideRuleList = this.getCachedScenarioOverrides(scenario).get(phxMetaColumn.getType());
+	            
+				if (overrideRuleList != null && overrideRuleList.contains(phxMetaColumn)) {
+					logger.debug("We found a correct override column rule");
+					Column columnRule = getColumnForRuleOverride(overrideRuleList, phxMetaColumn);
+					if (columnRule != null) {
+						return getDataValue(columnRule);
+					}
+				}
+            }
+            
             // Assume the first rule map
             Map<DataTypeMapping, List> ruleMap = modelList.get(0);
             List<Column> ruleList = ruleMap.get(phxMetaColumn.getType());
@@ -107,6 +149,7 @@ public class RulesApplier {
             }
 
         }
+        
         return value;
     }
 
@@ -140,19 +183,9 @@ public class RulesApplier {
 
         switch (column.getType()) {
             case VARCHAR:
-                // Use the specified data values from configs if they exist
-                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
-                    data = pickDataValueFromList(dataValues);
-                } else {
-                    Preconditions.checkArgument(length > 0, "length needs to be > 0");
-                    if (column.getDataSequence() == DataSequence.SEQUENTIAL) {
-                        data = getSequentialDataValue(column);
-                    } else {
-                        data = getRandomDataValue(column);
-                    }
-                }
-                break;
+            case VARBINARY:
             case CHAR:
+                // Use the specified data values from configs if they exist
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
                 } else {
@@ -164,6 +197,17 @@ public class RulesApplier {
                     }
                 }
                 break;
+            case VARCHAR_ARRAY:
+            	//only list datavalues are supported
+            	String arr = "";
+            	for (DataValue dv : dataValues) {
+            		arr += "," + dv.getValue();
+            	}
+            	if (arr.startsWith(",")) {
+            		arr = arr.replaceFirst(",", "");
+            	}
+            	data = new DataValue(column.getType(), arr);
+            	break;
             case DECIMAL:
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
@@ -171,8 +215,8 @@ public class RulesApplier {
                     int precision = column.getPrecision();
                     double minDbl = column.getMinValue();
                     Preconditions.checkArgument((precision > 0) && (precision <= 18), "Precision must be between 0 and 18");
-                    Preconditions.checkArgument(minDbl >= 0, "minvalue must be set in configuration");
-                    Preconditions.checkArgument(column.getMaxValue() > 0, "maxValue must be set in configuration");
+                    Preconditions.checkArgument(minDbl >= 0, "minValue must be set in configuration for decimal");
+                    Preconditions.checkArgument(column.getMaxValue() > 0, "maxValue must be set in configuration for decimal");
                     StringBuilder maxValueStr = new StringBuilder();
 
                     for (int i = 0; i < precision; i++) {
@@ -188,22 +232,34 @@ public class RulesApplier {
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
                 } else {
-                    int minInt = column.getMinValue();
-                    int maxInt = column.getMaxValue();
-                    Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration");
+                    int minInt = (int) column.getMinValue();
+                    int maxInt = (int) column.getMaxValue();
+                    Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration for integers " + column.getName());
                     int intVal = RandomUtils.nextInt(minInt, maxInt);
                     data = new DataValue(column.getType(), String.valueOf(intVal));
                 }
                 break;
+            case UNSIGNED_LONG:
+                if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
+                    data = pickDataValueFromList(dataValues);
+                } else {
+                    long minLong = column.getMinValue();
+                    long maxLong = column.getMaxValue();
+                    Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
+                    long longVal = RandomUtils.nextLong(minLong, maxLong);
+                    data = new DataValue(column.getType(), String.valueOf(longVal));
+                }
+                break;
             case DATE:
+            case TIMESTAMP:
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
                     // Check if date has right format or not
                     data.setValue(checkDatePattern(data.getValue()));
                 } else if (column.getUseCurrentDate() != true){
-                    int minYear = column.getMinValue();
-                    int maxYear = column.getMaxValue();
-                    Preconditions.checkArgument((minYear > 0) && (maxYear > 0), "min and max values need to be set in configuration");
+                    int minYear = (int) column.getMinValue();
+                    int maxYear = (int) column.getMaxValue();
+                    Preconditions.checkArgument((minYear > 0) && (maxYear > 0), "min and max values need to be set in configuration for date/timestamps " + column.getName());
 
                     String dt = generateRandomDate(minYear, maxYear);
                     data = new DataValue(column.getType(), dt);
@@ -353,13 +409,15 @@ public class RulesApplier {
         if (!modelList.isEmpty()) {
             return;
         }
-
+        
         // Support for multiple models, but rules are only relevant each model
         for (DataModel model : parser.getDataModels()) {
 
             // Step 1
             final Map<DataTypeMapping, List> ruleMap = new HashMap<DataTypeMapping, List>();
             for (Column column : model.getDataMappingColumns()) {
+            	columnMap.put(column.getName(), column);
+            	
                 List<Column> cols;
                 DataTypeMapping type = column.getType();
                 if (ruleMap.containsKey(type)) {
@@ -382,7 +440,33 @@ public class RulesApplier {
         List<Column> ruleList = ruleMap.get(phxMetaColumn.getType());
         return getColumnForRule(ruleList, phxMetaColumn);
     }
+    
+    public Column getRule(String columnName) {
+    	return getRule(columnName, null);
+    }
+    
+    public Column getRule(String columnName, Scenario scenario) {
+    	if (null != scenario && null != scenario.getDataOverride()) {
+    		for (Column column: scenario.getDataOverride().getColumn()) {
+    			if (column.getName().equals(columnName)) {
+    				return column;
+    			}
+    		}
+    	}
+
+    	return columnMap.get(columnName);
+    }
 
+    private Column getColumnForRuleOverride(List<Column> ruleList, Column phxMetaColumn) {
+        for (Column columnRule : ruleList) {
+            if (columnRule.getName().equals(phxMetaColumn.getName())) {
+                return new Column(columnRule);
+            }
+        }
+
+       	return null;
+    }
+    
     private Column getColumnForRule(List<Column> ruleList, Column phxMetaColumn) {
 
         // Column pointer to head of list
@@ -400,7 +484,7 @@ public class RulesApplier {
             ruleAppliedColumn.mutate(columnRule);
         }
 
-        return ruleAppliedColumn;
+       	return ruleAppliedColumn;
     }
 
     /**
@@ -414,10 +498,12 @@ public class RulesApplier {
         DataValue data = null;
         long inc = COUNTER.getAndIncrement();
         String strInc = String.valueOf(inc);
-        String varchar = RandomStringUtils.randomAlphanumeric(column.getLength() - strInc.length());
-        varchar = (column.getPrefix() != null) ? column.getPrefix() + strInc + varchar :
-                strInc + varchar;
-
+		int paddedLength = column.getLengthExcludingPrefix();
+		String strInc1 = StringUtils.leftPad(strInc, paddedLength, "0");
+		String strInc2 = StringUtils.right(strInc1, column.getLengthExcludingPrefix());
+        String varchar = (column.getPrefix() != null) ? column.getPrefix() + strInc2:
+                strInc2;
+        
         // Truncate string back down if it exceeds length
         varchar = StringUtils.left(varchar,column.getLength());
         data = new DataValue(column.getType(), varchar);
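
The rewritten sequential-value path zero-pads the shared counter to the width
left over after the prefix, so generated VARCHARs keep a fixed length and sort
lexicographically. The padding logic in isolation, using the same commons-lang
StringUtils calls (prefix and length are illustrative):

    String prefix = "xx";
    int width = 10 - prefix.length();           // column length minus prefix
    String padded = StringUtils.leftPad(String.valueOf(42L), width, "0");
    padded = StringUtils.right(padded, width);  // truncate if the counter outgrows the width
    String value = prefix + padded;             // "xx00000042"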

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
index 439f87e..5ccdaaa 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/schema/SchemaReader.java
@@ -62,7 +62,7 @@ public class SchemaReader {
     public void applySchema() throws Exception {
         Connection connection = null;
         try {
-            connection = pUtil.getConnection();
+            connection = pUtil.getConnection(null);
             for (Path file : resourceList) {
                 logger.info("\nApplying schema to file: " + file);
                 pUtil.executeStatement(resourceToString(file), connection);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 38dcd64..72ab3e0 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -25,6 +25,8 @@ import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.jmx.MonitorManager;
 import org.apache.phoenix.pherf.result.DataLoadThreadTime;
 import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.GoogleChartGenerator.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,7 @@ import java.sql.*;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
@@ -50,6 +53,7 @@ public class PhoenixUtil {
     private static String queryServerUrl;
     private static final String ASYNC_KEYWORD = "ASYNC";
     private static final int ONE_MIN_IN_MS = 60000;
+    private static String CurrentSCN = null;
 
     private PhoenixUtil() {
         this(false);
@@ -86,10 +90,14 @@ public class PhoenixUtil {
     }
 
     public Connection getConnection(String tenantId) throws Exception {
-        return getConnection(tenantId, testEnabled);
+        return getConnection(tenantId, testEnabled, null);
+    }
+    
+    public Connection getConnection(String tenantId, Map<String, String> phoenixProperty) throws Exception {
+        return getConnection(tenantId, testEnabled, phoenixProperty);
     }
 
-    private Connection getConnection(String tenantId, boolean testEnabled) throws Exception {
+    public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> phoenixProperty) throws Exception {
         if (useThinDriver) {
             if (null == queryServerUrl) {
                 throw new IllegalArgumentException("QueryServer URL must be set before" +
@@ -112,6 +120,16 @@ public class PhoenixUtil {
                 props.setProperty("TenantId", tenantId);
                 logger.debug("\nSetting tenantId to " + tenantId);
             }
+            
+            if (phoenixProperty != null) {
+            	for (Map.Entry<String, String> phxProperty: phoenixProperty.entrySet()) {
+            		props.setProperty(phxProperty.getKey(), phxProperty.getValue());
+					System.out.println("Setting connection property "
+							+ phxProperty.getKey() + " to "
+							+ phxProperty.getValue());
+            	}
+            }
+            
             String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
             return DriverManager.getConnection(url, props);
         }
@@ -232,7 +250,7 @@ public class PhoenixUtil {
     public ResultSet getColumnsMetaData(String schemaName, String tableName, Connection connection)
             throws SQLException {
         DatabaseMetaData dbmd = connection.getMetaData();
-        ResultSet resultSet = dbmd.getColumns(null, schemaName, tableName, null);
+        ResultSet resultSet = dbmd.getColumns(null, schemaName.toUpperCase(), tableName.toUpperCase(), null);
         return resultSet;
     }
 
@@ -245,7 +263,7 @@ public class PhoenixUtil {
             while (resultSet.next()) {
                 Column column = new Column();
                 column.setName(resultSet.getString("COLUMN_NAME"));
-                column.setType(DataTypeMapping.valueOf(resultSet.getString("TYPE_NAME")));
+                column.setType(DataTypeMapping.valueOf(resultSet.getString("TYPE_NAME").replace(" ", "_")));
                 column.setLength(resultSet.getInt("COLUMN_SIZE"));
                 columnList.add(column);
             }
@@ -392,21 +410,35 @@ public class PhoenixUtil {
         executeStatement("UPDATE STATISTICS " + tableName, scenario);
     }
 
+    public String getExplainPlan(Query query) throws SQLException {
+    	return getExplainPlan(query, null, null);
+    }
+    
     /**
      * Get explain plan for a query
      *
      * @param query
+     * @param ruleApplier 
+     * @param scenario 
      * @return
      * @throws SQLException
      */
-    public String getExplainPlan(Query query) throws SQLException {
+    public String getExplainPlan(Query query, Scenario scenario, RulesApplier ruleApplier) throws SQLException {
         Connection conn = null;
         ResultSet rs = null;
         PreparedStatement statement = null;
         StringBuilder buf = new StringBuilder();
         try {
             conn = getConnection(query.getTenantId());
-            statement = conn.prepareStatement("EXPLAIN " + query.getStatement());
+            String explainQuery;
+            if (scenario != null && ruleApplier != null) {
+            	explainQuery = query.getDynamicStatement(ruleApplier, scenario);
+            }
+            else {
+            	explainQuery = query.getStatement();
+            }
+            
+            statement = conn.prepareStatement("EXPLAIN " + explainQuery);
             rs = statement.executeQuery();
             while (rs.next()) {
                 buf.append(rs.getString(1).trim().replace(",", "-"));
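
getConnection() can now apply a per-scenario map of Phoenix connection
properties: each entry is copied onto the java.util.Properties handed to
DriverManager.getConnection(). A sketch, assuming a PhoenixUtil instance named
pUtil and using phoenix.query.timeoutMs as an example property:

    Map<String, String> phoenixProps = new HashMap<>();
    phoenixProps.put("phoenix.query.timeoutMs", "600000");
    Connection conn = pUtil.getConnection(null, phoenixProps);  // null tenantId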

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 24c68dc..7b9313f 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -28,10 +28,14 @@ import org.apache.phoenix.pherf.result.DataModelResult;
 import org.apache.phoenix.pherf.result.ResultManager;
 import org.apache.phoenix.pherf.result.RunTime;
 import org.apache.phoenix.pherf.result.ThreadTime;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
 import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.WriteParams;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
 class MultiThreadedRunner implements Runnable {
@@ -45,6 +49,11 @@ class MultiThreadedRunner implements Runnable {
     private long executionDurationInMs;
     private static long lastResultWritten = System.currentTimeMillis() - 1000;
     private final ResultManager resultManager;
+    private final RulesApplier ruleApplier;
+    private final Scenario scenario;
+    private final WorkloadExecutor workloadExecutor;
+    private final XMLConfigParser parser;
+    
 
     /**
      * MultiThreadedRunner
@@ -55,16 +64,21 @@ class MultiThreadedRunner implements Runnable {
      * @param threadTime
      * @param numberOfExecutions
      * @param executionDurationInMs
+     * @param ruleRunner 
      */
     MultiThreadedRunner(String threadName, Query query, DataModelResult dataModelResult,
-            ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs, boolean writeRuntimeResults) {
+            ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs, boolean writeRuntimeResults, RulesApplier ruleApplier, Scenario scenario, WorkloadExecutor workloadExecutor, XMLConfigParser parser) {
         this.query = query;
         this.threadName = threadName;
         this.threadTime = threadTime;
         this.dataModelResult = dataModelResult;
         this.numberOfExecutions = numberOfExecutions;
         this.executionDurationInMs = executionDurationInMs;
+        this.ruleApplier = ruleApplier;
+        this.scenario = scenario;
        	this.resultManager = new ResultManager(dataModelResult.getName(), writeRuntimeResults);
+       	this.workloadExecutor = workloadExecutor;
+       	this.parser = parser;
     }
 
     /**
@@ -81,7 +95,7 @@ class MultiThreadedRunner implements Runnable {
                 synchronized (resultManager) {
                     timedQuery();
                     if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
-                        resultManager.write(dataModelResult);
+                        resultManager.write(dataModelResult, ruleApplier);
                         lastResultWritten = System.currentTimeMillis();
                     }
                 }
@@ -108,7 +122,7 @@ class MultiThreadedRunner implements Runnable {
     private void timedQuery() throws Exception {
         boolean
                 isSelectCountStatement =
-                query.getStatement().toUpperCase().trim().contains("COUNT(*)") ? true : false;
+                query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false;
 
         Connection conn = null;
         PreparedStatement statement = null;
@@ -119,8 +133,17 @@ class MultiThreadedRunner implements Runnable {
         long resultRowCount = 0;
 
         try {
-            conn = pUtil.getConnection(query.getTenantId());
-            statement = conn.prepareStatement(query.getStatement());
+            conn = pUtil.getConnection(query.getTenantId(), scenario.getPhoenixProperties());
+            conn.setAutoCommit(true);
+            final String statementString = query.getDynamicStatement(ruleApplier, scenario);
+            statement = conn.prepareStatement(statementString);
+            logger.info("Executing: " + statementString);
+            
+            if (scenario.getWriteParams() != null) {
+            	Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
+            	workloadExecutor.add(writes);
+            }
+            
             boolean isQuery = statement.execute();
             if (isQuery) {
                 rs = statement.getResultSet();
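
Two behavioral changes ride along in this class: a statement now counts as an
aggregate if it contains "COUNT(" at all (previously only "COUNT(*)" matched),
and a scenario that declares writeParams gets a WriteWorkload enqueued next to
the timed query, turning the query set into a mixed read/write run. The
broadened detection in isolation (statements are illustrative):

    boolean a = "SELECT COUNT(*) FROM T".contains("COUNT(");            // true, as before
    boolean b = "SELECT COUNT(DISTINCT ID) FROM T".contains("COUNT(");  // true, newly detected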

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 7f861f1..8d0ced5 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
 import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.result.*;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public class QueryExecutor implements Workload {
     private final PhoenixUtil util;
     private final WorkloadExecutor workloadExecutor;
     private final boolean writeRuntimeResults;
+    private RulesApplier ruleApplier;
 
     public QueryExecutor(XMLConfigParser parser, PhoenixUtil util,
             WorkloadExecutor workloadExecutor) {
@@ -64,6 +66,7 @@ public class QueryExecutor implements Workload {
         this.util = util;
         this.workloadExecutor = workloadExecutor;
         this.writeRuntimeResults = writeRuntimeResults;
+        this.ruleApplier = new RulesApplier(parser);
     }
 
     @Override
@@ -143,17 +146,6 @@ public class QueryExecutor implements Workload {
                         ScenarioResult scenarioResult = new ScenarioResult(scenario);
                         scenarioResult.setPhoenixProperties(phoenixProperty);
                         dataModelResult.getScenarioResult().add(scenarioResult);
-                        WriteParams writeParams = scenario.getWriteParams();
-
-                        if (writeParams != null) {
-                            int writerThreadCount = writeParams.getWriterThreadCount();
-                            for (int i = 0; i < writerThreadCount; i++) {
-                                logger.debug("Inserting write workload ( " + i + " ) of ( "
-                                        + writerThreadCount + " )");
-                                Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO);
-                                workloadExecutor.add(writes);
-                            }
-                        }
 
                         for (QuerySet querySet : scenario.getQuerySet()) {
                             QuerySetResult querySetResult = new QuerySetResult(querySet);
@@ -161,14 +153,14 @@ public class QueryExecutor implements Workload {
 
                             util.executeQuerySetDdls(querySet);
                             if (querySet.getExecutionType() == ExecutionType.SERIAL) {
-                                executeQuerySetSerial(dataModelResult, querySet, querySetResult);
+                                executeQuerySetSerial(dataModelResult, querySet, querySetResult, scenario);
                             } else {
-                                executeQuerySetParallel(dataModelResult, querySet, querySetResult);
+                                executeQuerySetParallel(dataModelResult, querySet, querySetResult, scenario);
                             }
                         }
-                        resultManager.write(dataModelResult);
+                        resultManager.write(dataModelResult, ruleApplier);
                     }
-                    resultManager.write(dataModelResults);
+                    resultManager.write(dataModelResults, ruleApplier);
                     resultManager.flush();
                 } catch (Exception e) {
                     logger.warn("", e);
@@ -183,10 +175,11 @@ public class QueryExecutor implements Workload {
      * @param dataModelResult
      * @param querySet
      * @param querySetResult
+     * @param scenario 
      * @throws InterruptedException
      */
     protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
-            QuerySetResult querySetResult) throws InterruptedException {
+            QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
         for (Query query : querySet.getQuery()) {
             QueryResult queryResult = new QueryResult(query);
             querySetResult.getQueryResults().add(queryResult);
@@ -200,7 +193,7 @@ public class QueryExecutor implements Workload {
                     Runnable
                             thread =
                             executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
-                                    querySetResult);
+                                    querySetResult, scenario);
                     threads.add(workloadExecutor.getPool().submit(thread));
                 }
 
@@ -224,7 +217,7 @@ public class QueryExecutor implements Workload {
      * @throws InterruptedException
      */
     protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
-            QuerySetResult querySetResult) throws InterruptedException {
+            QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
         for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
             List<Future> threads = new ArrayList<>();
             for (int i = 0; i < cr; i++) {
@@ -235,7 +228,7 @@ public class QueryExecutor implements Workload {
                     Runnable
                             thread =
                             executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
-                                    querySetResult);
+                                    querySetResult, scenario);
                     threads.add(workloadExecutor.getPool().submit(thread));
                 }
 
@@ -257,10 +250,11 @@ public class QueryExecutor implements Workload {
      * @param dataModelResult
      * @param queryResult
      * @param querySet
+     * @param scenario 
      * @return
      */
     protected Runnable executeRunner(String name, DataModelResult dataModelResult,
-            QueryResult queryResult, QuerySet querySet) {
+            QueryResult queryResult, QuerySet querySet, Scenario scenario) {
         ThreadTime threadTime = new ThreadTime();
         queryResult.getThreadTimes().add(threadTime);
         threadTime.setThreadName(name);
@@ -271,7 +265,7 @@ public class QueryExecutor implements Workload {
             thread =
                     new MultiThreadedRunner(threadTime.getThreadName(), queryResult,
                             dataModelResult, threadTime, querySet.getNumberOfExecutions(),
-                            querySet.getExecutionDurationInMs(), writeRuntimeResults);
+                            querySet.getExecutionDurationInMs(), writeRuntimeResults, ruleApplier, scenario, workloadExecutor, parser);
         } else {
             thread =
                     new MultithreadedDiffer(threadTime.getThreadName(), queryResult, threadTime,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 3574761..4023383 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -19,10 +19,12 @@
 package org.apache.phoenix.pherf.workload;
 
 import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -33,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
 import org.apache.phoenix.pherf.configuration.Column;
@@ -108,21 +111,27 @@ public class WriteWorkload implements Workload {
         this.rulesApplier = new RulesApplier(parser);
         this.resultUtil = new ResultUtil();
         this.generateStatistics = generateStatistics;
-
+        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
+        
         // Overwrite defaults properties with those given in the configuration. This indicates the
         // scenario is a R/W mixed workload.
         if (scenario != null) {
             this.scenario = scenario;
             writeParams = scenario.getWriteParams();
-            threadSleepDuration = writeParams.getThreadSleepDuration();
+            if (writeParams != null) {
+            	threadSleepDuration = writeParams.getThreadSleepDuration();
+            	size = writeParams.getWriterThreadCount();
+            }
+            else {
+            	threadSleepDuration = 0;
+            }
+            	
         } else {
             writeParams = null;
             this.scenario = null;
             threadSleepDuration = 0;
         }
 
-        int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
-
         // Should addBatch/executeBatch be used? Default: false
         this.useBatchApi = Boolean.getBoolean(USE_BATCH_API_PROPERTY);
 
@@ -381,17 +390,50 @@ public class WriteWorkload implements Workload {
                     statement.setInt(count, Integer.parseInt(dataValue.getValue()));
                 }
                 break;
+            case UNSIGNED_LONG:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.LONGVARCHAR);
+                } else {
+                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
+                }
+                break;
             case DATE:
                 if (dataValue.getValue().equals("")) {
                     statement.setNull(count, Types.DATE);
                 } else {
                     Date
                             date =
-                            new java.sql.Date(
-                                    simpleDateFormat.parse(dataValue.getValue()).getTime());
+                            new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
                     statement.setDate(count, date);
                 }
                 break;
+            case VARCHAR_ARRAY:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.ARRAY);
+                } else {
+                    Array
+                            arr =
+                            statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
+                    statement.setArray(count, arr);
+                }
+            	break;
+            case VARBINARY:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARBINARY);
+                } else {
+                    statement.setBytes(count, dataValue.getValue().getBytes());
+                }
+                break;
+            case TIMESTAMP:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.TIMESTAMP);
+                } else {
+                    java.sql.Timestamp
+                            ts =
+                            new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
+                    statement.setTimestamp(count, ts);
+                }
+                break;
             default:
                 break;
             }
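
The VARCHAR_ARRAY binding relies on a comma-separated round trip: RulesApplier
joins the configured values with commas into a single DataValue, and the write
path above splits on commas to build a JDBC array, which implies individual
values cannot themselves contain commas. In isolation (connection and
statement are assumed):

    String stored = "a,b,c";  // as produced by RulesApplier for a VARCHAR ARRAY column
    java.sql.Array arr = connection.createArrayOf("VARCHAR", stored.split(","));
    statement.setArray(1, arr);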

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index e538ac2..1c32b75 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -349,8 +349,6 @@
 
         </scenario>
         <scenario tableName="PHERF.PHERF_PROD_TEST_UNSALTED" rowCount="10">
-            <!-- Scenario level rule overrides will be unsupported in V1.
-                    You can use the general datamappings in the mean time-->
             <dataOverride>
                 <column>
                     <type>VARCHAR</type>
@@ -370,8 +368,20 @@
                     <name>TENANT_ID</name>
                 </column>
             </dataOverride>
+
+            <!--  Pre and post scenario indexes -->
+            <preScenarioDdls>
+                <ddl>CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)</ddl>
+            </preScenarioDdls>
+
+            <postScenarioDdls>
+                <ddl>CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)</ddl>
+                <ddl>CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)</ddl>
+            </postScenarioDdls>
+
             <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
             <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="60000" numberOfExecutions="100">
+                <query statement="select count(*) from PHERF.PHERF_PROD_TEST_UNSALTED WHERE TENANT_ID=[TENANT_ID] AND TENANT_ID=[TENANT_ID]"/>
                 <!--  Aggregate queries on a per tenant basis -->
                 <query tenantId="00Dxx0000001gER"
                        ddl="CREATE VIEW IF NOT EXISTS PHERF.PHERF_TEST_VIEW_UNSALTED AS SELECT * FROM PHERF.PHERF_PROD_TEST_UNSALTED"

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ColumnTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ColumnTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ColumnTest.java
index e573c07..35e8754 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ColumnTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ColumnTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.phoenix.pherf;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.phoenix.pherf.configuration.Column;
 import org.apache.phoenix.pherf.configuration.DataTypeMapping;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index 1f1006d..5afde69 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -71,8 +71,8 @@ public class ConfigurationParserTest extends ResultBaseTest {
             assertTrue("Could not load the data columns from xml.",
                     (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
             assertTrue("Could not load the data DataValue list from xml.",
-                    (dataMappingColumns.get(6).getDataValues() != null)
-                            && (dataMappingColumns.get(6).getDataValues().size() > 0));
+                    (dataMappingColumns.get(8).getDataValues() != null)
+                            && (dataMappingColumns.get(8).getDataValues().size() > 0));
 
             assertDateValue(dataMappingColumns);
             assertCurrentDateValue(dataMappingColumns);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
index 228cd58..f4b0e5c 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/RuleGeneratorTest.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -245,6 +247,87 @@ public class RuleGeneratorTest {
                 testSet.size() == (threadCount * increments));
     }
 
+    @Test
+    public void testTimestampRule() throws Exception {
+        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        SimpleDateFormat df = new SimpleDateFormat("yyyy");
+        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+        WriteWorkload loader = new WriteWorkload(parser);
+        RulesApplier rulesApplier = loader.getRulesApplier();
+        Scenario scenario = parser.getScenarios().get(0);
+
+        Column simPhxCol = new Column();
+        simPhxCol.setName("TS_DATE");
+        simPhxCol.setType(DataTypeMapping.TIMESTAMP);
+
+        // Run this 10 times to verify that generated timestamps stay within the configured year range (2020-2025)
+        for (int i = 0; i < 10; i++) {
+            DataValue value = rulesApplier.getDataForRule(scenario, simPhxCol);
+            Date dt = simpleDateFormat.parse(value.getValue());
+            int year = Integer.parseInt(df.format(dt));
+            assertTrue("Got unexpected TS value " + value.getValue(), year >= 2020 && year <= 2025);
+        }
+    }
+
+    @Test
+    public void testVarcharArray() throws Exception {
+
+        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+        WriteWorkload loader = new WriteWorkload(parser);
+        RulesApplier rulesApplier = loader.getRulesApplier();
+
+        // The VAR_ARRAY rule is a fixed value list, so every generated value should be the joined list "Foo,Bar"
+        for (int i = 0; i < 15; i++) {
+            Column c = rulesApplier.getRule("VAR_ARRAY");
+            DataValue value = rulesApplier.getDataValue(c);
+            assertTrue("Generated value does not match the expected joined list: " + value.getValue(), value.getValue().equals("Foo,Bar"));
+        }
+    }
+
+    @Test
+    public void testVarBinary() throws Exception {
+        List<String> expectedValues = new ArrayList<>();
+        for (int i=0; i<10; i++) {
+            expectedValues.add("VBOxx00" + i);
+        }
+
+        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+        WriteWorkload loader = new WriteWorkload(parser);
+        RulesApplier rulesApplier = loader.getRulesApplier();
+
+        // Each generated VAR_BIN value should carry the configured prefix plus a sequence digit
+        for (int i = 0; i < 5; i++) {
+            Column c = rulesApplier.getRule("VAR_BIN");
+            DataValue value = rulesApplier.getDataValue(c);
+            assertTrue("Got a value not in the list for the rule: " + value.getValue(), expectedValues.contains(value.getValue()));
+        }
+    }
+
+    @Test
+    public void testPrefixSequence() throws Exception {
+        List<String> expectedValues = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            expectedValues.add("0F90000000000X" + i);
+        }
+
+        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+        WriteWorkload loader = new WriteWorkload(parser);
+        RulesApplier rulesApplier = loader.getRulesApplier();
+
+        // Running this 15 times gives a reasonable chance that each sequential value appears at least once
+        for (int i = 0; i < 15; i++) {
+            DataValue value = rulesApplier.getDataValue(rulesApplier.getRule("NEWVAL_STRING"));
+            assertTrue("Got a value not in the list for the rule: " + value.getValue(), expectedValues.contains(value.getValue()));
+        }
+    }
+
     @Test
     public void testValueListRule() throws Exception {
         List<String> expectedValues = new ArrayList();
@@ -301,12 +384,30 @@ public class RuleGeneratorTest {
         assertEquals("Did not find the matching rule type.", rule.getType(), simPhxCol.getType());
         assertEquals("Rule contains incorrect length.", rule.getLength(), 10);
         assertEquals("Rule contains incorrect prefix.", rule.getPrefix(), "MYPRFX");
-
+
         value = rulesApplier.getDataForRule(scenario, simPhxCol);
-        assertEquals("Value returned does not match rule.", value.getValue().length(), 10);
-        assertTrue("Value returned start with prefix.",
+        assertEquals("Value returned does not match rule.", 10, value.getValue().length());
+        assertTrue("Value returned does not start with prefix: " + value.getValue(),
                 StringUtils.startsWith(value.getValue(), rule.getPrefix()));
+
+    }
+
+    @Test
+    public void testScenarioLevelRuleOverride() throws Exception {
+        XMLConfigParser parser = new XMLConfigParser(matcherScenario);
+        WriteWorkload loader = new WriteWorkload(parser);
+        RulesApplier rulesApplier = loader.getRulesApplier();
+        Scenario scenario = parser.getScenarios().get(0);
+
+        // Test scenario level overridden rule
+        Column simPhxCol = new Column();
+        simPhxCol.setName("FIELD");
+        simPhxCol.setType(DataTypeMapping.VARCHAR);
+        DataValue value = rulesApplier.getDataForRule(scenario, simPhxCol);
+        assertEquals("Override rule should contain field length of 5", 5, value.getValue().length());
     }
 
     /**
      * Asserts that the value field is between the min/max value fields

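The expected values in testPrefixSequence imply that a SEQUENTIAL VARCHAR rule concatenates the configured prefix with a monotonically increasing counter, trimmed to the declared length. A sketch of that behavior under those assumptions follows; it is illustrative, not the actual RulesApplier implementation.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: reproduces the values asserted in testPrefixSequence.
    static final AtomicLong COUNTER = new AtomicLong(0);

    static String nextSequentialValue(String prefix, int length) {
        String value = prefix + COUNTER.getAndIncrement();
        // Trim so the result honors the configured <length> (15 in the test scenario).
        return value.length() <= length ? value : value.substring(0, length);
    }

    // nextSequentialValue("0F90000000000X", 15) -> "0F90000000000X0", "0F90000000000X1", ...
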
http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
index 21034d9..fa9952b 100644
--- a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
+++ b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
@@ -20,9 +20,13 @@ CREATE TABLE IF NOT EXISTS PHERF.TEST_TABLE (
     PARENT_ID CHAR(15) NOT NULL,
     CREATED_DATE DATE NOT NULL,
     NOW_DATE DATE,
+    TS_DATE TIMESTAMP,
     PRESENT_DATE DATE,
     OTHER_ID CHAR(15),
     FIELD VARCHAR,
+    VAR_ARRAY VARCHAR ARRAY,
+    VAR_BIN VARBINARY,
+    DIVISION INTEGER,
     OLDVAL_STRING VARCHAR,
     NEWVAL_STRING VARCHAR,
     SOME_INT INTEGER

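Reading the new columns back is equally mechanical on the JDBC side. A sketch, assuming the test schema above and a caller that supplies the connection (null checks omitted for brevity):

    import java.sql.*;

    static void readNewColumns(Connection conn) throws Exception {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT DIVISION, VAR_ARRAY, VAR_BIN, TS_DATE FROM PHERF.TEST_TABLE LIMIT 10")) {
            while (rs.next()) {
                int division = rs.getInt("DIVISION");                           // INTEGER
                Object[] tags = (Object[]) rs.getArray("VAR_ARRAY").getArray(); // VARCHAR ARRAY
                byte[] bin = rs.getBytes("VAR_BIN");                            // VARBINARY
                Timestamp ts = rs.getTimestamp("TS_DATE");                      // TIMESTAMP
            }
        }
    }
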
http://git-wip-us.apache.org/repos/asf/phoenix/blob/907e9093/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
----------------------------------------------------------------------
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index e2915ba..99ce81b 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -33,6 +33,18 @@
             <name>GENERAL_CHAR</name>
         </column>
         <column>
+            <type>TIMESTAMP</type>
+            <!-- SEQUENTIAL is unsupported for TIMESTAMP -->
+            <dataSequence>RANDOM</dataSequence>
+            <!-- Number [0-100] that represents the probability of creating a null value -->
+            <!-- The higher the number, the more likely the returned value will be null -->
+            <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+            <nullChance>0</nullChance>
+            <minValue>2020</minValue>
+            <maxValue>2025</maxValue>
+            <name>GENERAL_TIMESTAMP</name>
+        </column>
+        <column>
             <type>DATE</type>
             <!--SEQUENTIAL is unsupported for DATE -->
             <dataSequence>RANDOM</dataSequence>
@@ -150,9 +162,22 @@
             <type>VARCHAR</type>
             <length>15</length>
             <userDefined>true</userDefined>
-            <dataSequence>RANDOM</dataSequence>
+            <dataSequence>SEQUENTIAL</dataSequence>
             <name>NEWVAL_STRING</name>
-            <prefix>TSTPRFX</prefix>
+            <prefix>0F90000000000X</prefix>
+        </column>
+        <column>
+            <type>VARCHAR_ARRAY</type>
+            <userDefined>true</userDefined>
+            <name>VAR_ARRAY</name>
+            <valuelist>
+                <datavalue>
+                    <value>Foo</value>
+                </datavalue>
+                <datavalue>
+                    <value>Bar</value>
+                </datavalue>
+            </valuelist>
         </column>
         <column>
             <type>CHAR</type>
@@ -181,6 +206,21 @@
             <name>OTHER_ID</name>
             <prefix>z0Oxx00</prefix>
         </column>
+        <column>
+            <type>VARBINARY</type>
+            <userDefined>true</userDefined>
+            <dataSequence>SEQUENTIAL</dataSequence>
+            <length>8</length>
+            <name>VAR_BIN</name>
+            <prefix>VBOxx00</prefix>
+        </column>
+        <column>
+            <type>VARCHAR</type>
+            <userDefined>true</userDefined>
+            <dataSequence>SEQUENTIAL</dataSequence>
+            <length>1</length>
+            <name>FIELD</name>
+        </column>
     </datamapping>
     <scenarios>
         <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
@@ -191,7 +231,7 @@
                     <type>VARCHAR</type>
                     <userDefined>true</userDefined>
                     <dataSequence>RANDOM</dataSequence>
-                    <length>10</length>
+                    <length>5</length>
                     <name>FIELD</name>
                 </column>
             </dataOverride>
@@ -219,7 +259,7 @@
             </writeParams>
             <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
                 <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
-                <query id="q4" statement="select sum(DIVISION) from PHERF.TEST_TABLE"/>
+                <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
             </querySet>
 
         </scenario>
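
On the querySet semantics noted above (executionDurationInMs or numberOfExecutions, whichever is reached first), the stopping rule amounts to checking both limits on every iteration. A minimal sketch of that contract; the real runner lives in MultiThreadedRunner and may differ in detail.

    // Illustrative stopping rule: halt on duration or execution count, whichever comes first.
    static void runQuerySet(Runnable query, long executionDurationInMs, int numberOfExecutions) {
        long start = System.currentTimeMillis();
        int executed = 0;
        while (executed < numberOfExecutions
                && System.currentTimeMillis() - start < executionDurationInMs) {
            query.run();
            executed++;
        }
    }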