You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2021/03/26 00:29:04 UTC

[phoenix] branch 4.x updated: PHOENIX-6118: Multi Tenant Workloads using PHERF

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 3535708  PHOENIX-6118: Multi Tenant Workloads using PHERF
3535708 is described below

commit 35357084a2e7cef1f97b654b1a09219b5fb82268
Author: jpisaac <ja...@gmail.com>
AuthorDate: Thu Mar 25 17:28:55 2021 -0700

    PHOENIX-6118: Multi Tenant Workloads using PHERF
---
 phoenix-pherf/pom.xml                              |  20 +-
 .../java/org/apache/phoenix/pherf/PherfMainIT.java |   1 +
 .../org/apache/phoenix/pherf/ResultBaseTestIT.java |   7 +-
 .../org/apache/phoenix/pherf/SchemaReaderIT.java   |   2 +-
 .../MultiTenantOperationBaseIT.java                |  79 +++++
 .../mt/tenantoperation/TenantOperationIT.java      | 112 +++++++
 .../tenantoperation/TenantOperationWorkloadIT.java | 166 +++++++++++
 .../datamodel/create_prod_test_unsalted.sql        |   0
 .../src/{main => it}/resources/hbase-site.xml      |   0
 .../scenario/prod_test_unsalted_scenario.xml       |   0
 .../main/java/org/apache/phoenix/pherf/Pherf.java  |  42 ++-
 .../pherf/configuration/DataTypeMapping.java       |   2 +
 .../phoenix/pherf/configuration/IdleTime.java      |  47 +++
 .../phoenix/pherf/configuration/LoadProfile.java   | 113 +++++++
 .../pherf/configuration/OperationGroup.java        |  44 +++
 .../apache/phoenix/pherf/configuration/Query.java  |  54 ++--
 .../phoenix/pherf/configuration/Scenario.java      |  52 +++-
 .../phoenix/pherf/configuration/TenantGroup.java   |  52 ++++
 .../apache/phoenix/pherf/configuration/Upsert.java | 135 +++++++++
 .../phoenix/pherf/configuration/UserDefined.java   |  55 ++++
 .../apache/phoenix/pherf/rules/RulesApplier.java   |  51 +++-
 .../rules/SequentialIntegerDataGenerator.java      |   2 +-
 .../org/apache/phoenix/pherf/util/PhoenixUtil.java | 183 +++++++++++-
 .../apache/phoenix/pherf/util/ResourceList.java    |   7 +-
 .../pherf/workload/MultiThreadedRunner.java        |   4 +-
 .../pherf/workload/MultithreadedDiffer.java        |   2 +-
 .../phoenix/pherf/workload/WorkloadExecutor.java   |   2 +-
 .../phoenix/pherf/workload/WriteWorkload.java      | 131 +--------
 .../phoenix/pherf/workload/mt/EventGenerator.java  |  30 ++
 .../pherf/workload/mt/IdleTimeOperation.java       |  29 ++
 .../pherf/workload/mt/MultiTenantWorkload.java     |  46 +++
 .../phoenix/pherf/workload/mt/Operation.java       |  31 ++
 .../phoenix/pherf/workload/mt/OperationStats.java  | 119 ++++++++
 .../pherf/workload/mt/PreScenarioOperation.java    |  31 ++
 .../phoenix/pherf/workload/mt/QueryOperation.java  |  29 ++
 .../phoenix/pherf/workload/mt/UpsertOperation.java |  29 ++
 .../pherf/workload/mt/UserDefinedOperation.java    |  29 ++
 .../mt/tenantoperation/BaseOperationSupplier.java  |  48 +++
 .../tenantoperation/IdleTimeOperationSupplier.java |  78 +++++
 .../PreScenarioOperationSupplier.java              |  84 ++++++
 .../mt/tenantoperation/QueryOperationSupplier.java |  91 ++++++
 .../TenantOperationEventGenerator.java             | 153 ++++++++++
 .../mt/tenantoperation/TenantOperationFactory.java | 323 +++++++++++++++++++++
 .../mt/tenantoperation/TenantOperationInfo.java    |  70 +++++
 .../TenantOperationWorkHandler.java                |  75 +++++
 .../tenantoperation/TenantOperationWorkload.java   | 192 ++++++++++++
 .../tenantoperation/UpsertOperationSupplier.java   | 140 +++++++++
 .../UserDefinedOperationSupplier.java              |  50 ++++
 .../phoenix/pherf/ConfigurationParserTest.java     |  62 +++-
 .../org/apache/phoenix/pherf/ResultBaseTest.java   |  11 +-
 .../TenantOperationEventGeneratorTest.java         | 127 ++++++++
 .../TenantOperationFactoryTest.java                | 116 ++++++++
 .../src/test/resources/datamodel/test_schema.sql   |   1 +
 .../{test_schema.sql => test_schema_mt_view.sql}   |  26 +-
 .../src/test/resources/scenario/test_evt_gen1.xml  | 184 ++++++++++++
 .../test/resources/scenario/test_mt_workload.xml   | 135 +++++++++
 .../src/test/resources/scenario/test_scenario.xml  |   2 +-
 ...rio.xml => test_workload_with_load_profile.xml} | 246 ++++++++--------
 58 files changed, 3617 insertions(+), 335 deletions(-)

diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 32f3e67..3157599 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -73,11 +73,15 @@
 			<artifactId>commons-math3</artifactId>
 			<version>3.3</version>
 		</dependency>
-    <dependency>
-      <groupId>org.apache.phoenix.thirdparty</groupId>
-      <artifactId>phoenix-shaded-commons-cli</artifactId>
-    </dependency>
-
+		<dependency>
+			<groupId>com.google.code.gson</groupId>
+			<artifactId>gson</artifactId>
+			<version>2.8.6</version>
+		</dependency>
+		<dependency>
+		  <groupId>org.apache.phoenix.thirdparty</groupId>
+		  <artifactId>phoenix-shaded-commons-cli</artifactId>
+		</dependency>
 		<!-- Test Dependencies -->
 		<dependency>
 			<groupId>junit</groupId>
@@ -133,7 +137,7 @@
 	<build>
 		<resources>
 			<resource>
-				<directory>src/main/resources</directory>
+				<directory>src/it/resources</directory>
 			</resource>
 			<resource>
 				<directory>config</directory>
@@ -221,7 +225,7 @@
 						<artifactSet>
 							<includes>
 								<include>org.apache.phoenix:phoenix-pherf</include>
-                                <include>com.google.guava:guava</include>
+								<include>org.apache.phoenix.thirdparty:phoenix-shaded-guava</include>
 								<include>com.googlecode.java-diff-utils:diffutils</include>
 								<include>org.apache.commons:commons-lang3</include>
 								<include>org.apache.commons:commons-math3</include>
@@ -230,6 +234,8 @@
 								<include>org.apache.commons:commons-csv</include>
 								<include>commons-lang:commons-lang</include>
 								<include>commons-io:commons-io</include>
+								<include>com.google.code.gson:gson</include>
+								<include>com.lmax:disruptor</include>
 							</includes>
 						</artifactSet>
 						<filters>
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 57aaae1..c71fda5 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.pherf.result.Result;
 import org.apache.phoenix.pherf.result.ResultValue;
 import org.apache.phoenix.pherf.result.file.ResultFileDetails;
 import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.ExpectedSystemExit;
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
index fe1f2ea..75d6f6b 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.pherf.configuration.XMLConfigParser;
 import org.apache.phoenix.pherf.result.ResultUtil;
 import org.apache.phoenix.pherf.schema.SchemaReader;
@@ -37,9 +38,9 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category(NeedsOwnMiniClusterTest.class)
-public class ResultBaseTestIT extends BaseTest {
-    protected static final String matcherScenario = ".*scenario/.*test.*xml";
-    protected static final String matcherSchema = ".*datamodel/.*test.*sql";
+public class ResultBaseTestIT extends ParallelStatsDisabledIT {
+    protected static final String matcherScenario = ".*scenario/.*test_scenario.*xml";
+    protected static final String matcherSchema = ".*datamodel/.*test_schema.*sql";
 
     protected static PhoenixUtil util = PhoenixUtil.create(true);
     protected static Properties properties;
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
index 901c92f..c5a93fe 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
@@ -78,7 +78,7 @@ public class SchemaReaderIT extends BaseTest {
     private void assertApplySchemaTest() {
         try {
             util.setZookeeper("localhost");
-            SchemaReader reader = new SchemaReader(util, ".*datamodel/.*test.*sql");
+            SchemaReader reader = new SchemaReader(util, ".*datamodel/.*test_schema.*sql");
 
             List<Path> resources = new ArrayList<>(reader.getResourceList());
             assertTrue("Could not pull list of schema files.", resources.size() > 0);
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java
new file mode 100644
index 0000000..bcdbdca
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.schema.SchemaReader;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.junit.BeforeClass;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MultiTenantOperationBaseIT extends ParallelStatsDisabledIT {
+    enum TestOperationGroup {
+        upsertOp, queryOp1, queryOp2, idleOp, udfOp
+    }
+
+    static enum  TestTenantGroup {
+        tg1, tg2, tg3
+    }
+    protected static final String matcherScenario = ".*scenario/.*test_mt_workload.*xml";
+    protected static final String matcherSchema = ".*datamodel/.*test_schema_mt*.*sql";
+
+    protected static PhoenixUtil util = PhoenixUtil.create(true);
+    protected static Properties properties;
+    protected static SchemaReader reader;
+    protected static XMLConfigParser parser;
+    protected static List<Path> resources;
+
+    @BeforeClass public static synchronized void setUp() throws Exception {
+        PherfConstants constants = PherfConstants.create();
+        properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES, false);
+
+        PhoenixUtil.setZookeeper("localhost");
+        reader = new SchemaReader(util, matcherSchema);
+        parser = new XMLConfigParser(matcherScenario);
+        reader.applySchema();
+        resources = new ArrayList<>(reader.getResourceList());
+
+        assertTrue("Could not pull list of schema files.", resources.size() > 0);
+        assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
+
+    }
+
+    public DataModel readTestDataModel(String resourceName) throws Exception {
+        URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+        assertNotNull(scenarioUrl);
+        Path p = Paths.get(scenarioUrl.toURI());
+        return XMLConfigParser.readDataModel(p);
+    }
+
+}
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java
new file mode 100644
index 0000000..737080a
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests focused on tenant operations and their validations
+ */
+public class TenantOperationIT extends MultiTenantOperationBaseIT {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationIT.class);
+
+    @Test
+    public void testVariousOperations() throws Exception {
+        int numTenantGroups = 3;
+        int numOpGroups = 5;
+        int numRuns = 10;
+        int numOperations = 10;
+
+        PhoenixUtil pUtil = PhoenixUtil.create();
+        DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+        for (Scenario scenario : model.getScenarios()) {
+            LOGGER.debug(String.format("Testing %s", scenario.getName()));
+            LoadProfile loadProfile = scenario.getLoadProfile();
+            assertEquals("tenant group size is not as expected: ",
+                    numTenantGroups, loadProfile.getTenantDistribution().size());
+            assertEquals("operation group size is not as expected: ",
+                    numOpGroups, loadProfile.getOpDistribution().size());
+
+            TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+            TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+                    opFactory.getOperations(), model, scenario);
+
+            assertEquals("operation group size from the factory is not as expected: ",
+                    numOpGroups, opFactory.getOperations().size());
+
+            int numRowsInserted = 0;
+            for (int i = 0; i < numRuns; i++) {
+                int ops = numOperations;
+                loadProfile.setNumOperations(ops);
+                while (ops-- > 0) {
+                    TenantOperationInfo info = evtGen.next();
+                    Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+                            opFactory.getOperationSupplier(info);
+                    OperationStats stats = opSupplier.get().apply(info);
+                    LOGGER.info(pUtil.getGSON().toJson(stats));
+                    if (info.getOperation().getType() == Operation.OperationType.PRE_RUN) continue;
+                    switch (TestOperationGroup.valueOf(info.getOperationGroupId())) {
+                    case upsertOp:
+                        assertTrue(opSupplier.getClass()
+                                .isAssignableFrom(UpsertOperationSupplier.class));
+                        numRowsInserted += stats.getRowCount();
+                        break;
+                    case queryOp1:
+                    case queryOp2:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(QueryOperationSupplier.class));
+
+                        // expected row count == num rows inserted
+                        assertEquals(numRowsInserted, stats.getRowCount());
+                        break;
+                    case idleOp:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(IdleTimeOperationSupplier.class));
+                        assertEquals(0, stats.getRowCount());
+                        // expected think time (no-op) to be ~50ms
+                        assertTrue(40 < stats.getDurationInMs() && stats.getDurationInMs() < 60);
+                        break;
+                    case udfOp:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(UserDefinedOperationSupplier.class));
+                        assertEquals(0, stats.getRowCount());
+                        break;
+                    default:
+                        Assert.fail();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java
new file mode 100644
index 0000000..c6d4dfd
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import com.clearspring.analytics.util.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.WorkHandler;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.Workload;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload.TenantOperationEvent;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests focused on tenant operation workloads {@link TenantOperationWorkload}
+ * and workload handlers {@link WorkHandler}
+ */
+public class TenantOperationWorkloadIT extends MultiTenantOperationBaseIT {
+
+    private static class EventCountingWorkHandler implements
+            WorkHandler<TenantOperationEvent>, LifecycleAware {
+        private final String handlerId;
+        private final TenantOperationFactory tenantOperationFactory;
+        private static final Logger LOGGER = LoggerFactory.getLogger(EventCountingWorkHandler.class);
+        private final Map<String, CountDownLatch> latches;
+        public EventCountingWorkHandler(TenantOperationFactory tenantOperationFactory,
+                String handlerId, Map<String, CountDownLatch> latches) {
+            this.handlerId = handlerId;
+            this.tenantOperationFactory = tenantOperationFactory;
+            this.latches = latches;
+        }
+
+        @Override public void onStart() {}
+
+        @Override public void onShutdown() {}
+
+        @Override public void onEvent(TenantOperationEvent event)
+                throws Exception {
+            TenantOperationInfo input = event.getTenantOperationInfo();
+            Supplier<Function<TenantOperationInfo, OperationStats>>
+                    opSupplier = tenantOperationFactory.getOperationSupplier(input);
+            OperationStats stats = opSupplier.get().apply(input);
+            LOGGER.info(tenantOperationFactory.getPhoenixUtil().getGSON().toJson(stats));
+            assertEquals(0, stats.getStatus());
+            latches.get(handlerId).countDown();
+        }
+    }
+
+    @Test
+    public void testWorkloadWithOneHandler() throws Exception {
+        int numOpGroups = 5;
+        int numHandlers = 1;
+        int totalOperations = 50;
+        int perHandlerCount = 50;
+
+        ExecutorService executor = null;
+        try {
+            executor = Executors.newFixedThreadPool(numHandlers);
+            PhoenixUtil pUtil = PhoenixUtil.create();
+            DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+            for (Scenario scenario : model.getScenarios()) {
+                // Set the total number of operations for this load profile
+                scenario.getLoadProfile().setNumOperations(totalOperations);
+                TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+                assertEquals("operation group size from the factory is not as expected: ",
+                        numOpGroups, opFactory.getOperations().size());
+
+                // populate the handlers and countdown latches.
+                String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), numHandlers);
+                List<WorkHandler> workers = Lists.newArrayList();
+                Map<String, CountDownLatch> latches = Maps.newConcurrentMap();
+                workers.add(new EventCountingWorkHandler(opFactory, handlerId, latches));
+                latches.put(handlerId, new CountDownLatch(perHandlerCount));
+                // submit the workload
+                Workload workload = new TenantOperationWorkload(pUtil, model, scenario, workers, properties);
+                Future status = executor.submit(workload.execute());
+                // Just make sure there are no exceptions
+                status.get();
+
+                // Wait for the handlers to count down
+                for (Map.Entry<String, CountDownLatch> latch : latches.entrySet()) {
+                    assertTrue(latch.getValue().await(60, TimeUnit.SECONDS));
+                }
+            }
+        } finally {
+            if (executor != null) {
+                executor.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void testWorkloadWithManyHandlers() throws Exception {
+        int numOpGroups = 5;
+        int numHandlers = 5;
+        int totalOperations = 500;
+        int perHandlerCount = 50;
+
+        ExecutorService executor = Executors.newFixedThreadPool(numHandlers);
+        PhoenixUtil pUtil = PhoenixUtil.create();
+        DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+        for (Scenario scenario : model.getScenarios()) {
+            // Set the total number of operations for this load profile
+            scenario.getLoadProfile().setNumOperations(totalOperations);
+            TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+            assertEquals("operation group size from the factory is not as expected: ",
+                    numOpGroups, opFactory.getOperations().size());
+
+            // populate the handlers and countdown latches.
+            List<WorkHandler> workers = Lists.newArrayList();
+            Map<String, CountDownLatch> latches = Maps.newConcurrentMap();
+            for (int i=0;i<numHandlers;i++) {
+                String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), i);
+                workers.add(new EventCountingWorkHandler(opFactory, handlerId, latches));
+                latches.put(handlerId, new CountDownLatch(perHandlerCount));
+            }
+            // submit the workload
+            Workload workload = new TenantOperationWorkload(pUtil, model, scenario, workers, properties);
+            Future status = executor.submit(workload.execute());
+            // Just make sure there are no exceptions
+            status.get();
+            // Wait for the handlers to count down
+            for (Map.Entry<String, CountDownLatch> latch : latches.entrySet()) {
+                assertTrue(latch.getValue().await(60, TimeUnit.SECONDS));
+            }
+        }
+        executor.shutdown();
+    }
+
+}
diff --git a/phoenix-pherf/src/main/resources/datamodel/create_prod_test_unsalted.sql b/phoenix-pherf/src/it/resources/datamodel/create_prod_test_unsalted.sql
similarity index 100%
rename from phoenix-pherf/src/main/resources/datamodel/create_prod_test_unsalted.sql
rename to phoenix-pherf/src/it/resources/datamodel/create_prod_test_unsalted.sql
diff --git a/phoenix-pherf/src/main/resources/hbase-site.xml b/phoenix-pherf/src/it/resources/hbase-site.xml
similarity index 100%
rename from phoenix-pherf/src/main/resources/hbase-site.xml
rename to phoenix-pherf/src/it/resources/hbase-site.xml
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/it/resources/scenario/prod_test_unsalted_scenario.xml
similarity index 100%
rename from phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
rename to phoenix-pherf/src/it/resources/scenario/prod_test_unsalted_scenario.xml
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index cae3213..c042689 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -24,7 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -33,6 +34,8 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
 import org.apache.phoenix.pherf.PherfConstants.CompareType;
 import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
 import org.apache.phoenix.pherf.configuration.XMLConfigParser;
 import org.apache.phoenix.pherf.jmx.MonitorManager;
 import org.apache.phoenix.pherf.result.ResultUtil;
@@ -40,6 +43,7 @@ import org.apache.phoenix.pherf.schema.SchemaReader;
 import org.apache.phoenix.pherf.util.GoogleChartGenerator;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.apache.phoenix.pherf.util.ResourceList;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload;
 import org.apache.phoenix.pherf.workload.QueryExecutor;
 import org.apache.phoenix.pherf.workload.Workload;
 import org.apache.phoenix.pherf.workload.WorkloadExecutor;
@@ -60,6 +64,8 @@ public class Pherf {
                 "HBase Zookeeper address for connection. Default: localhost");
         options.addOption("q", "query", false, "Executes multi-threaded query sets");
         options.addOption("listFiles", false, "List available resource files");
+        options.addOption("mt", "multi-tenant", false,
+                "Multi tenanted workloads based on load profiles.");
         options.addOption("l", "load", false,
                 "Pre-loads data according to specified configuration values.");
         options.addOption("scenarioFile", true,
@@ -103,6 +109,7 @@ public class Pherf {
     private final String queryHint;
     private final Properties properties;
     private final boolean preLoadData;
+    private final boolean multiTenantWorkload;
     private final String dropPherfTablesRegEx;
     private final boolean executeQuerySets;
     private final boolean isFunctional;
@@ -148,6 +155,7 @@ public class Pherf {
         properties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
 
         preLoadData = command.hasOption("l");
+        multiTenantWorkload = command.hasOption("mt");
         executeQuerySets = command.hasOption("q");
         zookeeper = command.getOptionValue("z", "localhost");
         queryHint = command.getOptionValue("hint", null);
@@ -288,17 +296,37 @@ public class Pherf {
             }
 
             // Schema and Data Load
-            if (preLoadData) {
+            if (preLoadData || multiTenantWorkload) {
                 LOGGER.info("\nStarting Data Load...");
-                Workload workload = new WriteWorkload(parser, generateStatistics);
+                List<Workload> newWorkloads = Lists.newArrayList();
                 try {
-                    workloadExecutor.add(workload);
+                    if (multiTenantWorkload) {
+                        for (DataModel model : parser.getDataModels()) {
+                            for (Scenario scenario : model.getScenarios()) {
+                                Workload workload = new TenantOperationWorkload(phoenixUtil,
+                                        model, scenario, properties);
+                                newWorkloads.add(workload);
+                            }
+                        }
+                    } else {
+                        newWorkloads.add(new WriteWorkload(parser, generateStatistics));
+                    }
+
+                    if (newWorkloads.isEmpty()) {
+                        throw new IllegalArgumentException("Found no new workload");
+                    }
+
+                    for (Workload workload : newWorkloads) {
+                        workloadExecutor.add(workload);
+                    }
 
                     // Wait for dataLoad to complete
-                    workloadExecutor.get(workload);
+                    workloadExecutor.get();
                 } finally {
-                    if (null != workload) {
-                        workload.complete();
+                    if (!newWorkloads.isEmpty()) {
+                        for (Workload workload : newWorkloads) {
+                            workload.complete();
+                        }
                     }
                 }
             } else {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index 129bdc2..3bbe728 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -30,7 +30,9 @@ public enum DataTypeMapping {
     VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
     VARBINARY("VARBINARY", Types.VARBINARY),
     TIMESTAMP("TIMESTAMP", Types.TIMESTAMP),
+    BOOLEAN("BOOLEAN", Types.BOOLEAN),
     BIGINT("BIGINT", Types.BIGINT),
+    UNSIGNED_INT("UNSIGNED_INT", Types.INTEGER),
     TINYINT("TINYINT", Types.TINYINT);
 
     private final String sType;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java
new file mode 100644
index 0000000..37d6e15
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+public class IdleTime {
+
+    private String id;
+    private long idleTime = 0;
+
+    @XmlAttribute
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @XmlAttribute
+    public long getIdleTime() {
+        return idleTime;
+    }
+
+    public void setIdleTime(long idleTime) {
+        this.idleTime = idleTime;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java
new file mode 100644
index 0000000..3116244
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlType;
+import java.util.List;
+
+@XmlType
+public class LoadProfile {
+    private static final int MIN_BATCH_SIZE = 1;
+    private static final String DEFAULT_TENANT_ID_FMT = "00D%s%07d";
+    private static final int DEFAULT_GROUP_ID_LEN = 5;
+    private static final int DEFAULT_TENANT_ID_LEN = 15;
+
+    // Holds the batch size to be used in upserts.
+    private int batchSize;
+    // Holds the number of operations to be generated.
+    private long numOperations;
+    /**
+     * Holds the format to be used when generating tenantIds.
+     * TenantId format should typically have 2 parts -
+     * 1. string fmt - that hold the tenant group id.
+     * 2. int fmt - that holds a random number between 1 and max tenants
+     * for e.g DEFAULT_TENANT_ID_FMT = "00D%s%07d";
+     */
+    private String tenantIdFormat;
+    private int groupIdLength;
+    private int tenantIdLength;
+    // Holds the desired tenant distribution for this load.
+    private List<TenantGroup> tenantDistribution;
+    // Holds the desired operation distribution for this load.
+    private List<OperationGroup> opDistribution;
+
+    public LoadProfile() {
+        this.batchSize = MIN_BATCH_SIZE;
+        this.numOperations = Long.MAX_VALUE;
+        this.tenantIdFormat = DEFAULT_TENANT_ID_FMT;
+        this.tenantIdLength = DEFAULT_TENANT_ID_LEN;
+        this.groupIdLength = DEFAULT_GROUP_ID_LEN;
+    }
+
+    public String getTenantIdFormat() {
+        return tenantIdFormat;
+    }
+
+    public void setTenantIdFormat(String tenantIdFormat) {
+        this.tenantIdFormat = tenantIdFormat;
+    }
+
+    public int getTenantIdLength() {
+        return tenantIdLength;
+    }
+
+    public void setTenantIdLength(int tenantIdLength) {
+        this.tenantIdLength = tenantIdLength;
+    }
+
+    public int getGroupIdLength() {
+        return groupIdLength;
+    }
+
+    public void setGroupIdLength(int groupIdLength) {
+        this.groupIdLength = groupIdLength;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public long getNumOperations() {
+        return numOperations;
+    }
+
+    public void setNumOperations(long numOperations) {
+        this.numOperations = numOperations;
+    }
+
+    public List<TenantGroup> getTenantDistribution() {
+        return tenantDistribution;
+    }
+
+    public void setTenantDistribution(List<TenantGroup> tenantDistribution) {
+        this.tenantDistribution = tenantDistribution;
+    }
+
+    public List<OperationGroup> getOpDistribution() {
+        return opDistribution;
+    }
+
+    public void setOpDistribution(List<OperationGroup> opDistribution) {
+        this.opDistribution = opDistribution;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java
new file mode 100644
index 0000000..31545b2
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class OperationGroup {
+    private String id;
+    private int weight;
+
+    @XmlAttribute
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @XmlAttribute
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setWeight(int weight) {
+        this.weight = weight;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index 5f28134..c47798c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -18,23 +18,23 @@
 
 package org.apache.phoenix.pherf.configuration;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.phoenix.pherf.rules.RulesApplier;
 
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlType;
-
-import org.apache.phoenix.pherf.rules.RulesApplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @XmlType
 public class Query {
 
+    private String id;
+    private String queryGroup;
+    private String tenantId;
     private String statement;
     private Long expectedAggregateRowCount;
-    private String tenantId;
     private String ddl;
-    private String queryGroup;
-    private String id;
+    private boolean useGlobalConnection;
     private Pattern pattern;
     private long timeoutDuration = Long.MAX_VALUE;
 
@@ -51,20 +51,24 @@ public class Query {
     public String getStatement() {
         return statement;
     }
-    
-    public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario) throws Exception {
-    	String ret = this.statement;
-    	String needQuotes = "";
-    	Matcher m = pattern.matcher(ret);
-        while(m.find()) {
-        	String dynamicField = m.group(0).replace("[", "").replace("]", "");
-        	Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
-			needQuotes = (dynamicColumn.getType() == DataTypeMapping.CHAR || dynamicColumn
-					.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
-			ret = ret.replace("[" + dynamicField + "]",
-					needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue() + needQuotes);
-     }
-      	return ret;    	
+
+    public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario)
+            throws Exception {
+        String ret = this.statement;
+        String needQuotes = "";
+        Matcher m = pattern.matcher(ret);
+        while (m.find()) {
+            String dynamicField = m.group(0).replace("[", "").replace("]", "");
+            Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
+            needQuotes =
+                    (dynamicColumn.getType() == DataTypeMapping.CHAR
+                            || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+            ret =
+                    ret.replace("[" + dynamicField + "]",
+                            needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue()
+                                    + needQuotes);
+        }
+        return ret;
     }
 
     public void setStatement(String statement) {
@@ -160,6 +164,14 @@ public class Query {
         this.id = id;
     }
 
+    @XmlAttribute
+    public boolean isUseGlobalConnection() {
+        return useGlobalConnection;
+    }
+
+    public void setUseGlobalConnection(boolean useGlobalConnection) {
+        this.useGlobalConnection = useGlobalConnection;
+    }
 
     @XmlAttribute
     public long getTimeoutDuration() {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index 53b2d25..32cfc1e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementWrapper;
 import javax.xml.bind.annotation.XmlRootElement;
 
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
@@ -36,15 +36,19 @@ public class Scenario {
     private String tableName;
     private int rowCount;
     private Map<String, String> phoenixProperties;
+    private WriteParams writeParams = null;
     private DataOverride dataOverride;
     private List<QuerySet> querySet = new ArrayList<>();
-    private WriteParams writeParams = null;
+    private List<Upsert> upsertSet = new ArrayList<>();
+    private List<IdleTime> idleTimes = new ArrayList<>();
+    private List<UserDefined> udfs = new ArrayList<>();
+    private LoadProfile loadProfile = null;
+
     private String name;
     private String tenantId;
     private List<Ddl> preScenarioDdls;
     private List<Ddl> postScenarioDdls;
-    
-   
+
     public Scenario() {
     }
 
@@ -194,6 +198,7 @@ public class Scenario {
         this.writeParams = writeParams;
     }
 
+
     @Override
     public String toString() {
         StringBuilder stringBuilder = new StringBuilder();
@@ -232,4 +237,43 @@ public class Scenario {
 	public void setPostScenarioDdls(List<Ddl> postScenarioDdls) {
 		this.postScenarioDdls = postScenarioDdls;
 	}
+
+    public List<Upsert> getUpserts() {
+        return upsertSet;
+    }
+
+    @XmlElementWrapper(name = "upserts")
+    @XmlElement(name = "upsert")
+    public void setUpserts(List<Upsert> upsertSet) {
+        this.upsertSet = upsertSet;
+    }
+
+    public List<IdleTime> getIdleTimes() {
+        return idleTimes;
+    }
+
+    @XmlElementWrapper(name = "idleTimes")
+    @XmlElement(name = "idleTime")
+    public void setIdleTimes(List<IdleTime> idleTimes) {
+        this.idleTimes = idleTimes;
+    }
+
+    public List<UserDefined> getUdfs() {
+        return udfs;
+    }
+
+    @XmlElementWrapper(name = "udfs")
+    @XmlElement(name = "udf")
+    public void setUdfs(List<UserDefined> udfs) {
+        this.udfs = udfs;
+    }
+
+
+    public LoadProfile getLoadProfile() {
+        return loadProfile;
+    }
+
+    public void setLoadProfile(LoadProfile loadProfile) {
+        this.loadProfile = loadProfile;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java
new file mode 100644
index 0000000..0656917
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class TenantGroup {
+    private String id;
+    private int weight;
+    private int numTenants;
+
+    @XmlAttribute
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @XmlAttribute
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setWeight(int weight) {
+        this.weight = weight;
+    }
+
+    @XmlAttribute
+    public int getNumTenants() { return numTenants; }
+
+    public void setNumTenants(int numTenants) { this.numTenants = numTenants; }
+
+
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java
new file mode 100644
index 0000000..dfbe9e6
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import org.apache.phoenix.pherf.rules.RulesApplier;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Upsert {
+
+    private String id;
+    private String upsertGroup;
+    private String statement;
+    private List<Column> columns;
+    private boolean useGlobalConnection;
+    private Pattern pattern;
+    private long timeoutDuration = Long.MAX_VALUE;
+
+    public Upsert() {
+    	pattern = Pattern.compile("\\[.*?\\]");
+    }
+
+    public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario)
+            throws Exception {
+        String ret = this.statement;
+        String needQuotes = "";
+        Matcher m = pattern.matcher(ret);
+        while (m.find()) {
+            String dynamicField = m.group(0).replace("[", "").replace("]", "");
+            Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
+            needQuotes =
+                    (dynamicColumn.getType() == DataTypeMapping.CHAR
+                            || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+            ret = ret.replace("[" + dynamicField + "]",
+                    needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue()
+                                    + needQuotes);
+        }
+        return ret;
+    }
+
+    /**
+     * upsertGroup attribute is just a string value to help correlate upserts across sets or files.
+     * This helps to make sense of reporting results.
+     *
+     * @return the group id
+     */
+    @XmlAttribute
+    public String getUpsertGroup() {
+        return upsertGroup;
+    }
+
+    public void setUpsertGroup(String upsertGroup) {
+        this.upsertGroup = upsertGroup;
+    }
+
+
+    /**
+     * Upsert ID, Use UUID if none specified
+     *
+     * @return
+     */
+    @XmlAttribute
+    public String getId() {
+        if (null == this.id) {
+            this.id = java.util.UUID.randomUUID().toString();
+        }
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public List<Column> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(List<Column> columns) {
+        this.columns = columns;
+    }
+
+    @XmlAttribute
+    public boolean isUseGlobalConnection() {
+        return useGlobalConnection;
+    }
+
+    public void setUseGlobalConnection(boolean useGlobalConnection) {
+        this.useGlobalConnection = useGlobalConnection;
+    }
+
+    @XmlAttribute
+    public long getTimeoutDuration() {
+        return this.timeoutDuration;
+    }
+
+    public void setTimeoutDuration(long timeoutDuration) {
+        this.timeoutDuration = timeoutDuration;
+    }
+
+    public String getStatement() {
+        return statement;
+    }
+
+    public void setStatement(String statement) {
+        // normalize statement - merge all consecutive spaces into one
+        this.statement = statement.replaceAll("\\s+", " ");
+    }
+
+    public List<Column> getColumn() {
+        return columns;
+    }
+
+    public void setColumn(List<Column> columns) {
+        this.columns = columns;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java
new file mode 100644
index 0000000..8350d57
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+import java.util.List;
+
+@XmlType
+public class UserDefined {
+    String id;
+    String clazzName;
+    List<String> args;
+
+    @XmlAttribute
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClazzName() {
+        return clazzName;
+    }
+
+    public void setClazzName(String clazzName) {
+        this.clazzName = clazzName;
+    }
+
+    public List<String> getArgs() {
+        return args;
+    }
+
+    public void setArgs(List<String> args) {
+        this.args = args;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index b066e00..aeb3ec5 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -18,8 +18,9 @@
 
 package org.apache.phoenix.pherf.rules;
 
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.random.RandomDataGenerator;
 import org.apache.phoenix.pherf.PherfConstants;
@@ -51,6 +52,7 @@ public class RulesApplier {
     private final Random rndVal;
     private final RandomDataGenerator randomDataGenerator;
 
+    private final DataModel dataModel;
     private final XMLConfigParser parser;
     private final List<Map> modelList;
     private final Map<String, Column> columnMap;
@@ -59,6 +61,30 @@ public class RulesApplier {
 
     private Map<Column,RuleBasedDataGenerator> columnRuleBasedDataGeneratorMap = new HashMap<>();
 
+    // Since rules are only relevant for a given data model,
+    // added a constructor to support a single data model => RulesApplier(DataModel model)
+
+    // We should deprecate the RulesApplier(XMLConfigParser parser) constructor,
+    // since a parser can have multiple data models (all the models found on the classpath)
+    // it implies that the rules apply to all the data models the parser holds
+    // which can be confusing to the user of this class.
+    //
+
+    public RulesApplier(DataModel model) {
+        this(model, EnvironmentEdgeManager.currentTimeMillis());
+    }
+
+    public RulesApplier(DataModel model, long seed) {
+        this.parser = null;
+        this.dataModel = model;
+        this.modelList = new ArrayList<Map>();
+        this.columnMap = new HashMap<String, Column>();
+        this.rndNull = new Random(seed);
+        this.rndVal = new Random(seed);
+        this.randomDataGenerator = new RandomDataGenerator();
+        this.cachedScenarioOverrideName = null;
+        populateModelList();
+    }
 
     public RulesApplier(XMLConfigParser parser) {
         this(parser, EnvironmentEdgeManager.currentTimeMillis());
@@ -66,6 +92,7 @@ public class RulesApplier {
 
     public RulesApplier(XMLConfigParser parser, long seed) {
         this.parser = parser;
+        this.dataModel = null;
         this.modelList = new ArrayList<Map>();
         this.columnMap = new HashMap<String, Column>();
         this.rndNull = new Random(seed);
@@ -116,10 +143,10 @@ public class RulesApplier {
     public DataValue getDataForRule(Scenario scenario, Column phxMetaColumn) throws Exception {
         // TODO Make a Set of Rules that have already been applied so that so we don't generate for every value
     	
-        List<Scenario> scenarios = parser.getScenarios();
+        List<Scenario> scenarios = dataModel != null ? dataModel.getScenarios() : parser.getScenarios();
         DataValue value = null;
         if (scenarios.contains(scenario)) {
-            LOGGER.debug("We found a correct Scenario");
+            LOGGER.debug("We found a correct Scenario" + scenario.getName());
             
             Map<DataTypeMapping, List> overrideRuleMap = this.getCachedScenarioOverrides(scenario);
             
@@ -138,9 +165,10 @@ public class RulesApplier {
             // Assume the first rule map
             Map<DataTypeMapping, List> ruleMap = modelList.get(0);
             List<Column> ruleList = ruleMap.get(phxMetaColumn.getType());
+            //LOGGER.info(String.format("Did not found a correct override column rule, %s, %s", phxMetaColumn.getName(), phxMetaColumn.getType()));
 
             // Make sure Column from Phoenix Metadata matches a rule column
-            if (ruleList.contains(phxMetaColumn)) {
+            if (ruleList != null && ruleList.contains(phxMetaColumn)) {
                 // Generate some random data based on this rule
                 LOGGER.debug("We found a correct column rule");
                 Column columnRule = getColumnForRule(ruleList, phxMetaColumn);
@@ -422,9 +450,18 @@ public class RulesApplier {
         if (!modelList.isEmpty()) {
             return;
         }
-        
-        // Support for multiple models, but rules are only relevant each model
-        for (DataModel model : parser.getDataModels()) {
+
+        // Since rules are only relevant for a given data model,
+        // added a constructor to support a single data model => RulesApplier(DataModel model)
+
+        // We should deprecate the RulesApplier(XMLConfigParser parser) constructor,
+        // since a parser can have multiple data models (all the models found on the classpath)
+        // it implies that the rules apply to all the data models the parser holds
+        // which can be confusing to the user of this class.
+
+        List<DataModel> models = dataModel != null ?
+                Lists.newArrayList(dataModel) : parser.getDataModels();
+        for (DataModel model : models) {
 
             // Step 1
             final Map<DataTypeMapping, List> ruleMap = new HashMap<DataTypeMapping, List>();
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
index 1d1a7d0..125e0d7 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.phoenix.pherf.rules;
 
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.pherf.configuration.Column;
 import org.apache.phoenix.pherf.configuration.DataSequence;
 import org.apache.phoenix.pherf.configuration.DataTypeMapping;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 34f45b2..1b5ba33 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -18,16 +18,33 @@
 
 package org.apache.phoenix.pherf.util;
 
+import com.google.gson.Gson;
 import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
 import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.configuration.*;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataTypeMapping;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
+import org.apache.phoenix.pherf.configuration.Scenario;
 import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.rules.DataValue;
 import org.apache.phoenix.pherf.rules.RulesApplier;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.*;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +57,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 
 public class PhoenixUtil {
+    public static final String ASYNC_KEYWORD = "ASYNC";
+    public static final Gson GSON = new Gson();
     private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixUtil.class);
     private static String zookeeper;
     private static int rowCountOverride = 0;
@@ -47,7 +66,6 @@ public class PhoenixUtil {
     private static PhoenixUtil instance;
     private static boolean useThinDriver;
     private static String queryServerUrl;
-    private static final String ASYNC_KEYWORD = "ASYNC";
     private static final int ONE_MIN_IN_MS = 60000;
     private static String CurrentSCN = null;
 
@@ -81,6 +99,10 @@ public class PhoenixUtil {
         return PhoenixUtil.useThinDriver;
     }
 
+    public static Gson getGSON() {
+        return GSON;
+    }
+
     public Connection getConnection() throws Exception {
         return getConnection(null);
     }
@@ -262,6 +284,7 @@ public class PhoenixUtil {
                 column.setType(DataTypeMapping.valueOf(resultSet.getString("TYPE_NAME").replace(" ", "_")));
                 column.setLength(resultSet.getInt("COLUMN_SIZE"));
                 columnList.add(column);
+                LOGGER.debug(String.format("getColumnsMetaData for column name : %s", column.getName()));
             }
         } finally {
             if (null != resultSet) {
@@ -330,7 +353,7 @@ public class PhoenixUtil {
      * @param tableName
      * @throws InterruptedException
      */
-    private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
+    public void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
     	//Wait for up to 15 mins for ASYNC index build to start
     	boolean jobStarted = false;
     	for (int i=0; i<15; i++) {
@@ -450,4 +473,156 @@ public class PhoenixUtil {
         }
         return buf.toString();
     }
+
+    public PreparedStatement buildStatement(RulesApplier rulesApplier, Scenario scenario, List<Column> columns,
+            PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+
+        int count = 1;
+        for (Column column : columns) {
+            DataValue dataValue = rulesApplier.getDataForRule(scenario, column);
+            switch (column.getType()) {
+            case VARCHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARCHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case CHAR:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.CHAR);
+                } else {
+                    statement.setString(count, dataValue.getValue());
+                }
+                break;
+            case DECIMAL:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DECIMAL);
+                } else {
+                    statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+                }
+                break;
+            case INTEGER:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.INTEGER);
+                } else {
+                    statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+                }
+                break;
+            case UNSIGNED_LONG:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.OTHER);
+                } else {
+                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
+                }
+                break;
+            case BIGINT:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.BIGINT);
+                } else {
+                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
+                }
+                break;
+            case TINYINT:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.TINYINT);
+                } else {
+                    statement.setLong(count, Integer.parseInt(dataValue.getValue()));
+                }
+                break;
+            case DATE:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.DATE);
+                } else {
+                    Date
+                            date =
+                            new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
+                    statement.setDate(count, date);
+                }
+                break;
+            case VARCHAR_ARRAY:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.ARRAY);
+                } else {
+                    Array
+                            arr =
+                            statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
+                    statement.setArray(count, arr);
+                }
+                break;
+            case VARBINARY:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.VARBINARY);
+                } else {
+                    statement.setBytes(count, dataValue.getValue().getBytes());
+                }
+                break;
+            case TIMESTAMP:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.TIMESTAMP);
+                } else {
+                    java.sql.Timestamp
+                            ts =
+                            new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
+                    statement.setTimestamp(count, ts);
+                }
+                break;
+            default:
+                break;
+            }
+            count++;
+        }
+        return statement;
+    }
+
+    public String buildSql(final List<Column> columns, final String tableName) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("upsert into ");
+        builder.append(tableName);
+        builder.append(" (");
+        int count = 1;
+        for (Column column : columns) {
+            builder.append(column.getName());
+            if (count < columns.size()) {
+                builder.append(",");
+            } else {
+                builder.append(")");
+            }
+            count++;
+        }
+        builder.append(" VALUES (");
+        for (int i = 0; i < columns.size(); i++) {
+            if (i < columns.size() - 1) {
+                builder.append("?,");
+            } else {
+                builder.append("?)");
+            }
+        }
+        return builder.toString();
+    }
+
+    public org.apache.hadoop.hbase.util.Pair<Long, Long> getResults(
+            Query query,
+            ResultSet rs,
+            String queryIteration,
+            boolean isSelectCountStatement,
+            Long queryStartTime) throws Exception {
+
+        Long resultRowCount = 0L;
+        while (rs.next()) {
+            if (isSelectCountStatement) {
+                resultRowCount = rs.getLong(1);
+            } else {
+                resultRowCount++;
+            }
+            long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime;
+            if (queryElapsedTime >= query.getTimeoutDuration()) {
+                LOGGER.error("Query " + queryIteration + " exceeded timeout of "
+                        +  query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms.");
+                return new org.apache.hadoop.hbase.util.Pair(resultRowCount, queryElapsedTime);
+            }
+        }
+        return new org.apache.hadoop.hbase.util.Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime);
+    }
+
 }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
index 64ee6ee..dd3c4fd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
@@ -29,7 +29,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipException;
@@ -40,7 +42,7 @@ import org.apache.phoenix.pherf.exception.PherfException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 
 /**
  * list resources available from the classpath @ *
@@ -76,7 +78,7 @@ public class ResourceList {
 
         final String classPath = System.getProperty("java.class.path", ".");
         final String[] classPathElements = classPath.split(":");
-        List<String> strResources = new ArrayList<>();
+        Set<String> strResources = new HashSet<>();
         Collection<Path> paths = new ArrayList<>();
 
         // TODO Make getResourcesPaths() return the URLs directly instead of converting them
@@ -112,6 +114,7 @@ public class ResourceList {
             paths.add(path);
         }
 
+        Collections.sort((List<Path>)paths);
         return paths;
     }
 
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index c4c38bd..5d4b973 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,7 +25,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.Callable;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.pherf.result.DataModelResult;
 import org.apache.phoenix.pherf.result.ResultManager;
@@ -161,7 +161,7 @@ class MultiThreadedRunner implements Callable<Void> {
             conn.setAutoCommit(true);
             final String statementString = query.getDynamicStatement(ruleApplier, scenario);
             statement = conn.prepareStatement(statementString);
-            LOGGER.info("Executing iteration: " + queryIteration + ": " + statementString);
+            LOGGER.debug("Executing iteration: " + queryIteration + ": " + statementString);
             
             if (scenario.getWriteParams() != null) {
             	Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index 068acda..8dfcbf9 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -22,7 +22,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.Callable;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.configuration.Query;
 import org.apache.phoenix.pherf.result.RunTime;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index 1d38e3d..381751d 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -18,7 +18,7 @@
 
 package org.apache.phoenix.pherf.workload;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 613fb23..b6a5ac6 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -274,11 +274,11 @@ public class WriteWorkload implements Workload {
                         logPerNRows = Integer.valueOf(customizedLogPerNRows);
                     }
                     last = start = EnvironmentEdgeManager.currentTimeMillis();
-                    String sql = buildSql(columns, tableName);
+                    String sql = pUtil.buildSql(columns, tableName);
                     stmt = connection.prepareStatement(sql);
                     for (long i = rowCount; (i > 0) && ((EnvironmentEdgeManager.currentTimeMillis() - logStartTime)
                             < maxDuration); i--) {
-                        stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+                        stmt = pUtil.buildStatement(rulesApplier, scenario, columns, stmt, simpleDateFormat);
                         if (useBatchApi) {
                             stmt.addBatch();
                         } else {
@@ -362,133 +362,6 @@ public class WriteWorkload implements Workload {
         return future;
     }
 
-    private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
-            PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
-        int count = 1;
-        for (Column column : columns) {
-
-            DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
-            switch (column.getType()) {
-            case VARCHAR:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.VARCHAR);
-                } else {
-                    statement.setString(count, dataValue.getValue());
-                }
-                break;
-            case CHAR:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.CHAR);
-                } else {
-                    statement.setString(count, dataValue.getValue());
-                }
-                break;
-            case DECIMAL:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.DECIMAL);
-                } else {
-                    statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
-                }
-                break;
-            case INTEGER:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.INTEGER);
-                } else {
-                    statement.setInt(count, Integer.parseInt(dataValue.getValue()));
-                }
-                break;
-            case UNSIGNED_LONG:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.OTHER);
-                } else {
-                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
-                }
-                break;
-            case BIGINT:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.BIGINT);
-                } else {
-                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
-                }
-                break;
-            case TINYINT:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.TINYINT);
-                } else {
-                    statement.setLong(count, Integer.parseInt(dataValue.getValue()));
-                }
-                break;
-            case DATE:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.DATE);
-                } else {
-                    Date
-                            date =
-                            new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
-                    statement.setDate(count, date);
-                }
-                break;
-            case VARCHAR_ARRAY:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.ARRAY);
-                } else {
-                    Array
-                            arr =
-                            statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
-                    statement.setArray(count, arr);
-                }
-            	break;
-            case VARBINARY:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.VARBINARY);
-                } else {
-                    statement.setBytes(count, dataValue.getValue().getBytes());
-                }
-                break;
-            case TIMESTAMP:
-                if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.TIMESTAMP);
-                } else {
-                    java.sql.Timestamp
-                            ts =
-                            new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
-                    statement.setTimestamp(count, ts);
-                }
-                break;
-            default:
-                break;
-            }
-            count++;
-        }
-        return statement;
-    }
-
-    private String buildSql(final List<Column> columns, final String tableName) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("upsert into ");
-        builder.append(tableName);
-        builder.append(" (");
-        int count = 1;
-        for (Column column : columns) {
-            builder.append(column.getName());
-            if (count < columns.size()) {
-                builder.append(",");
-            } else {
-                builder.append(")");
-            }
-            count++;
-        }
-        builder.append(" VALUES (");
-        for (int i = 0; i < columns.size(); i++) {
-            if (i < columns.size() - 1) {
-                builder.append("?,");
-            } else {
-                builder.append("?)");
-            }
-        }
-        return builder.toString();
-    }
-
     public XMLConfigParser getParser() {
         return parser;
     }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java
new file mode 100644
index 0000000..c6ffeea
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+/**
+ * An interface that implementers can use to generate events that can be consumed by
+ * @see {@link com.lmax.disruptor.WorkHandler} which provide event handling functionality for
+ * a given event.
+ *
+ * @param <T>
+ */
+public interface EventGenerator<T> {
+    T next();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java
new file mode 100644
index 0000000..bc7762f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.IdleTime;
+
+/**
+ * Defines a no op operation, typically used to simulate idle time.
+ * @see {@link OperationType#IDLE_TIME}
+ */
+public interface IdleTimeOperation extends Operation {
+    IdleTime getIdleTime();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java
new file mode 100644
index 0000000..d2c19e8
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+import java.util.Properties;
+
+public interface MultiTenantWorkload {
+    /**
+     * Initializes and readies the processor for continuous queue based workloads
+     */
+    void start();
+
+    /**
+     * Stop the processor and cleans up the workload queues.
+     */
+    void stop();
+
+
+    PhoenixUtil getPhoenixUtil();
+
+    Scenario getScenario();
+
+    DataModel getModel();
+
+    Properties getProperties();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java
new file mode 100644
index 0000000..8774ae5
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+/**
+ * An interface that defines the type of operation included in the load profile.
+ * @see {@link org.apache.phoenix.pherf.configuration.LoadProfile}
+ */
+public interface Operation {
+    enum OperationType {
+        PRE_RUN, UPSERT, SELECT, IDLE_TIME, USER_DEFINED
+    }
+    String getId();
+    OperationType getType();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java
new file mode 100644
index 0000000..c032ba4
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.result.ResultValue;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds metrics + contextual info on the operation run.
+ */
+public class OperationStats {
+    private final String modelName;
+    private final String scenarioName;
+    private final String tableName;
+    private final String tenantId;
+    private final String tenantGroup;
+    private final String operationGroup;
+    private final Operation.OperationType opType;
+    private String handlerId;
+    private final int status;
+    private final long rowCount;
+    private final long durationInMs;
+    private final long startTime;
+
+    public OperationStats(
+            TenantOperationInfo input,
+            long startTime,
+            int status,
+            long rowCount,
+            long durationInMs) {
+        this.modelName = input.getModelName();
+        this.scenarioName = input.getScenarioName();
+        this.tableName = input.getTableName();
+        this.tenantId = input.getTenantId();
+        this.tenantGroup = input.getTenantGroupId();
+        this.operationGroup = input.getOperationGroupId();
+        this.opType = input.getOperation().getType();
+        this.startTime = startTime;
+        this.status = status;
+        this.rowCount = rowCount;
+        this.durationInMs = durationInMs;
+    }
+
+    public String getModelName() { return modelName; }
+
+    public String getScenarioName() { return scenarioName; }
+
+    public String getTenantId() { return tenantId; }
+
+    public Operation.OperationType getOpType() { return opType; }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getTenantGroup() {
+        return tenantGroup;
+    }
+
+    public String getOperationGroup() {
+        return operationGroup;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public String getHandlerId() { return handlerId; }
+
+    public long getStartTime() { return startTime; }
+
+    public long getDurationInMs() {
+        return durationInMs;
+    }
+
+    public List<ResultValue> getCsvRepresentation() {
+        List<ResultValue> rowValues = new ArrayList<>();
+        rowValues.add(new ResultValue(modelName));
+        rowValues.add(new ResultValue(scenarioName));
+        rowValues.add(new ResultValue(tableName));
+        rowValues.add(new ResultValue(tenantId));
+        rowValues.add(new ResultValue(handlerId));
+        rowValues.add(new ResultValue(tenantGroup));
+        rowValues.add(new ResultValue(operationGroup));
+        rowValues.add(new ResultValue(opType.name()));
+        rowValues.add(new ResultValue(String.valueOf(startTime)));
+        rowValues.add(new ResultValue(String.valueOf(status)));
+        rowValues.add(new ResultValue(String.valueOf(rowCount)));
+        rowValues.add(new ResultValue(String.valueOf(durationInMs)));
+        return rowValues;
+    }
+
+    public void setHandlerId(String handlerId) {
+        this.handlerId = handlerId;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java
new file mode 100644
index 0000000..e0a276c
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Ddl;
+
+import java.util.List;
+
+/**
+ * Defines a pre scenario operation.
+ * @see {@link OperationType#PRE_RUN}
+ */
+public interface PreScenarioOperation extends Operation {
+    List<Ddl> getPreScenarioDdls();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java
new file mode 100644
index 0000000..8b7ee78
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Query;
+
+/**
+ * Defines a query operation.
+ * @see {@link OperationType#SELECT}
+ */
+public interface QueryOperation extends Operation {
+    Query getQuery();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java
new file mode 100644
index 0000000..cb6abd9
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Upsert;
+
+/**
+ * Defines an upsert operation.
+ * @see {@link OperationType#UPSERT}
+ */
+public interface UpsertOperation extends Operation {
+    Upsert getUpsert();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java
new file mode 100644
index 0000000..04f4fd8
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.UserDefined;
+
+/**
+ * Defines an user defined operation.
+ * @see {@link OperationType#USER_DEFINED}
+ */
+public interface UserDefinedOperation extends Operation {
+    UserDefined getUserFunction();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java
new file mode 100644
index 0000000..cda4504
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+
+/**
+ * An abstract base class for all OperationSuppliers
+ */
+abstract class BaseOperationSupplier implements Supplier<Function<TenantOperationInfo, OperationStats>> {
+
+    final PhoenixUtil phoenixUtil;
+    final DataModel model;
+    final Scenario scenario;
+    final RulesApplier rulesApplier;
+    final LoadProfile loadProfile;
+
+    public BaseOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        this.phoenixUtil = phoenixUtil;
+        this.model = model;
+        this.scenario = scenario;
+        this.rulesApplier = new RulesApplier(model);
+        this.loadProfile = this.scenario.getLoadProfile();
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java
new file mode 100644
index 0000000..ab45c27
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.IdleTime;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.IdleTimeOperation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A supplier of {@link Function} that takes {@link IdleTimeOperation} as an input.
+ */
+class IdleTimeOperationSupplier extends BaseOperationSupplier {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeOperationSupplier.class);
+
+    public IdleTimeOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        super(phoenixUtil, model, scenario);
+    }
+
+    @Override
+    public Function<TenantOperationInfo, OperationStats> get() {
+
+        return new Function<TenantOperationInfo, OperationStats>() {
+
+            @Override
+            public OperationStats apply(final TenantOperationInfo input) {
+
+                final IdleTimeOperation operation = (IdleTimeOperation) input.getOperation();
+                final IdleTime idleTime = operation.getIdleTime();
+
+                final String tenantId = input.getTenantId();
+                final String tenantGroup = input.getTenantGroupId();
+                final String opGroup = input.getOperationGroupId();
+                final String tableName = input.getTableName();
+                final String scenarioName = input.getScenarioName();
+                final String opName = String.format("%s:%s:%s:%s:%s", scenarioName, tableName,
+                                                opGroup, tenantGroup, tenantId);
+
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                int status = 0;
+
+                // Sleep for the specified time to simulate idle time.
+                try {
+                    TimeUnit.MILLISECONDS.sleep(idleTime.getIdleTime());
+                } catch (InterruptedException ie) {
+                    LOGGER.error("Operation " + opName + " failed with exception ", ie);
+                    status = -1;
+                }
+                long duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                return new OperationStats(input, startTime, status, 0, duration);
+            }
+        };
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java
new file mode 100644
index 0000000..4f1e3e3
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.PreScenarioOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A supplier of {@link Function} that takes {@link PreScenarioOperation} as an input
+ */
+class PreScenarioOperationSupplier extends BaseOperationSupplier {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PreScenarioOperationSupplier.class);
+
+    public PreScenarioOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        super(phoenixUtil, model, scenario);
+    }
+
+    @Override
+    public Function<TenantOperationInfo, OperationStats> get() {
+        return new Function<TenantOperationInfo, OperationStats>() {
+
+            @Override
+            public OperationStats apply(final TenantOperationInfo input) {
+                final PreScenarioOperation operation = (PreScenarioOperation) input.getOperation();
+                final String tenantId = input.getTenantId();
+                final String tenantGroup = input.getTenantGroupId();
+                final String opGroup = input.getOperationGroupId();
+                final String tableName = input.getTableName();
+                final String scenarioName = input.getScenarioName();
+                final String opName = String.format("%s:%s:%s:%s:%s",
+                        scenarioName, tableName, opGroup, tenantGroup, tenantId);
+
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                int status = 0;
+                if (!operation.getPreScenarioDdls().isEmpty()) {
+                    try (Connection conn = phoenixUtil.getConnection(tenantId)) {
+                        for (Ddl ddl : operation.getPreScenarioDdls()) {
+                            LOGGER.info("\nExecuting DDL:" + ddl + " on tenantId:" + tenantId);
+                            phoenixUtil.executeStatement(ddl.toString(), conn);
+                            if (ddl.getStatement().toUpperCase().contains(phoenixUtil.ASYNC_KEYWORD)) {
+                                phoenixUtil.waitForAsyncIndexToFinish(ddl.getTableName());
+                            }
+                        }
+                    } catch (SQLException sqle) {
+                        LOGGER.error("Operation " + opName + " failed with exception ", sqle);
+                        status = -1;
+                    } catch (Exception e) {
+                        LOGGER.error("Operation " + opName + " failed with exception ", e);
+                        status = -1;
+                    }
+                }
+                long totalDuration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                return new OperationStats(input, startTime, status, operation.getPreScenarioDdls().size(), totalDuration);
+            }
+        };
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java
new file mode 100644
index 0000000..88eec68
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.QueryOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+/**
+ * A supplier of {@link Function} that takes {@link QueryOperation} as an input.
+ */
+public class QueryOperationSupplier extends BaseOperationSupplier {
+    private static final Logger LOGGER = LoggerFactory.getLogger(QueryOperationSupplier.class);
+
+    public QueryOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        super(phoenixUtil, model, scenario);
+    }
+
+    @Override
+    public Function<TenantOperationInfo, OperationStats> get() {
+        return new Function<TenantOperationInfo, OperationStats>() {
+
+            @Override
+            public OperationStats apply(final TenantOperationInfo input) {
+
+                final QueryOperation operation = (QueryOperation) input.getOperation();
+                final String tenantGroup = input.getTenantGroupId();
+                final String opGroup = input.getOperationGroupId();
+                final String tenantId = input.getTenantId();
+                final String scenarioName = input.getScenarioName();
+                final String tableName = input.getTableName();
+                final Query query = operation.getQuery();
+
+                String opName = String.format("%s:%s:%s:%s:%s", scenarioName, tableName,
+                        opGroup, tenantGroup, tenantId);
+                LOGGER.info("\nExecuting query " + query.getStatement());
+
+                long startTime = 0;
+                int status = 0;
+                Long resultRowCount = 0L;
+                Long queryElapsedTime = 0L;
+                try (Connection connection = phoenixUtil.getConnection(tenantId)) {
+                    startTime = EnvironmentEdgeManager.currentTimeMillis();
+
+                    // TODO handle dynamic statements
+                    try (PreparedStatement statement = connection.prepareStatement(query.getStatement())) {
+                        try (ResultSet rs = statement.executeQuery()) {
+                            boolean isSelectCountStatement = query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false;
+                            Pair<Long, Long> r = phoenixUtil.getResults(query, rs, opName,
+                                    isSelectCountStatement, startTime);
+                            resultRowCount = r.getFirst();
+                            queryElapsedTime = r.getSecond();
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("Operation " + opName + " failed with exception ", e);
+                    status = -1;
+                }
+                return new OperationStats(input, startTime, status, resultRowCount, queryElapsedTime);
+            }
+        };
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java
new file mode 100644
index 0000000..676c510
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.OperationGroup;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.TenantGroup;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A perf load event generator based on the supplied load profile.
+ */
+
+public class TenantOperationEventGenerator
+        implements EventGenerator<TenantOperationInfo> {
+
+    private static class WeightedRandomSampler {
+        private final Random RANDOM = new Random();
+        private final LoadProfile loadProfile;
+        private final String modelName;
+        private final String scenarioName;
+        private final String tableName;
+        private final EnumeratedDistribution<String> distribution;
+
+        private final Map<String, TenantGroup> tenantGroupMap = Maps.newHashMap();
+        private final Map<String, Operation> operationMap = Maps.newHashMap();
+        private final Map<String, OperationGroup> operationGroupMap = Maps.newHashMap();
+
+        public WeightedRandomSampler(List<Operation> operationList, DataModel model, Scenario scenario) {
+            this.modelName = model.getName();
+            this.scenarioName = scenario.getName();
+            this.tableName = scenario.getTableName();
+            this.loadProfile = scenario.getLoadProfile();
+
+            // Track the individual tenant group sizes,
+            // so that given a generated sample we can get a random tenant for a group.
+            for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+                tenantGroupMap.put(tg.getId(), tg);
+            }
+            Preconditions.checkArgument(!tenantGroupMap.isEmpty(),
+                    "Tenant group cannot be empty");
+
+            for (Operation op : operationList) {
+                for (OperationGroup og : loadProfile.getOpDistribution()) {
+                    if (op.getId().compareTo(og.getId()) == 0) {
+                        operationMap.put(op.getId(), op);
+                        operationGroupMap.put(op.getId(), og);
+                    }
+                }
+            }
+            Preconditions.checkArgument(!operationMap.isEmpty(),
+                    "Operation list and load profile operation do not match");
+
+            double totalTenantGroupWeight = 0.0f;
+            double totalOperationGroupWeight = 0.0f;
+            // Sum the weights to find the total weight,
+            // so that the weights can be used in the total probability distribution.
+            for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+                totalTenantGroupWeight += tg.getWeight();
+            }
+            for (OperationGroup og : loadProfile.getOpDistribution()) {
+                totalOperationGroupWeight += og.getWeight();
+            }
+
+            Preconditions.checkArgument(totalTenantGroupWeight != 0.0f,
+                    "Total tenant group weight cannot be zero");
+            Preconditions.checkArgument(totalOperationGroupWeight != 0.0f,
+                    "Total operation group weight cannot be zero");
+
+            // Initialize the sample probability distribution
+            List<Pair<String, Double>> pmf = Lists.newArrayList();
+            double totalWeight = totalTenantGroupWeight * totalOperationGroupWeight;
+            for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+                for (String opId : operationMap.keySet()) {
+                    String sampleName = String.format("%s:%s", tg.getId(), opId);
+                    int opWeight = operationGroupMap.get(opId).getWeight();
+                    double probability = (tg.getWeight() * opWeight)/totalWeight;
+                    pmf.add(new Pair(sampleName, probability));
+                }
+            }
+            this.distribution = new EnumeratedDistribution(pmf);
+        }
+
+        public TenantOperationInfo nextSample() {
+            String sampleIndex = this.distribution.sample();
+            String[] parts = sampleIndex.split(":");
+            String tenantGroupId = parts[0];
+            String opId = parts[1];
+
+            Operation op = operationMap.get(opId);
+            int numTenants = tenantGroupMap.get(tenantGroupId).getNumTenants();
+            String tenantIdPrefix = Strings.padStart(tenantGroupId, loadProfile.getGroupIdLength(), '0');
+            String formattedTenantId = String.format(loadProfile.getTenantIdFormat(),
+                    tenantIdPrefix.substring(0, loadProfile.getGroupIdLength()), RANDOM.nextInt(numTenants));
+            String paddedTenantId = Strings.padStart(formattedTenantId, loadProfile.getTenantIdLength(), '0');
+            String tenantId = paddedTenantId.substring(0, loadProfile.getTenantIdLength());
+
+            TenantOperationInfo sample = new TenantOperationInfo(modelName, scenarioName, tableName,
+                    tenantGroupId, opId, tenantId, op);
+            return sample;
+        }
+    }
+
+    private final WeightedRandomSampler sampler;
+    private final Properties properties;
+
+    public TenantOperationEventGenerator(List<Operation> ops, DataModel model, Scenario scenario)
+            throws Exception {
+        this(ops, model, scenario,
+                PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, true));
+    }
+
+    public TenantOperationEventGenerator(List<Operation> ops, DataModel model, Scenario scenario,
+            Properties properties) {
+        this.properties = properties;
+        this.sampler = new WeightedRandomSampler(ops, model, scenario);
+    }
+
+    @Override public TenantOperationInfo next() {
+        return this.sampler.nextSample();
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java
new file mode 100644
index 0000000..365984f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.hash.BloomFilter;
+import org.apache.phoenix.thirdparty.com.google.common.hash.Funnel;
+import org.apache.phoenix.thirdparty.com.google.common.hash.PrimitiveSink;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.IdleTime;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.TenantGroup;
+import org.apache.phoenix.pherf.configuration.Upsert;
+import org.apache.phoenix.pherf.configuration.UserDefined;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+import org.apache.phoenix.pherf.workload.mt.IdleTimeOperation;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory class for operation suppliers.
+ * The class is responsible for creating new instances of suppliers {@link Supplier}
+ * for operations {@link Operation}
+ *
+ * Operations that need to be executed for a given {@link Scenario} and {@link DataModel}
+ * are generated by {@link EventGenerator}
+ *
+ * These operation events are then published on to the {@link com.lmax.disruptor.RingBuffer}
+ * by the {@link TenantOperationWorkload} workload generator and
+ * handled by the {@link com.lmax.disruptor.WorkHandler} for eg {@link TenantOperationWorkHandler}
+ */
+public class TenantOperationFactory {
+
+    private static class TenantView {
+        private final String tenantId;
+        private final String viewName;
+
+        public TenantView(String tenantId, String viewName) {
+            this.tenantId = tenantId;
+            this.viewName = viewName;
+        }
+
+        public String getTenantId() {
+            return tenantId;
+        }
+
+        public String getViewName() {
+            return viewName;
+        }
+    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationFactory.class);
+    private final PhoenixUtil phoenixUtil;
+    private final DataModel model;
+    private final Scenario scenario;
+    private final XMLConfigParser parser;
+
+    private final RulesApplier rulesApplier;
+    private final LoadProfile loadProfile;
+    private final List<Operation> operationList = Lists.newArrayList();
+    private final Map<Operation.OperationType, Supplier<Function<TenantOperationInfo, OperationStats>>> operationSuppliers =
+            Maps.newEnumMap(Operation.OperationType.class);
+
+    private final BloomFilter<TenantView> tenantsLoaded;
+
+    public TenantOperationFactory(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        this.phoenixUtil = phoenixUtil;
+        this.model = model;
+        this.scenario = scenario;
+        this.parser = null;
+        this.rulesApplier = new RulesApplier(model);
+        this.loadProfile = this.scenario.getLoadProfile();
+        this.tenantsLoaded = createTenantsLoadedFilter(loadProfile);
+
+        // Read the scenario definition and load the various operations.
+        // Case : Operation.OperationType.PRE_RUN
+        if (scenario.getPreScenarioDdls() != null && scenario.getPreScenarioDdls().size() > 0) {
+            operationSuppliers.put(Operation.OperationType.PRE_RUN,
+                    new PreScenarioOperationSupplier(phoenixUtil, model, scenario));
+        }
+
+        // Case : Operation.OperationType.UPSERT
+        List<Operation> upsertOperations = getUpsertOperationsForScenario(scenario);
+        if (upsertOperations.size() > 0) {
+            operationList.addAll(upsertOperations);
+            operationSuppliers.put(Operation.OperationType.UPSERT,
+                    new UpsertOperationSupplier(phoenixUtil, model, scenario));
+        }
+
+        // Case : Operation.OperationType.SELECT
+        List<Operation> queryOperations = getQueryOperationsForScenario(scenario);
+        if (queryOperations.size() > 0) {
+            operationList.addAll(queryOperations);
+            operationSuppliers.put(Operation.OperationType.SELECT,
+                    new QueryOperationSupplier(phoenixUtil, model, scenario));
+        }
+
+        // Case : Operation.OperationType.IDLE_TIME
+        List<Operation> idleOperations = getIdleTimeOperationsForScenario(scenario);
+        if (idleOperations.size() > 0) {
+            operationList.addAll(idleOperations);
+            operationSuppliers.put(Operation.OperationType.IDLE_TIME,
+                    new IdleTimeOperationSupplier(phoenixUtil, model, scenario));
+        }
+
+        // Case : Operation.OperationType.USER_DEFINED
+        List<Operation> udfOperations = getUDFOperationsForScenario(scenario);
+        if (udfOperations.size() > 0) {
+            operationList.addAll(udfOperations);
+            operationSuppliers.put(Operation.OperationType.USER_DEFINED,
+                    new UserDefinedOperationSupplier(phoenixUtil, model, scenario));
+        }
+    }
+
+    private BloomFilter createTenantsLoadedFilter(LoadProfile loadProfile) {
+        Funnel<TenantView> tenantViewFunnel = new Funnel<TenantView>() {
+            @Override
+            public void funnel(TenantView tenantView, PrimitiveSink into) {
+                into.putString(tenantView.getTenantId(), Charsets.UTF_8)
+                        .putString(tenantView.getViewName(), Charsets.UTF_8);
+            }
+        };
+
+        int numTenants = 0;
+        for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+            numTenants += tg.getNumTenants();
+        }
+
+        // This holds the info whether the tenant view was created (initialized) or not.
+        return BloomFilter.create(tenantViewFunnel, numTenants, 0.01);
+    }
+
+    private List<Operation> getUpsertOperationsForScenario(Scenario scenario) {
+        List<Operation> opList = Lists.newArrayList();
+        for (final Upsert upsert : scenario.getUpserts()) {
+            Operation upsertOp = new org.apache.phoenix.pherf.workload.mt.UpsertOperation() {
+                @Override public Upsert getUpsert() {
+                    return upsert;
+                }
+
+                @Override public String getId() {
+                    return upsert.getId();
+                }
+
+                @Override public OperationType getType() {
+                    return OperationType.UPSERT;
+                }
+            };
+            opList.add(upsertOp);
+        }
+        return opList;
+    }
+
+    private List<Operation> getQueryOperationsForScenario(Scenario scenario) {
+        List<Operation> opList = Lists.newArrayList();
+        for (final QuerySet querySet : scenario.getQuerySet()) {
+            for (final Query query : querySet.getQuery()) {
+                Operation queryOp = new org.apache.phoenix.pherf.workload.mt.QueryOperation() {
+                    @Override public Query getQuery() {
+                        return query;
+                    }
+
+                    @Override public String getId() {
+                        return query.getId();
+                    }
+
+                    @Override public OperationType getType() {
+                        return OperationType.SELECT;
+                    }
+                };
+                opList.add(queryOp);
+            }
+        }
+        return opList;
+    }
+
+    private List<Operation> getIdleTimeOperationsForScenario(Scenario scenario) {
+        List<Operation> opList = Lists.newArrayList();
+        for (final IdleTime idleTime : scenario.getIdleTimes()) {
+            Operation idleTimeOperation = new IdleTimeOperation() {
+                @Override public IdleTime getIdleTime() {
+                    return idleTime;
+                }
+                @Override public String getId() {
+                    return idleTime.getId();
+                }
+
+                @Override public OperationType getType() {
+                    return OperationType.IDLE_TIME;
+                }
+            };
+            opList.add(idleTimeOperation);
+        }
+        return opList;
+    }
+
+    private List<Operation> getUDFOperationsForScenario(Scenario scenario) {
+        List<Operation> opList = Lists.newArrayList();
+        for (final UserDefined udf : scenario.getUdfs()) {
+            Operation udfOperation = new org.apache.phoenix.pherf.workload.mt.UserDefinedOperation() {
+                @Override public UserDefined getUserFunction() {
+                    return udf;
+                }
+
+                @Override public String getId() {
+                    return udf.getId();
+                }
+
+                @Override public OperationType getType() {
+                    return OperationType.USER_DEFINED;
+                }
+            };
+            opList.add(udfOperation);
+        }
+        return opList;
+    }
+
+    public PhoenixUtil getPhoenixUtil() {
+        return phoenixUtil;
+    }
+
+    public DataModel getModel() {
+        return model;
+    }
+
+    public Scenario getScenario() {
+        return scenario;
+    }
+
+    public List<Operation> getOperations() {
+        return operationList;
+    }
+
+    public Supplier<Function<TenantOperationInfo, OperationStats>> getOperationSupplier(
+            final TenantOperationInfo input) {
+        TenantView tenantView = new TenantView(input.getTenantId(), scenario.getTableName());
+
+        // Check if pre run ddls are needed.
+        if (!tenantsLoaded.mightContain(tenantView)) {
+
+            Supplier<Function<TenantOperationInfo, OperationStats>> preRunOpSupplier =
+                    operationSuppliers.get(Operation.OperationType.PRE_RUN);
+            // Check if the scenario has a PRE_RUN operation.
+            if (preRunOpSupplier != null) {
+                // Initialize the tenant using the pre scenario ddls.
+                final org.apache.phoenix.pherf.workload.mt.PreScenarioOperation
+                        operation = new org.apache.phoenix.pherf.workload.mt.PreScenarioOperation() {
+                    @Override public List<Ddl> getPreScenarioDdls() {
+                        List<Ddl> ddls = scenario.getPreScenarioDdls();
+                        return ddls == null ? Lists.<Ddl>newArrayList() : ddls;
+                    }
+
+                    @Override public String getId() {
+                        return OperationType.PRE_RUN.name();
+                    }
+
+                    @Override public OperationType getType() {
+                        return OperationType.PRE_RUN;
+                    }
+                };
+                // Initialize with the pre run operation.
+                TenantOperationInfo preRunSample = new TenantOperationInfo(
+                        input.getModelName(),
+                        input.getScenarioName(),
+                        input.getTableName(),
+                        input.getTenantGroupId(),
+                        Operation.OperationType.PRE_RUN.name(),
+                        input.getTenantId(), operation);
+
+                try {
+                    // Run the initialization operation.
+                    OperationStats stats = preRunOpSupplier.get().apply(preRunSample);
+                    LOGGER.info(phoenixUtil.getGSON().toJson(stats));
+                } catch (Exception e) {
+                    LOGGER.error(String.format("Failed to initialize tenant. [%s, %s] ",
+                            tenantView.tenantId,
+                            tenantView.viewName), e);
+                }
+            }
+
+            tenantsLoaded.put(tenantView);
+        }
+
+        Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+                operationSuppliers.get(input.getOperation().getType());
+        if (opSupplier == null) {
+            throw new IllegalArgumentException("Unknown operation type");
+        }
+        return opSupplier;
+    }
+
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java
new file mode 100644
index 0000000..2481dd1
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.workload.mt.Operation;
+
+/**
+ * Holds information on the tenant operation details.
+ */
+public class TenantOperationInfo {
+    private final String modelName;
+    private final String scenarioName;
+    private final String tableName;
+    private final String tenantId;
+    private final String tenantGroupId;
+    private final String operationGroupId;
+    private final Operation operation;
+
+    public TenantOperationInfo(String modelName, String scenarioName, String tableName,
+            String tenantGroupId, String operationGroupId,
+            String tenantId, Operation operation) {
+        this.modelName = modelName;
+        this.scenarioName = scenarioName;
+        this.tableName = tableName;
+        this.tenantGroupId = tenantGroupId;
+        this.operationGroupId = operationGroupId;
+        this.tenantId = tenantId;
+        this.operation = operation;
+    }
+
+    public String getModelName() { return modelName; }
+
+    public String getScenarioName() { return scenarioName; }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getTenantGroupId() {
+        return tenantGroupId;
+    }
+
+    public String getOperationGroupId() {
+        return operationGroupId;
+    }
+
+    public Operation getOperation() {
+        return operation;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java
new file mode 100644
index 0000000..0ae4273
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.WorkHandler;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload.TenantOperationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler {@link WorkHandler} for
+ * executing the operations {@link org.apache.phoenix.pherf.workload.mt.Operation}
+ * as and when they become available on the {@link com.lmax.disruptor.RingBuffer}
+ * when published by the workload generator {@link TenantOperationWorkload}
+ */
+
+public class TenantOperationWorkHandler implements WorkHandler<TenantOperationEvent>,
+        LifecycleAware {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationWorkHandler.class);
+    private final String handlerId;
+    private final TenantOperationFactory operationFactory;
+
+
+    public TenantOperationWorkHandler(TenantOperationFactory operationFactory,
+            String handlerId) {
+        this.handlerId = handlerId;
+        this.operationFactory = operationFactory;
+    }
+
+    @Override
+    public void onEvent(TenantOperationEvent event)
+            throws Exception {
+        TenantOperationInfo input = event.getTenantOperationInfo();
+        Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+                operationFactory.getOperationSupplier(input);
+        OperationStats stats = opSupplier.get().apply(input);
+        stats.setHandlerId(handlerId);
+        LOGGER.info(operationFactory.getPhoenixUtil().getGSON().toJson(stats));
+    }
+
+    @Override
+    public void onStart() {
+        Scenario scenario = operationFactory.getScenario();
+        LOGGER.info(String.format("TenantOperationWorkHandler started for %s:%s",
+                scenario.getName(), scenario.getTableName()));
+    }
+
+    @Override
+    public void onShutdown() {
+        Scenario scenario = operationFactory.getScenario();
+        LOGGER.info(String.format("TenantOperationWorkHandler stopped for %s:%s",
+                scenario.getName(), scenario.getTableName()));
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java
new file mode 100644
index 0000000..be255d3
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WorkHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.Workload;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+import org.apache.phoenix.pherf.workload.mt.MultiTenantWorkload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+/**
+ * This class creates workload for tenant based load profiles.
+ * It uses @see {@link TenantOperationFactory} in conjunction with
+ * @see {@link TenantOperationEventGenerator} to generate the load events.
+ * It then publishes these events onto a RingBuffer based queue.
+ * The @see {@link TenantOperationWorkHandler} drains the events from the queue and executes them.
+ * Reference for RingBuffer based queue http://lmax-exchange.github.io/disruptor/
+ */
+
+public class TenantOperationWorkload implements MultiTenantWorkload, Workload {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationWorkload.class);
+    private static final int DEFAULT_NUM_HANDLER_PER_MODEL = 4;
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    private static class ContinuousWorkloadExceptionHandler implements ExceptionHandler {
+        @Override public void handleEventException(Throwable ex, long sequence, Object event) {
+            LOGGER.error("Sequence=" + sequence + ", event=" + event, ex);
+            throw new RuntimeException(ex);
+        }
+
+        @Override public void handleOnStartException(Throwable ex) {
+            LOGGER.error("On Start", ex);
+            throw new RuntimeException(ex);
+        }
+
+        @Override public void handleOnShutdownException(Throwable ex) {
+            LOGGER.error("On Shutdown", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public static class TenantOperationEvent {
+        TenantOperationInfo tenantOperationInfo;
+
+        public TenantOperationInfo getTenantOperationInfo() {
+            return tenantOperationInfo;
+        }
+
+        public void setTenantOperationInfo(TenantOperationInfo tenantOperationInfo) {
+            this.tenantOperationInfo = tenantOperationInfo;
+        }
+
+        public static final EventFactory<TenantOperationEvent> EVENT_FACTORY = new EventFactory<TenantOperationEvent>() {
+            public TenantOperationEvent newInstance() {
+                return new TenantOperationEvent();
+            }
+        };
+    }
+
+    private Disruptor<TenantOperationEvent> disruptor;
+    private final Properties properties;
+    private final TenantOperationFactory operationFactory;
+    private final EventGenerator<TenantOperationInfo> generator;
+    private final List<WorkHandler> handlers;
+    private final ExceptionHandler exceptionHandler;
+
+    public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+            List<WorkHandler> workers, Properties properties) throws Exception {
+        this(phoenixUtil, model, scenario, workers, new ContinuousWorkloadExceptionHandler(), properties);
+    }
+
+    public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+            Properties properties) throws Exception {
+
+        operationFactory = new TenantOperationFactory(phoenixUtil, model, scenario);
+        this.properties = properties;
+        this.handlers = Lists.newArrayListWithCapacity(DEFAULT_NUM_HANDLER_PER_MODEL);
+        for (int i = 0; i < DEFAULT_NUM_HANDLER_PER_MODEL; i++) {
+            String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), i+1);
+            handlers.add(new TenantOperationWorkHandler(
+                    operationFactory,
+                    handlerId));
+        }
+        this.generator = new TenantOperationEventGenerator(operationFactory.getOperations(),
+                model, scenario);
+        this.exceptionHandler = new ContinuousWorkloadExceptionHandler();
+    }
+
+    public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+            List<WorkHandler> workers,
+            ExceptionHandler exceptionHandler,
+            Properties properties) throws Exception {
+
+        operationFactory = new TenantOperationFactory(phoenixUtil, model, scenario);
+        this.properties = properties;
+        this.generator = new TenantOperationEventGenerator(operationFactory.getOperations(),
+                model, scenario);
+        this.handlers = workers;
+        this.exceptionHandler = exceptionHandler;
+    }
+
+
+    @Override public void start() {
+
+        Scenario scenario = operationFactory.getScenario();
+        String currentThreadName = Thread.currentThread().getName();
+        disruptor = new Disruptor<TenantOperationEvent>(TenantOperationEvent.EVENT_FACTORY, DEFAULT_BUFFER_SIZE,
+                Threads.getNamedThreadFactory(currentThreadName + "." + scenario.getName() ),
+                ProducerType.SINGLE, new BlockingWaitStrategy());
+
+        this.disruptor.setDefaultExceptionHandler(this.exceptionHandler);
+        this.disruptor.handleEventsWithWorkerPool(this.handlers.toArray(new WorkHandler[] {}));
+        RingBuffer<TenantOperationEvent> ringBuffer = this.disruptor.start();
+        long numOperations = scenario.getLoadProfile().getNumOperations();
+        while (numOperations > 0) {
+            TenantOperationInfo sample = generator.next();
+            --numOperations;
+            // Publishers claim events in sequence
+            long sequence = ringBuffer.next();
+            TenantOperationEvent event = ringBuffer.get(sequence);
+            event.setTenantOperationInfo(sample);
+            // make the event available to EventProcessors
+            ringBuffer.publish(sequence);
+            LOGGER.debug(String.format("published : %s:%s:%d",
+                    scenario.getName(), scenario.getTableName(), numOperations));
+        }
+    }
+
+    @Override public void stop() {
+        this.disruptor.shutdown();
+    }
+
+    @Override public PhoenixUtil getPhoenixUtil() { return operationFactory.getPhoenixUtil(); }
+
+    @Override public Scenario getScenario() {
+        return operationFactory.getScenario();
+    }
+
+    @Override public DataModel getModel() {
+        return operationFactory.getModel();
+    }
+
+    @Override public Properties getProperties() {
+        return this.properties;
+    }
+
+    @Override public Callable<Void> execute() throws Exception {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                start();
+                return null;
+            }
+        };
+    }
+
+    @Override public void complete() {
+        stop();
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java
new file mode 100644
index 0000000..5b25c12
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.Upsert;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.UpsertOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+/**
+ * A supplier of {@link Function} that takes {@link UpsertOperation} as an input
+ */
+class UpsertOperationSupplier extends BaseOperationSupplier {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UpsertOperationSupplier.class);
+
+    public UpsertOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        super(phoenixUtil, model, scenario);
+    }
+
+    @Override
+    public Function<TenantOperationInfo, OperationStats> get() {
+        return new Function<TenantOperationInfo, OperationStats>() {
+
+            @Override
+            public OperationStats apply(final TenantOperationInfo input) {
+
+                final int batchSize = loadProfile.getBatchSize();
+                final boolean useBatchApi = batchSize != 0;
+                final int rowCount = useBatchApi ? batchSize : 1;
+
+                final UpsertOperation operation = (UpsertOperation) input.getOperation();
+                final String tenantGroup = input.getTenantGroupId();
+                final String opGroup = input.getOperationGroupId();
+                final String tenantId = input.getTenantId();
+                final Upsert upsert = operation.getUpsert();
+                final String tableName = input.getTableName();
+                final String scenarioName = input.getScenarioName();
+                final List<Column> columns = upsert.getColumn();
+
+                final String opName = String.format("%s:%s:%s:%s:%s",
+                        scenarioName, tableName, opGroup, tenantGroup, tenantId);
+
+                long rowsCreated = 0;
+                long startTime = 0, duration, totalDuration;
+                int status = 0;
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                try (Connection connection = phoenixUtil.getConnection(tenantId)) {
+                    String sql = phoenixUtil.buildSql(columns, tableName);
+                    startTime = EnvironmentEdgeManager.currentTimeMillis();
+                    PreparedStatement stmt = null;
+                    try {
+                        stmt = connection.prepareStatement(sql);
+                        for (long i = rowCount; i > 0; i--) {
+                            LOGGER.debug("Operation " + opName + " executing ");
+                            stmt = phoenixUtil.buildStatement(rulesApplier, scenario, columns, stmt, simpleDateFormat);
+                            if (useBatchApi) {
+                                stmt.addBatch();
+                            } else {
+                                rowsCreated += stmt.executeUpdate();
+                            }
+                        }
+                    } catch (SQLException e) {
+                        throw e;
+                    } finally {
+                        // Need to keep the statement open to send the remaining batch of updates
+                        if (!useBatchApi && stmt != null) {
+                            stmt.close();
+                        }
+                        if (connection != null) {
+                            if (useBatchApi && stmt != null) {
+                                int[] results = stmt.executeBatch();
+                                for (int x = 0; x < results.length; x++) {
+                                    int result = results[x];
+                                    if (result < 1) {
+                                        final String msg =
+                                                "Failed to write update in batch (update count="
+                                                        + result + ")";
+                                        throw new RuntimeException(msg);
+                                    }
+                                    rowsCreated += result;
+                                }
+                                // Close the statement after our last batch execution.
+                                stmt.close();
+                            }
+
+                            try {
+                                connection.commit();
+                                duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                                LOGGER.info("Writer ( " + Thread.currentThread().getName()
+                                        + ") committed Final Batch. Duration (" + duration + ") Ms");
+                                connection.close();
+                            } catch (SQLException e) {
+                                // Swallow since we are closing anyway
+                                LOGGER.error("Error when closing/committing", e);
+                            }
+                        }
+                    }
+                } catch (SQLException sqle) {
+                    LOGGER.error("Operation " + opName + " failed with exception ", sqle);
+                    status = -1;
+                } catch (Exception e) {
+                    LOGGER.error("Operation " + opName + " failed with exception ", e);
+                    status = -1;
+                }
+
+                totalDuration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                return new OperationStats(input, startTime, status, rowsCreated, totalDuration);
+            }
+        };
+    }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java
new file mode 100644
index 0000000..ae8ce6f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.UserDefinedOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+/**
+ * A supplier of {@link Function} that takes {@link UserDefinedOperation} as an input
+ */
+class UserDefinedOperationSupplier extends BaseOperationSupplier {
+
+    public UserDefinedOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+        super(phoenixUtil, model, scenario);
+    }
+
+    @Override
+    public Function<TenantOperationInfo, OperationStats> get() {
+        return new Function<TenantOperationInfo, OperationStats>() {
+            @Override
+            public OperationStats apply(final TenantOperationInfo input) {
+                // TODO : implement user defined operation invocation.
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                long duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                return new OperationStats(input, startTime,0, 0, duration);
+            }
+        };
+    }
+}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index 343285f..929b7fa 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -24,7 +24,9 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.pherf.configuration.*;
 import org.apache.phoenix.pherf.rules.DataValue;
 import org.junit.Test;
@@ -43,7 +45,8 @@ public class ConfigurationParserTest extends ResultBaseTest {
     @Test
     public void testReadWriteWorkloadReader() throws Exception {
         String scenarioName = "testScenarioRW";
-        List<Scenario> scenarioList = getScenarios();
+        String testResourceName = "/scenario/test_scenario.xml";
+        List<Scenario> scenarioList = getScenarios(testResourceName);
         Scenario target = null;
         for (Scenario scenario : scenarioList) {
             if (scenarioName.equals(scenario.getName())) {
@@ -64,10 +67,10 @@ public class ConfigurationParserTest extends ResultBaseTest {
     // TODO Break this into multiple smaller tests.
     public void testConfigReader() {
         try {
-
+            String testResourceName = "/scenario/test_scenario.xml";
             LOGGER.debug("DataModel: " + writeXML());
-            List<Scenario> scenarioList = getScenarios();
-            List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+            List<Scenario> scenarioList = getScenarios(testResourceName);
+            List<Column> dataMappingColumns = getDataModel(testResourceName).getDataMappingColumns();
             assertTrue("Could not load the data columns from xml.",
                     (dataMappingColumns != null) && (dataMappingColumns.size() > 0));
             assertTrue("Could not load the data DataValue list from xml.",
@@ -122,22 +125,61 @@ public class ConfigurationParserTest extends ResultBaseTest {
         }
     }
 
-    private URL getResourceUrl() {
-        URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+    @Test
+    public void testWorkloadWithLoadProfile() throws Exception {
+        String testResourceName = "/scenario/test_workload_with_load_profile.xml";
+        Set<String> scenarioNames = Sets.newHashSet("scenario_11", "scenario_12");
+        List<Scenario> scenarioList = getScenarios(testResourceName);
+        Scenario target = null;
+        for (Scenario scenario : scenarioList) {
+            if (scenarioNames.contains(scenario.getName())) {
+                target = scenario;
+            }
+            assertNotNull("Could not find scenario: " + scenario.getName(), target);
+        }
+
+        Scenario testScenarioWithLoadProfile = scenarioList.get(0);
+        LoadProfile loadProfile = testScenarioWithLoadProfile.getLoadProfile();
+        assertEquals("batch size not as expected: ",
+                1, loadProfile.getBatchSize());
+        assertEquals("num operations not as expected: ",
+                1000, loadProfile.getNumOperations());
+        assertEquals("tenant group size is not as expected: ",
+                3, loadProfile.getTenantDistribution().size());
+        assertEquals("operation group size is not as expected: ",
+                5,loadProfile.getOpDistribution().size());
+        assertEquals("UDFs size is not as expected  ",
+                1, testScenarioWithLoadProfile.getUdfs().size());
+        assertNotNull("UDFs clazzName cannot be null ",
+                testScenarioWithLoadProfile.getUdfs().get(0).getClazzName());
+        assertEquals("UDFs args size is not as expected  ",
+                2, testScenarioWithLoadProfile.getUdfs().get(0).getArgs().size());
+        assertEquals("UpsertSet size is not as expected ",
+                1, testScenarioWithLoadProfile.getUpserts().size());
+        assertEquals("#Column within the first upsert is not as expected ",
+                7, testScenarioWithLoadProfile.getUpserts().get(0).getColumn().size());
+        assertEquals("QuerySet size is not as expected ",
+                1, testScenarioWithLoadProfile.getQuerySet().size());
+        assertEquals("#Queries within the first querySet is not as expected ",
+                2, testScenarioWithLoadProfile.getQuerySet().get(0).getQuery().size());
+    }
+
+    private URL getResourceUrl(String resourceName) {
+        URL resourceUrl = getClass().getResource(resourceName);
         assertNotNull("Test data XML file is missing", resourceUrl);
         return resourceUrl;
     }
 
-    private List<Scenario> getScenarios() throws Exception {
-        DataModel data = getDataModel();
+    private List<Scenario> getScenarios(String resourceName) throws Exception {
+        DataModel data = getDataModel(resourceName);
         List<Scenario> scenarioList = data.getScenarios();
         assertTrue("Could not load the scenarios from xml.",
                 (scenarioList != null) && (scenarioList.size() > 0));
         return scenarioList;
     }
 
-    private DataModel getDataModel() throws Exception {
-        Path resourcePath = Paths.get(getResourceUrl().toURI());
+    private DataModel getDataModel(String resourceName) throws Exception {
+        Path resourcePath = Paths.get(getResourceUrl(resourceName).toURI());
         return XMLConfigParser.readDataModel(resourcePath);
     }
 
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
index 1853b67..531af26 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
@@ -18,10 +18,15 @@
 
 package org.apache.phoenix.pherf;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationIT;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Properties;
 
 public class ResultBaseTest {
@@ -42,6 +47,10 @@ public class ResultBaseTest {
     }
     
     @AfterClass public static synchronized void tearDown() throws Exception {
-    	new ResultUtil().deleteDir(properties.getProperty("pherf.default.results.dir"));
+        try {
+            new ResultUtil().deleteDir(properties.getProperty("pherf.default.results.dir"));
+        } catch (Exception e) {
+            // swallow
+        }
     }
 }
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java
new file mode 100644
index 0000000..46eaa80
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the various event generation outcomes based on scenario, model and load profile.
+ */
+public class TenantOperationEventGeneratorTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationEventGeneratorTest.class);
+    private enum TestOperationGroup {
+        upsertOp, queryOp1, queryOp2, idleOp, udfOp
+    }
+
+    private enum TestTenantGroup {
+        tg1, tg2, tg3
+    }
+
+    public DataModel readTestDataModel(String resourceName) throws Exception {
+        URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+        assertNotNull(scenarioUrl);
+        Path p = Paths.get(scenarioUrl.toURI());
+        return XMLConfigParser.readDataModel(p);
+    }
+
+    /**
+     * Case 1 : where some operations have zero weight
+     * Case 2 : where some tenant groups have zero weight
+     * Case 3 : where no operations and tenant groups have zero weight
+     * Case 4 : where some combinations of operation and tenant groups have zero weight
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVariousEventGeneration() throws Exception {
+        int numRuns = 10;
+        int numOperations = 100000;
+        int allowedVariance = 1500;
+        int normalizedOperations = (numOperations * numRuns) / 10000;
+        int numTenantGroups = 3;
+        int numOpGroups = 5;
+
+        PhoenixUtil pUtil = PhoenixUtil.create();
+        DataModel model = readTestDataModel("/scenario/test_evt_gen1.xml");
+        for (Scenario scenario : model.getScenarios()) {
+            LOGGER.debug(String.format("Testing %s", scenario.getName()));
+            LoadProfile loadProfile = scenario.getLoadProfile();
+            assertEquals("tenant group size is not as expected: ",
+                    numTenantGroups, loadProfile.getTenantDistribution().size());
+            assertEquals("operation group size is not as expected: ",
+                    numOpGroups, loadProfile.getOpDistribution().size());
+            // Calculate the expected distribution.
+            int[][] expectedDistribution = new int[numOpGroups][numTenantGroups];
+            for (int r = 0; r < numOpGroups; r++) {
+                for (int c = 0; c < numTenantGroups; c++) {
+                    int tenantWeight = loadProfile.getTenantDistribution().get(c).getWeight();
+                    int opWeight = loadProfile.getOpDistribution().get(r).getWeight();
+                    expectedDistribution[r][c] = normalizedOperations * (tenantWeight * opWeight);
+                    LOGGER.debug(String.format("Expected [%d,%d] = %d", r, c, expectedDistribution[r][c]));
+                }
+            }
+            TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+
+            // Calculate the actual distribution.
+            int[][] distribution = new int[numOpGroups][numTenantGroups];
+            for (int i = 0; i < numRuns; i++) {
+                int ops = numOperations;
+                loadProfile.setNumOperations(ops);
+                TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+                        opFactory.getOperations(), model, scenario);
+                while (ops-- > 0) {
+                    TenantOperationInfo info = evtGen.next();
+                    int row = TestOperationGroup.valueOf(info.getOperationGroupId()).ordinal();
+                    int col = TestTenantGroup.valueOf(info.getTenantGroupId()).ordinal();
+                    distribution[row][col]++;
+                }
+            }
+
+            // Validate that the expected and actual distribution
+            // is within the margin of allowed variance.
+            for (int r = 0; r < numOpGroups; r++) {
+                for (int c = 0; c < numTenantGroups; c++) {
+                    LOGGER.debug(String.format("Actual[%d,%d] = %d", r, c, distribution[r][c]));
+                    int diff = Math.abs(expectedDistribution[r][c] - distribution[r][c]);
+                    boolean isAllowed = diff < allowedVariance;
+                    assertTrue(String.format("Difference is outside the allowed variance "
+                            + "[expected = %d, actual = %d]", allowedVariance, diff), isAllowed);
+
+                }
+            }
+        }
+    }
+}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java
new file mode 100644
index 0000000..eefee4e
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests the various operation supplier outcomes based on scenario, model and load profile.
+ */
+public class TenantOperationFactoryTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationFactoryTest.class);
+
+    private enum TestOperationGroup {
+        upsertOp, queryOp1, queryOp2, idleOp, udfOp
+    }
+
+    private enum TestTenantGroup {
+        tg1, tg2, tg3
+    }
+
+    public DataModel readTestDataModel(String resourceName) throws Exception {
+        URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+        assertNotNull(scenarioUrl);
+        Path p = Paths.get(scenarioUrl.toURI());
+        return XMLConfigParser.readDataModel(p);
+    }
+
+    @Test public void testVariousOperations() throws Exception {
+        int numTenantGroups = 3;
+        int numOpGroups = 5;
+        int numRuns = 10;
+        int numOperations = 10;
+
+        PhoenixUtil pUtil = PhoenixUtil.create();
+        DataModel model = readTestDataModel("/scenario/test_evt_gen1.xml");
+        for (Scenario scenario : model.getScenarios()) {
+            LOGGER.debug(String.format("Testing %s", scenario.getName()));
+            LoadProfile loadProfile = scenario.getLoadProfile();
+            assertEquals("tenant group size is not as expected: ",
+                    numTenantGroups, loadProfile.getTenantDistribution().size());
+            assertEquals("operation group size is not as expected: ",
+                    numOpGroups, loadProfile.getOpDistribution().size());
+
+            TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+            assertEquals("operation group size from the factory is not as expected: ",
+                    numOpGroups, opFactory.getOperations().size());
+
+            for (int i = 0; i < numRuns; i++) {
+                int ops = numOperations;
+                loadProfile.setNumOperations(ops);
+                TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+                        opFactory.getOperations(), model, scenario);
+                while (ops-- > 0) {
+                    TenantOperationInfo info = evtGen.next();
+                    switch (TestOperationGroup.valueOf(info.getOperationGroupId())) {
+                    case upsertOp:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(UpsertOperationSupplier.class));
+                        break;
+                    case queryOp1:
+                    case queryOp2:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(QueryOperationSupplier.class));
+                        break;
+                    case idleOp:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(IdleTimeOperationSupplier.class));
+                        break;
+                    case udfOp:
+                        assertTrue(opFactory.getOperationSupplier(info).getClass()
+                                .isAssignableFrom(UserDefinedOperationSupplier.class));
+                        break;
+                    default:
+                        Assert.fail();
+
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
index fa9952b..a5a7274 100644
--- a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
+++ b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
@@ -29,6 +29,7 @@ CREATE TABLE IF NOT EXISTS PHERF.TEST_TABLE (
     DIVISION INTEGER,
     OLDVAL_STRING VARCHAR,
     NEWVAL_STRING VARCHAR,
+    CONNECTION_ID VARCHAR,
     SOME_INT INTEGER
     CONSTRAINT PK PRIMARY KEY
     (
diff --git a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
similarity index 63%
copy from phoenix-pherf/src/test/resources/datamodel/test_schema.sql
copy to phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
index fa9952b..ad25e9b 100644
--- a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
+++ b/phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
@@ -15,25 +15,13 @@
   -- See the License for the specific language governing permissions and
   -- limitations under the License.
 */
-CREATE TABLE IF NOT EXISTS PHERF.TEST_TABLE (
-    TENANT_ID CHAR(15) NOT NULL,
-    PARENT_ID CHAR(15) NOT NULL,
-    CREATED_DATE DATE NOT NULL,
-    NOW_DATE DATE,
-    TS_DATE TIMESTAMP,
-    PRESENT_DATE DATE,
-    OTHER_ID CHAR(15),
-    FIELD VARCHAR,
-    VAR_ARRAY VARCHAR ARRAY,
-    VAR_BIN VARBINARY,
-    DIVISION INTEGER,
-    OLDVAL_STRING VARCHAR,
-    NEWVAL_STRING VARCHAR,
-    SOME_INT INTEGER
+
+CREATE VIEW IF NOT EXISTS PHERF.TEST_GLOBAL_VIEW (
+    GID CHAR(15) NOT NULL,
+    FIELD1 VARCHAR,
+    OTHER_INT INTEGER
     CONSTRAINT PK PRIMARY KEY
     (
-        TENANT_ID,
-        PARENT_ID,
-        CREATED_DATE DESC
+        GID
     )
-) VERSIONS=1,MULTI_TENANT=true
+) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE WHERE IDENTIFIER = 'EV1'
diff --git a/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml b/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml
new file mode 100644
index 0000000..c1c6f8e
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<datamodel name="model_1">
+    <datamapping>
+        <column>
+            <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+            <type>VARCHAR</type>
+            <dataSequence>RANDOM</dataSequence>
+            <length>15</length>
+            <name>GENERAL_VARCHAR</name>
+        </column>
+    </datamapping>
+    <scenarios>
+        <scenario tableName="PHERF.EVT_GEN1" name="EVT_GEN1">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>1000</numOperations>
+                <!-- Case 1 : where some operations have zero weight -->
+                <tenantDistribution id="tg1" weight="10" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg2" weight="10" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg3" weight="80" numTenants="10"></tenantDistribution>
+                <opDistribution id="upsertOp" weight="50"></opDistribution>
+                <opDistribution id="queryOp1" weight="0"></opDistribution>
+                <opDistribution id="queryOp2" weight="0"></opDistribution>
+                <opDistribution id="idleOp" weight="50"></opDistribution>
+                <opDistribution id="udfOp" weight="0"></opDistribution>
+            </loadProfile>
+            <upserts>
+                <upsert id="upsertOp">
+                    <column>
+                        <type>CHAR</type>
+                        <name>COLUMN1</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <querySet>
+                <query id="queryOp1" statement="select count(*) from PHERF.Z11"/>
+                <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z11"/>
+            </querySet>
+            <idleTimes>
+                <idleTime id="idleOp" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="udfOp" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+        </scenario>
+        <scenario tableName="PHERF.EVT_GEN2" name="EVT_GEN2">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>1000</numOperations>
+                <!-- Case 3 : where some tenant groups have zero weight -->
+                <tenantDistribution id="tg1" weight="0" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg2" weight="0" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg3" weight="100" numTenants="10"></tenantDistribution>
+                <opDistribution id="upsertOp" weight="20"></opDistribution>
+                <opDistribution id="queryOp1" weight="20"></opDistribution>
+                <opDistribution id="queryOp2" weight="20"></opDistribution>
+                <opDistribution id="idleOp" weight="20"></opDistribution>
+                <opDistribution id="udfOp" weight="20"></opDistribution>
+            </loadProfile>
+            <upserts>
+                <upsert id="upsertOp">
+                    <column>
+                        <type>CHAR</type>
+                        <name>COLUMN1</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <querySet>
+                <query id="queryOp1" statement="select count(*) from PHERF.Z12"/>
+                <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z12"/>
+            </querySet>
+            <idleTimes>
+                <idleTime id="idleOp" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="udfOp" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+        </scenario>
+        <scenario tableName="PHERF.EVT_GEN3" name="EVT_GEN3">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>1000</numOperations>
+                <!-- Case 2 : where no operations and tenant groups have zero weight -->
+                <tenantDistribution id="tg1" weight="30" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg2" weight="30" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg3" weight="40" numTenants="10"></tenantDistribution>
+                <opDistribution id="upsertOp" weight="20"></opDistribution>
+                <opDistribution id="queryOp1" weight="20"></opDistribution>
+                <opDistribution id="queryOp2" weight="20"></opDistribution>
+                <opDistribution id="idleOp" weight="20"></opDistribution>
+                <opDistribution id="udfOp" weight="20"></opDistribution>
+            </loadProfile>
+            <upserts>
+                <upsert id="upsertOp">
+                    <column>
+                        <type>CHAR</type>
+                        <name>COLUMN1</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <querySet>
+                <query id="queryOp1" statement="select count(*) from PHERF.Z13"/>
+                <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z13"/>
+            </querySet>
+            <idleTimes>
+                <idleTime id="idleOp" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="udfOp" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+        </scenario>
+        <scenario tableName="PHERF.EVT_GEN4" name="EVT_GEN4">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>1000</numOperations>
+                <!-- Case 4 : where some combinations of operation and tenant groups have zero weight -->
+                <tenantDistribution id="tg1" weight="25" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg2" weight="0" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="tg3" weight="75" numTenants="10"></tenantDistribution>
+                <opDistribution id="upsertOp" weight="40"></opDistribution>
+                <opDistribution id="queryOp1" weight="0"></opDistribution>
+                <opDistribution id="queryOp2" weight="40"></opDistribution>
+                <opDistribution id="idleOp" weight="20"></opDistribution>
+                <opDistribution id="udfOp" weight="0"></opDistribution>
+            </loadProfile>
+            <upserts>
+                <upsert id="upsertOp">
+                    <column>
+                        <type>CHAR</type>
+                        <name>COLUMN1</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <querySet>
+                <query id="queryOp1" statement="select count(*) from PHERF.Z14"/>
+                <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z14"/>
+            </querySet>
+            <idleTimes>
+                <idleTime id="idleOp" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="udfOp" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+        </scenario>
+    </scenarios>
+</datamodel>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml b/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml
new file mode 100644
index 0000000..d3b83a2
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+    <datamodel name="model_1">
+    <datamapping>
+        <column>
+            <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+            <type>VARCHAR</type>
+            <dataSequence>RANDOM</dataSequence>
+            <length>15</length>
+            <name>GENERAL_VARCHAR</name>
+        </column>
+        <column>
+            <type>CHAR</type>
+            <userDefined>true</userDefined>
+            <dataSequence>RANDOM</dataSequence>
+            <length>15</length>
+            <name>GENERAL_CHAR</name>
+        </column>
+        <column>
+            <type>INTEGER</type>
+            <dataSequence>RANDOM</dataSequence>
+            <minValue>1</minValue>
+            <maxValue>50000000</maxValue>
+            <!-- Number [0-100] that represents the probability of creating a null value -->
+            <!-- The higher the number, the more like the value will returned will be null -->
+            <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+            <nullChance>0</nullChance>
+            <name>GENERAL_INTEGER</name>
+        </column>
+        <column>
+            <type>CHAR</type>
+            <length>3</length>
+            <userDefined>true</userDefined>
+            <dataSequence>LIST</dataSequence>
+            <name>TYPE</name>
+            <valuelist>
+                <!-- Distributes according to specified values. These must total 100 -->
+                <datavalue distribution="60">
+                    <value>ABC</value>
+                </datavalue>
+                <datavalue distribution="20">
+                    <value>XYZ</value>
+                </datavalue>
+                <datavalue distribution="20">
+                    <value>LMN</value>
+                </datavalue>
+            </valuelist>
+        </column>
+    </datamapping>
+    <scenarios>
+        <scenario tableName="PHERF.EVT_1" name="EVT_1">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>10</numOperations>
+                <!-- Case 1 : Upsert Operation test -->
+                <tenantDistribution id="tg1" weight="100" numTenants="1"></tenantDistribution>
+                <tenantDistribution id="tg2" weight="0" numTenants="0"></tenantDistribution>
+                <tenantDistribution id="tg3" weight="0" numTenants="0"></tenantDistribution>
+                <opDistribution id="upsertOp" weight="20"></opDistribution>
+                <opDistribution id="queryOp1" weight="20"></opDistribution>
+                <opDistribution id="queryOp2" weight="20"></opDistribution>
+                <opDistribution id="idleOp" weight="20"></opDistribution>
+                <opDistribution id="udfOp" weight="20"></opDistribution>
+            </loadProfile>
+            <preScenarioDdls>
+                <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.EVT_1 (ZID CHAR(15), TYPE VARCHAR) AS SELECT * FROM PHERF.TEST_GLOBAL_VIEW" />
+            </preScenarioDdls>
+
+            <upserts>
+                <upsert id="upsertOp">
+                    <column>
+                        <type>CHAR</type>
+                        <name>ID</name>
+                    </column>
+                    <column>
+                        <type>INTEGER</type>
+                        <name>SOME_INT</name>
+                    </column>
+                    <column>
+                        <type>CHAR</type>
+                        <name>GID</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>FIELD1</name>
+                    </column>
+                    <column>
+                        <type>INTEGER</type>
+                        <name>OTHER_INT</name>
+                    </column>
+                    <column>
+                        <type>CHAR</type>
+                        <name>ZID</name>
+                    </column>
+                    <column>
+                        <type>CHAR</type>
+                        <name>TYPE</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <querySet>
+                <query id="queryOp1" statement="select count(*) from PHERF.EVT_1"/>
+                <query id="queryOp2" statement="select * from PHERF.EVT_1"/>
+            </querySet>
+            <idleTimes>
+                <idleTime id="idleOp" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="udfOp" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+        </scenario>
+    </scenarios>
+</datamodel>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 8b4762e..853b857 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -262,7 +262,7 @@
                 -->
                 <threadSleepDuration>10</threadSleepDuration>
 
-                <batchSize>1000</batchSize>
+                <batchSize>1</batchSize>
             </writeParams>
             <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
                 <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
similarity index 60%
copy from phoenix-pherf/src/test/resources/scenario/test_scenario.xml
copy to phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
index 8b4762e..855c1fe 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
@@ -17,7 +17,7 @@
   ~   limitations under the License.
   -->
 
-<datamodel name="test_scenario">
+<datamodel name="model_1">
     <datamapping>
         <column>
             <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
@@ -196,7 +196,7 @@
                 <datavalue distribution="20">
                     <value>LMN</value>
                 </datavalue>
-            </valuelist>            
+            </valuelist>
         </column>
         <column>
             <type>CHAR</type>
@@ -215,12 +215,12 @@
             <prefix>VBOxx00</prefix>
         </column>
         <column>
-           <type>VARCHAR</type>
-           <userDefined>true</userDefined>
-           <dataSequence>SEQUENTIAL</dataSequence>
-           <length>1</length>
-           <name>FIELD</name>
-       </column>
+            <type>VARCHAR</type>
+            <userDefined>true</userDefined>
+            <dataSequence>SEQUENTIAL</dataSequence>
+            <length>1</length>
+            <name>FIELD</name>
+        </column>
         <column>
             <type>INTEGER</type>
             <dataSequence>SEQUENTIAL</dataSequence>
@@ -230,123 +230,141 @@
         </column>
     </datamapping>
     <scenarios>
-        <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
-            <!-- Scenario level rule overrides will be unsupported in V1.
-                    You can use the general datamappings in the mean time-->
-            <dataOverride>
-                <column>
-                    <type>VARCHAR</type>
-                    <userDefined>true</userDefined>
-                    <dataSequence>RANDOM</dataSequence>
-                    <length>5</length>
-                    <name>FIELD</name>
-                </column>
-            </dataOverride>
+        <scenario tableName="PHERF.Z11" name="scenario_11">
+            <loadProfile>
+                <batchSize>1</batchSize>
+                <numOperations>1000</numOperations>
+                <tenantDistribution id="t111" weight="10" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="t112" weight="10" numTenants="10"></tenantDistribution>
+                <tenantDistribution id="t113" weight="80" numTenants="1"></tenantDistribution>
+                <opDistribution id="op111" weight="50"></opDistribution>
+                <opDistribution id="op112" weight="0"></opDistribution>
+                <opDistribution id="op113" weight="0"></opDistribution>
+                <opDistribution id="op114" weight="50"></opDistribution>
+                <opDistribution id="op115" weight="0"></opDistribution>
+            </loadProfile>
 
-            <!--
-                This is used to add mixed R/W workloads.
 
-                If this tag exists, a writer pool will be created based on the below properties.
-                These props will override the default values in pherf.properties, but only for this
-                scenario.The write jobs will run in conjunction with the querySet below.
-            -->
-            <writeParams executionDurationInMs="10000">
-                <!--
-                    Number of writer it insert into the threadpool
-                -->
-                <writerThreadCount>2</writerThreadCount>
+            <preScenarioDdls>
+                <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.Z11 (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_TABLE" />
+            </preScenarioDdls>
 
-                <!--
-                    Time in Ms that each thread will sleep between batch writes. This helps to
-                    throttle writers.
-                -->
-                <threadSleepDuration>10</threadSleepDuration>
+            <upserts>
+                <upsert id="op111">
+                    <column>
+                        <type>CHAR</type>
+                        <name>PARENT_ID</name>
+                    </column>
+                    <column>
+                        <type>DATE</type>
+                        <name>CREATED_DATE</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>FIELD</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>OTHER_ID</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>OLDVAL_STRING</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>NEWVAL_STRING</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>FIELD1</name>
+                    </column>
+                </upsert>
+            </upserts>
 
-                <batchSize>1000</batchSize>
-            </writeParams>
-            <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
-                <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
-                <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+            <idleTimes>
+                <idleTime id="op114" idleTime="50"></idleTime>
+            </idleTimes>
+            <udfs>
+                <udf id="op115" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+            <querySet>
+                <query id="op112" statement="select count(*) from PHERF.Z11"/>
+                <query id="op113" statement="select sum(SOME_INT) from PHERF.Z11"/>
             </querySet>
 
         </scenario>
+        <scenario tableName="PHERF.Z12" name="scenario_12">
+            <loadProfile>
+                <batchSize>5</batchSize>
+                <numOperations>1000</numOperations>
+                <tenantDistribution id="t121" weight="10" numTenants="5"></tenantDistribution>
+                <tenantDistribution id="t122" weight="10" numTenants="5"></tenantDistribution>
+                <tenantDistribution id="t123" weight="80" numTenants="5"></tenantDistribution>
+                <opDistribution id="op121" weight="50"></opDistribution>
+                <opDistribution id="op122" weight="5"></opDistribution>
+                <opDistribution id="op123" weight="5"></opDistribution>
+                <opDistribution id="op124" weight="40"></opDistribution>
+                <opDistribution id="op125" weight="0"></opDistribution>
+            </loadProfile>
 
-        <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
-            <!-- Scenario level rule overrides will be unsupported in V1.
-                    You can use the general datamappings in the mean time-->
-            <dataOverride>
-                <column>
-                    <type>VARCHAR</type>
-                    <userDefined>true</userDefined>
-                    <dataSequence>RANDOM</dataSequence>
-                    <length>10</length>
-                    <name>FIELD</name>
-                </column>
-            </dataOverride>
-            
-            <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first 
-                      2. DDL included in query are executed only once on start of querySet execution.
-            -->
-            <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
-                      numberOfExecutions="100">
-                <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
-                       statement="select count(*) from PHERF.TEST_TABLE"/>
-                <!-- queryGroup is a way to organize queries across tables or scenario files.
-                    The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
-                <query id="q2" queryGroup="g1"
-                       statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
-            </querySet>
-            <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
-            <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
-                      numberOfExecutions="10">
-                <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
-                <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
-            </querySet>
-        </scenario>
-        
-        <scenario tableName="PHERF.TEST_TABLE" rowCount="99" name="testPreAndPostDdls">
             <preScenarioDdls>
-                 <ddl statement="CREATE INDEX IDX_DIVISION ON ? (DIVISION)" tableName="PHERF.TEST_TABLE"/>
+                <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.Z12 (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_TABLE" />
             </preScenarioDdls>
- 
-            <postScenarioDdls>
-                 <ddl statement="CREATE INDEX IDX_OLDVAL_STRING ON ? (OLDVAL_STRING)" tableName="PHERF.TEST_TABLE"/>
-                 <ddl statement="CREATE INDEX IDX_CONNECTION_ID ON ? (CONNECTION_ID)" tableName="PHERF.TEST_TABLE"/>
-            </postScenarioDdls>
-            
-            <querySet concurrency="1" executionType="SERIAL" executionDurationInMs="5000"
-                      numberOfExecutions="1">
-                <query id="q1" expectedAggregateRowCount="99" statement="select count(*) from PHERF.TEST_TABLE"/>
+
+            <upserts>
+                <upsert id="op121">
+                    <column>
+                        <type>CHAR</type>
+                        <name>PARENT_ID</name>
+                    </column>
+                    <column>
+                        <type>DATE</type>
+                        <name>CREATED_DATE</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>FIELD</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>OTHER_ID</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>OLDVAL_STRING</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>NEWVAL_STRING</name>
+                    </column>
+                    <column>
+                        <type>VARCHAR</type>
+                        <name>FIELD1</name>
+                    </column>
+                </upsert>
+            </upserts>
+
+            <idleTimes>
+                <idleTime id="op124" idleTime="100"></idleTime>
+            </idleTimes>
+            <querySet>
+                <query id="op122" statement="select count(*) from PHERF.Z12"/>
+                <query id="op123" statement="select sum(SOME_INT) from PHERF.Z12"/>
             </querySet>
+            <udfs>
+                <udf id="op125" >
+                    <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+                    <args>Hello</args>
+                    <args>World</args>
+                </udf>
+            </udfs>
+
         </scenario>
-        
-        <!-- To configure a Write Workload to write to a tenant specific view users need to
-             specify the tenantId attribute on the scenario, specifying the tenant they 
-             want to write data for as the attribute value. This tells Pherf to take out a 
-             tenant-specific connection for executing the write workload. 
-             The name of the tenant specific view to write to can then be specified as the value of
-             the tablename attribute. This assumes the tenant specific view has been created. To 
-             dynamically create the view see comments below with regard to the ddl attribute. 
-        -->
-        <scenario tableName="PHERF.TEST_VIEW" tenantId="xyzdefghijklmno"
-                    rowCount="100" name="testMTWriteScenario">
-           <preScenarioDdls>
-                <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_VIEW (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" />
-            </preScenarioDdls>
-        </scenario>
-        <!--  Scenario level DDL that is dynamically executed before the Write Workload is run.
-              This pattern is really useful when you want to write data to multi-tenant view and the tenant id is
-              tightly bound to the scenario. In such cases you can't create the view through the data model flow.
-              The value of the tableName attribute is name of the view that is dynamically created based on the DDL
-              in the ddl attribute. Queries accessing the View will need to manually make sure Pherf was run with the -l option at
-              least once. 
-         -->
-        <scenario tableName="PHERF.TEST_MT_VIEW" tenantId="abcdefghijklmno" 
-                    rowCount="100" name="testMTDdlWriteScenario">
-            <preScenarioDdls>
-                <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_MT_VIEW (field1 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" />
-            </preScenarioDdls>
-        </scenario>
-        
+
     </scenarios>
 </datamodel>