You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2021/03/26 00:29:04 UTC
[phoenix] branch 4.x updated: PHOENIX-6118: Multi Tenant Workloads
using PHERF
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 3535708 PHOENIX-6118: Multi Tenant Workloads using PHERF
3535708 is described below
commit 35357084a2e7cef1f97b654b1a09219b5fb82268
Author: jpisaac <ja...@gmail.com>
AuthorDate: Thu Mar 25 17:28:55 2021 -0700
PHOENIX-6118: Multi Tenant Workloads using PHERF
---
phoenix-pherf/pom.xml | 20 +-
.../java/org/apache/phoenix/pherf/PherfMainIT.java | 1 +
.../org/apache/phoenix/pherf/ResultBaseTestIT.java | 7 +-
.../org/apache/phoenix/pherf/SchemaReaderIT.java | 2 +-
.../MultiTenantOperationBaseIT.java | 79 +++++
.../mt/tenantoperation/TenantOperationIT.java | 112 +++++++
.../tenantoperation/TenantOperationWorkloadIT.java | 166 +++++++++++
.../datamodel/create_prod_test_unsalted.sql | 0
.../src/{main => it}/resources/hbase-site.xml | 0
.../scenario/prod_test_unsalted_scenario.xml | 0
.../main/java/org/apache/phoenix/pherf/Pherf.java | 42 ++-
.../pherf/configuration/DataTypeMapping.java | 2 +
.../phoenix/pherf/configuration/IdleTime.java | 47 +++
.../phoenix/pherf/configuration/LoadProfile.java | 113 +++++++
.../pherf/configuration/OperationGroup.java | 44 +++
.../apache/phoenix/pherf/configuration/Query.java | 54 ++--
.../phoenix/pherf/configuration/Scenario.java | 52 +++-
.../phoenix/pherf/configuration/TenantGroup.java | 52 ++++
.../apache/phoenix/pherf/configuration/Upsert.java | 135 +++++++++
.../phoenix/pherf/configuration/UserDefined.java | 55 ++++
.../apache/phoenix/pherf/rules/RulesApplier.java | 51 +++-
.../rules/SequentialIntegerDataGenerator.java | 2 +-
.../org/apache/phoenix/pherf/util/PhoenixUtil.java | 183 +++++++++++-
.../apache/phoenix/pherf/util/ResourceList.java | 7 +-
.../pherf/workload/MultiThreadedRunner.java | 4 +-
.../pherf/workload/MultithreadedDiffer.java | 2 +-
.../phoenix/pherf/workload/WorkloadExecutor.java | 2 +-
.../phoenix/pherf/workload/WriteWorkload.java | 131 +--------
.../phoenix/pherf/workload/mt/EventGenerator.java | 30 ++
.../pherf/workload/mt/IdleTimeOperation.java | 29 ++
.../pherf/workload/mt/MultiTenantWorkload.java | 46 +++
.../phoenix/pherf/workload/mt/Operation.java | 31 ++
.../phoenix/pherf/workload/mt/OperationStats.java | 119 ++++++++
.../pherf/workload/mt/PreScenarioOperation.java | 31 ++
.../phoenix/pherf/workload/mt/QueryOperation.java | 29 ++
.../phoenix/pherf/workload/mt/UpsertOperation.java | 29 ++
.../pherf/workload/mt/UserDefinedOperation.java | 29 ++
.../mt/tenantoperation/BaseOperationSupplier.java | 48 +++
.../tenantoperation/IdleTimeOperationSupplier.java | 78 +++++
.../PreScenarioOperationSupplier.java | 84 ++++++
.../mt/tenantoperation/QueryOperationSupplier.java | 91 ++++++
.../TenantOperationEventGenerator.java | 153 ++++++++++
.../mt/tenantoperation/TenantOperationFactory.java | 323 +++++++++++++++++++++
.../mt/tenantoperation/TenantOperationInfo.java | 70 +++++
.../TenantOperationWorkHandler.java | 75 +++++
.../tenantoperation/TenantOperationWorkload.java | 192 ++++++++++++
.../tenantoperation/UpsertOperationSupplier.java | 140 +++++++++
.../UserDefinedOperationSupplier.java | 50 ++++
.../phoenix/pherf/ConfigurationParserTest.java | 62 +++-
.../org/apache/phoenix/pherf/ResultBaseTest.java | 11 +-
.../TenantOperationEventGeneratorTest.java | 127 ++++++++
.../TenantOperationFactoryTest.java | 116 ++++++++
.../src/test/resources/datamodel/test_schema.sql | 1 +
.../{test_schema.sql => test_schema_mt_view.sql} | 26 +-
.../src/test/resources/scenario/test_evt_gen1.xml | 184 ++++++++++++
.../test/resources/scenario/test_mt_workload.xml | 135 +++++++++
.../src/test/resources/scenario/test_scenario.xml | 2 +-
...rio.xml => test_workload_with_load_profile.xml} | 246 ++++++++--------
58 files changed, 3617 insertions(+), 335 deletions(-)
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 32f3e67..3157599 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -73,11 +73,15 @@
<artifactId>commons-math3</artifactId>
<version>3.3</version>
</dependency>
- <dependency>
- <groupId>org.apache.phoenix.thirdparty</groupId>
- <artifactId>phoenix-shaded-commons-cli</artifactId>
- </dependency>
-
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.phoenix.thirdparty</groupId>
+ <artifactId>phoenix-shaded-commons-cli</artifactId>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
@@ -133,7 +137,7 @@
<build>
<resources>
<resource>
- <directory>src/main/resources</directory>
+ <directory>src/it/resources</directory>
</resource>
<resource>
<directory>config</directory>
@@ -221,7 +225,7 @@
<artifactSet>
<includes>
<include>org.apache.phoenix:phoenix-pherf</include>
- <include>com.google.guava:guava</include>
+ <include>org.apache.phoenix.thirdparty:phoenix-shaded-guava</include>
<include>com.googlecode.java-diff-utils:diffutils</include>
<include>org.apache.commons:commons-lang3</include>
<include>org.apache.commons:commons-math3</include>
@@ -230,6 +234,8 @@
<include>org.apache.commons:commons-csv</include>
<include>commons-lang:commons-lang</include>
<include>commons-io:commons-io</include>
+ <include>com.google.code.gson:gson</include>
+ <include>com.lmax:disruptor</include>
</includes>
</artifactSet>
<filters>
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 57aaae1..c71fda5 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.pherf.result.Result;
import org.apache.phoenix.pherf.result.ResultValue;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVFileResultHandler;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
index fe1f2ea..75d6f6b 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.result.ResultUtil;
import org.apache.phoenix.pherf.schema.SchemaReader;
@@ -37,9 +38,9 @@ import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(NeedsOwnMiniClusterTest.class)
-public class ResultBaseTestIT extends BaseTest {
- protected static final String matcherScenario = ".*scenario/.*test.*xml";
- protected static final String matcherSchema = ".*datamodel/.*test.*sql";
+public class ResultBaseTestIT extends ParallelStatsDisabledIT {
+ protected static final String matcherScenario = ".*scenario/.*test_scenario.*xml";
+ protected static final String matcherSchema = ".*datamodel/.*test_schema.*sql";
protected static PhoenixUtil util = PhoenixUtil.create(true);
protected static Properties properties;
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
index 901c92f..c5a93fe 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/SchemaReaderIT.java
@@ -78,7 +78,7 @@ public class SchemaReaderIT extends BaseTest {
private void assertApplySchemaTest() {
try {
util.setZookeeper("localhost");
- SchemaReader reader = new SchemaReader(util, ".*datamodel/.*test.*sql");
+ SchemaReader reader = new SchemaReader(util, ".*datamodel/.*test_schema.*sql");
List<Path> resources = new ArrayList<>(reader.getResourceList());
assertTrue("Could not pull list of schema files.", resources.size() > 0);
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java
new file mode 100644
index 0000000..bcdbdca
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/MultiTenantOperationBaseIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.schema.SchemaReader;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.junit.BeforeClass;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MultiTenantOperationBaseIT extends ParallelStatsDisabledIT {
+ enum TestOperationGroup {
+ upsertOp, queryOp1, queryOp2, idleOp, udfOp
+ }
+
+ static enum TestTenantGroup {
+ tg1, tg2, tg3
+ }
+ protected static final String matcherScenario = ".*scenario/.*test_mt_workload.*xml";
+ protected static final String matcherSchema = ".*datamodel/.*test_schema_mt*.*sql";
+
+ protected static PhoenixUtil util = PhoenixUtil.create(true);
+ protected static Properties properties;
+ protected static SchemaReader reader;
+ protected static XMLConfigParser parser;
+ protected static List<Path> resources;
+
+ @BeforeClass public static synchronized void setUp() throws Exception {
+ PherfConstants constants = PherfConstants.create();
+ properties = constants.getProperties(PherfConstants.PHERF_PROPERTIES, false);
+
+ PhoenixUtil.setZookeeper("localhost");
+ reader = new SchemaReader(util, matcherSchema);
+ parser = new XMLConfigParser(matcherScenario);
+ reader.applySchema();
+ resources = new ArrayList<>(reader.getResourceList());
+
+ assertTrue("Could not pull list of schema files.", resources.size() > 0);
+ assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
+
+ }
+
+ public DataModel readTestDataModel(String resourceName) throws Exception {
+ URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+ assertNotNull(scenarioUrl);
+ Path p = Paths.get(scenarioUrl.toURI());
+ return XMLConfigParser.readDataModel(p);
+ }
+
+}
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java
new file mode 100644
index 0000000..737080a
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests focused on tenant operations and their validations
+ */
+public class TenantOperationIT extends MultiTenantOperationBaseIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationIT.class);
+
+ @Test
+ public void testVariousOperations() throws Exception {
+ int numTenantGroups = 3;
+ int numOpGroups = 5;
+ int numRuns = 10;
+ int numOperations = 10;
+
+ PhoenixUtil pUtil = PhoenixUtil.create();
+ DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+ for (Scenario scenario : model.getScenarios()) {
+ LOGGER.debug(String.format("Testing %s", scenario.getName()));
+ LoadProfile loadProfile = scenario.getLoadProfile();
+ assertEquals("tenant group size is not as expected: ",
+ numTenantGroups, loadProfile.getTenantDistribution().size());
+ assertEquals("operation group size is not as expected: ",
+ numOpGroups, loadProfile.getOpDistribution().size());
+
+ TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+ TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+ opFactory.getOperations(), model, scenario);
+
+ assertEquals("operation group size from the factory is not as expected: ",
+ numOpGroups, opFactory.getOperations().size());
+
+ int numRowsInserted = 0;
+ for (int i = 0; i < numRuns; i++) {
+ int ops = numOperations;
+ loadProfile.setNumOperations(ops);
+ while (ops-- > 0) {
+ TenantOperationInfo info = evtGen.next();
+ Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+ opFactory.getOperationSupplier(info);
+ OperationStats stats = opSupplier.get().apply(info);
+ LOGGER.info(pUtil.getGSON().toJson(stats));
+ if (info.getOperation().getType() == Operation.OperationType.PRE_RUN) continue;
+ switch (TestOperationGroup.valueOf(info.getOperationGroupId())) {
+ case upsertOp:
+ assertTrue(opSupplier.getClass()
+ .isAssignableFrom(UpsertOperationSupplier.class));
+ numRowsInserted += stats.getRowCount();
+ break;
+ case queryOp1:
+ case queryOp2:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(QueryOperationSupplier.class));
+
+ // expected row count == num rows inserted
+ assertEquals(numRowsInserted, stats.getRowCount());
+ break;
+ case idleOp:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(IdleTimeOperationSupplier.class));
+ assertEquals(0, stats.getRowCount());
+ // expected think time (no-op) to be ~50ms
+ assertTrue(40 < stats.getDurationInMs() && stats.getDurationInMs() < 60);
+ break;
+ case udfOp:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(UserDefinedOperationSupplier.class));
+ assertEquals(0, stats.getRowCount());
+ break;
+ default:
+ Assert.fail();
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java
new file mode 100644
index 0000000..c6d4dfd
--- /dev/null
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkloadIT.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import com.clearspring.analytics.util.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.WorkHandler;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.Workload;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload.TenantOperationEvent;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests focused on tenant operation workloads {@link TenantOperationWorkload}
+ * and workload handlers {@link WorkHandler}
+ */
+public class TenantOperationWorkloadIT extends MultiTenantOperationBaseIT {
+
+ private static class EventCountingWorkHandler implements
+ WorkHandler<TenantOperationEvent>, LifecycleAware {
+ private final String handlerId;
+ private final TenantOperationFactory tenantOperationFactory;
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventCountingWorkHandler.class);
+ private final Map<String, CountDownLatch> latches;
+ public EventCountingWorkHandler(TenantOperationFactory tenantOperationFactory,
+ String handlerId, Map<String, CountDownLatch> latches) {
+ this.handlerId = handlerId;
+ this.tenantOperationFactory = tenantOperationFactory;
+ this.latches = latches;
+ }
+
+ @Override public void onStart() {}
+
+ @Override public void onShutdown() {}
+
+ @Override public void onEvent(TenantOperationEvent event)
+ throws Exception {
+ TenantOperationInfo input = event.getTenantOperationInfo();
+ Supplier<Function<TenantOperationInfo, OperationStats>>
+ opSupplier = tenantOperationFactory.getOperationSupplier(input);
+ OperationStats stats = opSupplier.get().apply(input);
+ LOGGER.info(tenantOperationFactory.getPhoenixUtil().getGSON().toJson(stats));
+ assertEquals(0, stats.getStatus());
+ latches.get(handlerId).countDown();
+ }
+ }
+
+ @Test
+ public void testWorkloadWithOneHandler() throws Exception {
+ int numOpGroups = 5;
+ int numHandlers = 1;
+ int totalOperations = 50;
+ int perHandlerCount = 50;
+
+ ExecutorService executor = null;
+ try {
+ executor = Executors.newFixedThreadPool(numHandlers);
+ PhoenixUtil pUtil = PhoenixUtil.create();
+ DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+ for (Scenario scenario : model.getScenarios()) {
+ // Set the total number of operations for this load profile
+ scenario.getLoadProfile().setNumOperations(totalOperations);
+ TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+ assertEquals("operation group size from the factory is not as expected: ",
+ numOpGroups, opFactory.getOperations().size());
+
+ // populate the handlers and countdown latches.
+ String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), numHandlers);
+ List<WorkHandler> workers = Lists.newArrayList();
+ Map<String, CountDownLatch> latches = Maps.newConcurrentMap();
+ workers.add(new EventCountingWorkHandler(opFactory, handlerId, latches));
+ latches.put(handlerId, new CountDownLatch(perHandlerCount));
+ // submit the workload
+ Workload workload = new TenantOperationWorkload(pUtil, model, scenario, workers, properties);
+ Future status = executor.submit(workload.execute());
+ // Just make sure there are no exceptions
+ status.get();
+
+ // Wait for the handlers to count down
+ for (Map.Entry<String, CountDownLatch> latch : latches.entrySet()) {
+ assertTrue(latch.getValue().await(60, TimeUnit.SECONDS));
+ }
+ }
+ } finally {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testWorkloadWithManyHandlers() throws Exception {
+ int numOpGroups = 5;
+ int numHandlers = 5;
+ int totalOperations = 500;
+ int perHandlerCount = 50;
+
+ ExecutorService executor = Executors.newFixedThreadPool(numHandlers);
+ PhoenixUtil pUtil = PhoenixUtil.create();
+ DataModel model = readTestDataModel("/scenario/test_mt_workload.xml");
+ for (Scenario scenario : model.getScenarios()) {
+ // Set the total number of operations for this load profile
+ scenario.getLoadProfile().setNumOperations(totalOperations);
+ TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+ assertEquals("operation group size from the factory is not as expected: ",
+ numOpGroups, opFactory.getOperations().size());
+
+ // populate the handlers and countdown latches.
+ List<WorkHandler> workers = Lists.newArrayList();
+ Map<String, CountDownLatch> latches = Maps.newConcurrentMap();
+ for (int i=0;i<numHandlers;i++) {
+ String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), i);
+ workers.add(new EventCountingWorkHandler(opFactory, handlerId, latches));
+ latches.put(handlerId, new CountDownLatch(perHandlerCount));
+ }
+ // submit the workload
+ Workload workload = new TenantOperationWorkload(pUtil, model, scenario, workers, properties);
+ Future status = executor.submit(workload.execute());
+ // Just make sure there are no exceptions
+ status.get();
+ // Wait for the handlers to count down
+ for (Map.Entry<String, CountDownLatch> latch : latches.entrySet()) {
+ assertTrue(latch.getValue().await(60, TimeUnit.SECONDS));
+ }
+ }
+ executor.shutdown();
+ }
+
+}
diff --git a/phoenix-pherf/src/main/resources/datamodel/create_prod_test_unsalted.sql b/phoenix-pherf/src/it/resources/datamodel/create_prod_test_unsalted.sql
similarity index 100%
rename from phoenix-pherf/src/main/resources/datamodel/create_prod_test_unsalted.sql
rename to phoenix-pherf/src/it/resources/datamodel/create_prod_test_unsalted.sql
diff --git a/phoenix-pherf/src/main/resources/hbase-site.xml b/phoenix-pherf/src/it/resources/hbase-site.xml
similarity index 100%
rename from phoenix-pherf/src/main/resources/hbase-site.xml
rename to phoenix-pherf/src/it/resources/hbase-site.xml
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/it/resources/scenario/prod_test_unsalted_scenario.xml
similarity index 100%
rename from phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
rename to phoenix-pherf/src/it/resources/scenario/prod_test_unsalted_scenario.xml
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index cae3213..c042689 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -24,7 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -33,6 +34,8 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
import org.apache.phoenix.pherf.PherfConstants.CompareType;
import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.jmx.MonitorManager;
import org.apache.phoenix.pherf.result.ResultUtil;
@@ -40,6 +43,7 @@ import org.apache.phoenix.pherf.schema.SchemaReader;
import org.apache.phoenix.pherf.util.GoogleChartGenerator;
import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.apache.phoenix.pherf.util.ResourceList;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload;
import org.apache.phoenix.pherf.workload.QueryExecutor;
import org.apache.phoenix.pherf.workload.Workload;
import org.apache.phoenix.pherf.workload.WorkloadExecutor;
@@ -60,6 +64,8 @@ public class Pherf {
"HBase Zookeeper address for connection. Default: localhost");
options.addOption("q", "query", false, "Executes multi-threaded query sets");
options.addOption("listFiles", false, "List available resource files");
+ options.addOption("mt", "multi-tenant", false,
+ "Multi tenanted workloads based on load profiles.");
options.addOption("l", "load", false,
"Pre-loads data according to specified configuration values.");
options.addOption("scenarioFile", true,
@@ -103,6 +109,7 @@ public class Pherf {
private final String queryHint;
private final Properties properties;
private final boolean preLoadData;
+ private final boolean multiTenantWorkload;
private final String dropPherfTablesRegEx;
private final boolean executeQuerySets;
private final boolean isFunctional;
@@ -148,6 +155,7 @@ public class Pherf {
properties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
preLoadData = command.hasOption("l");
+ multiTenantWorkload = command.hasOption("mt");
executeQuerySets = command.hasOption("q");
zookeeper = command.getOptionValue("z", "localhost");
queryHint = command.getOptionValue("hint", null);
@@ -288,17 +296,37 @@ public class Pherf {
}
// Schema and Data Load
- if (preLoadData) {
+ if (preLoadData || multiTenantWorkload) {
LOGGER.info("\nStarting Data Load...");
- Workload workload = new WriteWorkload(parser, generateStatistics);
+ List<Workload> newWorkloads = Lists.newArrayList();
try {
- workloadExecutor.add(workload);
+ if (multiTenantWorkload) {
+ for (DataModel model : parser.getDataModels()) {
+ for (Scenario scenario : model.getScenarios()) {
+ Workload workload = new TenantOperationWorkload(phoenixUtil,
+ model, scenario, properties);
+ newWorkloads.add(workload);
+ }
+ }
+ } else {
+ newWorkloads.add(new WriteWorkload(parser, generateStatistics));
+ }
+
+ if (newWorkloads.isEmpty()) {
+ throw new IllegalArgumentException("Found no new workload");
+ }
+
+ for (Workload workload : newWorkloads) {
+ workloadExecutor.add(workload);
+ }
// Wait for dataLoad to complete
- workloadExecutor.get(workload);
+ workloadExecutor.get();
} finally {
- if (null != workload) {
- workload.complete();
+ if (!newWorkloads.isEmpty()) {
+ for (Workload workload : newWorkloads) {
+ workload.complete();
+ }
}
}
} else {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index 129bdc2..3bbe728 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -30,7 +30,9 @@ public enum DataTypeMapping {
VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
VARBINARY("VARBINARY", Types.VARBINARY),
TIMESTAMP("TIMESTAMP", Types.TIMESTAMP),
+ BOOLEAN("BOOLEAN", Types.BOOLEAN),
BIGINT("BIGINT", Types.BIGINT),
+ UNSIGNED_INT("UNSIGNED_INT", Types.INTEGER),
TINYINT("TINYINT", Types.TINYINT);
private final String sType;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java
new file mode 100644
index 0000000..37d6e15
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/IdleTime.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+public class IdleTime {
+
+ private String id;
+ private long idleTime = 0;
+
+ @XmlAttribute
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @XmlAttribute
+ public long getIdleTime() {
+ return idleTime;
+ }
+
+ public void setIdleTime(long idleTime) {
+ this.idleTime = idleTime;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java
new file mode 100644
index 0000000..3116244
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/LoadProfile.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlType;
+import java.util.List;
+
+@XmlType
+public class LoadProfile {
+ private static final int MIN_BATCH_SIZE = 1;
+ private static final String DEFAULT_TENANT_ID_FMT = "00D%s%07d";
+ private static final int DEFAULT_GROUP_ID_LEN = 5;
+ private static final int DEFAULT_TENANT_ID_LEN = 15;
+
+ // Holds the batch size to be used in upserts.
+ private int batchSize;
+ // Holds the number of operations to be generated.
+ private long numOperations;
+ /**
+ * Holds the format to be used when generating tenantIds.
+ * TenantId format should typically have 2 parts -
+ * 1. string fmt - that hold the tenant group id.
+ * 2. int fmt - that holds a random number between 1 and max tenants
+ * for e.g DEFAULT_TENANT_ID_FMT = "00D%s%07d";
+ */
+ private String tenantIdFormat;
+ private int groupIdLength;
+ private int tenantIdLength;
+ // Holds the desired tenant distribution for this load.
+ private List<TenantGroup> tenantDistribution;
+ // Holds the desired operation distribution for this load.
+ private List<OperationGroup> opDistribution;
+
+ public LoadProfile() {
+ this.batchSize = MIN_BATCH_SIZE;
+ this.numOperations = Long.MAX_VALUE;
+ this.tenantIdFormat = DEFAULT_TENANT_ID_FMT;
+ this.tenantIdLength = DEFAULT_TENANT_ID_LEN;
+ this.groupIdLength = DEFAULT_GROUP_ID_LEN;
+ }
+
+ public String getTenantIdFormat() {
+ return tenantIdFormat;
+ }
+
+ public void setTenantIdFormat(String tenantIdFormat) {
+ this.tenantIdFormat = tenantIdFormat;
+ }
+
+ public int getTenantIdLength() {
+ return tenantIdLength;
+ }
+
+ public void setTenantIdLength(int tenantIdLength) {
+ this.tenantIdLength = tenantIdLength;
+ }
+
+ public int getGroupIdLength() {
+ return groupIdLength;
+ }
+
+ public void setGroupIdLength(int groupIdLength) {
+ this.groupIdLength = groupIdLength;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public long getNumOperations() {
+ return numOperations;
+ }
+
+ public void setNumOperations(long numOperations) {
+ this.numOperations = numOperations;
+ }
+
+ public List<TenantGroup> getTenantDistribution() {
+ return tenantDistribution;
+ }
+
+ public void setTenantDistribution(List<TenantGroup> tenantDistribution) {
+ this.tenantDistribution = tenantDistribution;
+ }
+
+ public List<OperationGroup> getOpDistribution() {
+ return opDistribution;
+ }
+
+ public void setOpDistribution(List<OperationGroup> opDistribution) {
+ this.opDistribution = opDistribution;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java
new file mode 100644
index 0000000..31545b2
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/OperationGroup.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class OperationGroup {
+ private String id;
+ private int weight;
+
+ @XmlAttribute
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @XmlAttribute
+ public int getWeight() {
+ return weight;
+ }
+
+ public void setWeight(int weight) {
+ this.weight = weight;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
index 5f28134..c47798c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Query.java
@@ -18,23 +18,23 @@
package org.apache.phoenix.pherf.configuration;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.phoenix.pherf.rules.RulesApplier;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlType;
-
-import org.apache.phoenix.pherf.rules.RulesApplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@XmlType
public class Query {
+ private String id;
+ private String queryGroup;
+ private String tenantId;
private String statement;
private Long expectedAggregateRowCount;
- private String tenantId;
private String ddl;
- private String queryGroup;
- private String id;
+ private boolean useGlobalConnection;
private Pattern pattern;
private long timeoutDuration = Long.MAX_VALUE;
@@ -51,20 +51,24 @@ public class Query {
public String getStatement() {
return statement;
}
-
- public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario) throws Exception {
- String ret = this.statement;
- String needQuotes = "";
- Matcher m = pattern.matcher(ret);
- while(m.find()) {
- String dynamicField = m.group(0).replace("[", "").replace("]", "");
- Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
- needQuotes = (dynamicColumn.getType() == DataTypeMapping.CHAR || dynamicColumn
- .getType() == DataTypeMapping.VARCHAR) ? "'" : "";
- ret = ret.replace("[" + dynamicField + "]",
- needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue() + needQuotes);
- }
- return ret;
+
+ public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario)
+ throws Exception {
+ String ret = this.statement;
+ String needQuotes = "";
+ Matcher m = pattern.matcher(ret);
+ while (m.find()) {
+ String dynamicField = m.group(0).replace("[", "").replace("]", "");
+ Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
+ needQuotes =
+ (dynamicColumn.getType() == DataTypeMapping.CHAR
+ || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+ ret =
+ ret.replace("[" + dynamicField + "]",
+ needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue()
+ + needQuotes);
+ }
+ return ret;
}
public void setStatement(String statement) {
@@ -160,6 +164,14 @@ public class Query {
this.id = id;
}
+ @XmlAttribute
+ public boolean isUseGlobalConnection() {
+ return useGlobalConnection;
+ }
+
+ public void setUseGlobalConnection(boolean useGlobalConnection) {
+ this.useGlobalConnection = useGlobalConnection;
+ }
@XmlAttribute
public long getTimeoutDuration() {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index 53b2d25..32cfc1e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -36,15 +36,19 @@ public class Scenario {
private String tableName;
private int rowCount;
private Map<String, String> phoenixProperties;
+ private WriteParams writeParams = null;
private DataOverride dataOverride;
private List<QuerySet> querySet = new ArrayList<>();
- private WriteParams writeParams = null;
+ private List<Upsert> upsertSet = new ArrayList<>();
+ private List<IdleTime> idleTimes = new ArrayList<>();
+ private List<UserDefined> udfs = new ArrayList<>();
+ private LoadProfile loadProfile = null;
+
private String name;
private String tenantId;
private List<Ddl> preScenarioDdls;
private List<Ddl> postScenarioDdls;
-
-
+
public Scenario() {
}
@@ -194,6 +198,7 @@ public class Scenario {
this.writeParams = writeParams;
}
+
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
@@ -232,4 +237,43 @@ public class Scenario {
public void setPostScenarioDdls(List<Ddl> postScenarioDdls) {
this.postScenarioDdls = postScenarioDdls;
}
+
+ public List<Upsert> getUpserts() {
+ return upsertSet;
+ }
+
+ @XmlElementWrapper(name = "upserts")
+ @XmlElement(name = "upsert")
+ public void setUpserts(List<Upsert> upsertSet) {
+ this.upsertSet = upsertSet;
+ }
+
+ public List<IdleTime> getIdleTimes() {
+ return idleTimes;
+ }
+
+ @XmlElementWrapper(name = "idleTimes")
+ @XmlElement(name = "idleTime")
+ public void setIdleTimes(List<IdleTime> idleTimes) {
+ this.idleTimes = idleTimes;
+ }
+
+ public List<UserDefined> getUdfs() {
+ return udfs;
+ }
+
+ @XmlElementWrapper(name = "udfs")
+ @XmlElement(name = "udf")
+ public void setUdfs(List<UserDefined> udfs) {
+ this.udfs = udfs;
+ }
+
+
+ public LoadProfile getLoadProfile() {
+ return loadProfile;
+ }
+
+ public void setLoadProfile(LoadProfile loadProfile) {
+ this.loadProfile = loadProfile;
+ }
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java
new file mode 100644
index 0000000..0656917
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/TenantGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+
+public class TenantGroup {
+ private String id;
+ private int weight;
+ private int numTenants;
+
+ @XmlAttribute
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @XmlAttribute
+ public int getWeight() {
+ return weight;
+ }
+
+ public void setWeight(int weight) {
+ this.weight = weight;
+ }
+
+ @XmlAttribute
+ public int getNumTenants() { return numTenants; }
+
+ public void setNumTenants(int numTenants) { this.numTenants = numTenants; }
+
+
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java
new file mode 100644
index 0000000..dfbe9e6
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Upsert.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import org.apache.phoenix.pherf.rules.RulesApplier;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Upsert {
+
+ private String id;
+ private String upsertGroup;
+ private String statement;
+ private List<Column> columns;
+ private boolean useGlobalConnection;
+ private Pattern pattern;
+ private long timeoutDuration = Long.MAX_VALUE;
+
+ public Upsert() {
+ pattern = Pattern.compile("\\[.*?\\]");
+ }
+
+ public String getDynamicStatement(RulesApplier ruleApplier, Scenario scenario)
+ throws Exception {
+ String ret = this.statement;
+ String needQuotes = "";
+ Matcher m = pattern.matcher(ret);
+ while (m.find()) {
+ String dynamicField = m.group(0).replace("[", "").replace("]", "");
+ Column dynamicColumn = ruleApplier.getRule(dynamicField, scenario);
+ needQuotes =
+ (dynamicColumn.getType() == DataTypeMapping.CHAR
+ || dynamicColumn.getType() == DataTypeMapping.VARCHAR) ? "'" : "";
+ ret = ret.replace("[" + dynamicField + "]",
+ needQuotes + ruleApplier.getDataValue(dynamicColumn).getValue()
+ + needQuotes);
+ }
+ return ret;
+ }
+
+ /**
+ * upsertGroup attribute is just a string value to help correlate upserts across sets or files.
+ * This helps to make sense of reporting results.
+ *
+ * @return the group id
+ */
+ @XmlAttribute
+ public String getUpsertGroup() {
+ return upsertGroup;
+ }
+
+ public void setUpsertGroup(String upsertGroup) {
+ this.upsertGroup = upsertGroup;
+ }
+
+
+ /**
+ * Upsert ID, Use UUID if none specified
+ *
+ * @return
+ */
+ @XmlAttribute
+ public String getId() {
+ if (null == this.id) {
+ this.id = java.util.UUID.randomUUID().toString();
+ }
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public List<Column> getColumns() {
+ return columns;
+ }
+
+ public void setColumns(List<Column> columns) {
+ this.columns = columns;
+ }
+
+ @XmlAttribute
+ public boolean isUseGlobalConnection() {
+ return useGlobalConnection;
+ }
+
+ public void setUseGlobalConnection(boolean useGlobalConnection) {
+ this.useGlobalConnection = useGlobalConnection;
+ }
+
+ @XmlAttribute
+ public long getTimeoutDuration() {
+ return this.timeoutDuration;
+ }
+
+ public void setTimeoutDuration(long timeoutDuration) {
+ this.timeoutDuration = timeoutDuration;
+ }
+
+ public String getStatement() {
+ return statement;
+ }
+
+ public void setStatement(String statement) {
+ // normalize statement - merge all consecutive spaces into one
+ this.statement = statement.replaceAll("\\s+", " ");
+ }
+
+ public List<Column> getColumn() {
+ return columns;
+ }
+
+ public void setColumn(List<Column> columns) {
+ this.columns = columns;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java
new file mode 100644
index 0000000..8350d57
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/UserDefined.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.configuration;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+import java.util.List;
+
+@XmlType
+public class UserDefined {
+ String id;
+ String clazzName;
+ List<String> args;
+
+ @XmlAttribute
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getClazzName() {
+ return clazzName;
+ }
+
+ public void setClazzName(String clazzName) {
+ this.clazzName = clazzName;
+ }
+
+ public List<String> getArgs() {
+ return args;
+ }
+
+ public void setArgs(List<String> args) {
+ this.args = args;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index b066e00..aeb3ec5 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -18,8 +18,9 @@
package org.apache.phoenix.pherf.rules;
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.phoenix.pherf.PherfConstants;
@@ -51,6 +52,7 @@ public class RulesApplier {
private final Random rndVal;
private final RandomDataGenerator randomDataGenerator;
+ private final DataModel dataModel;
private final XMLConfigParser parser;
private final List<Map> modelList;
private final Map<String, Column> columnMap;
@@ -59,6 +61,30 @@ public class RulesApplier {
private Map<Column,RuleBasedDataGenerator> columnRuleBasedDataGeneratorMap = new HashMap<>();
+ // Since rules are only relevant for a given data model,
+ // added a constructor to support a single data model => RulesApplier(DataModel model)
+
+ // We should deprecate the RulesApplier(XMLConfigParser parser) constructor,
+ // since a parser can have multiple data models (all the models found on the classpath)
+ // it implies that the rules apply to all the data models the parser holds
+ // which can be confusing to the user of this class.
+ //
+
+ public RulesApplier(DataModel model) {
+ this(model, EnvironmentEdgeManager.currentTimeMillis());
+ }
+
+ public RulesApplier(DataModel model, long seed) {
+ this.parser = null;
+ this.dataModel = model;
+ this.modelList = new ArrayList<Map>();
+ this.columnMap = new HashMap<String, Column>();
+ this.rndNull = new Random(seed);
+ this.rndVal = new Random(seed);
+ this.randomDataGenerator = new RandomDataGenerator();
+ this.cachedScenarioOverrideName = null;
+ populateModelList();
+ }
public RulesApplier(XMLConfigParser parser) {
this(parser, EnvironmentEdgeManager.currentTimeMillis());
@@ -66,6 +92,7 @@ public class RulesApplier {
public RulesApplier(XMLConfigParser parser, long seed) {
this.parser = parser;
+ this.dataModel = null;
this.modelList = new ArrayList<Map>();
this.columnMap = new HashMap<String, Column>();
this.rndNull = new Random(seed);
@@ -116,10 +143,10 @@ public class RulesApplier {
public DataValue getDataForRule(Scenario scenario, Column phxMetaColumn) throws Exception {
// TODO Make a Set of Rules that have already been applied so that so we don't generate for every value
- List<Scenario> scenarios = parser.getScenarios();
+ List<Scenario> scenarios = dataModel != null ? dataModel.getScenarios() : parser.getScenarios();
DataValue value = null;
if (scenarios.contains(scenario)) {
- LOGGER.debug("We found a correct Scenario");
+ LOGGER.debug("We found a correct Scenario" + scenario.getName());
Map<DataTypeMapping, List> overrideRuleMap = this.getCachedScenarioOverrides(scenario);
@@ -138,9 +165,10 @@ public class RulesApplier {
// Assume the first rule map
Map<DataTypeMapping, List> ruleMap = modelList.get(0);
List<Column> ruleList = ruleMap.get(phxMetaColumn.getType());
+ //LOGGER.info(String.format("Did not found a correct override column rule, %s, %s", phxMetaColumn.getName(), phxMetaColumn.getType()));
// Make sure Column from Phoenix Metadata matches a rule column
- if (ruleList.contains(phxMetaColumn)) {
+ if (ruleList != null && ruleList.contains(phxMetaColumn)) {
// Generate some random data based on this rule
LOGGER.debug("We found a correct column rule");
Column columnRule = getColumnForRule(ruleList, phxMetaColumn);
@@ -422,9 +450,18 @@ public class RulesApplier {
if (!modelList.isEmpty()) {
return;
}
-
- // Support for multiple models, but rules are only relevant each model
- for (DataModel model : parser.getDataModels()) {
+
+ // Since rules are only relevant for a given data model,
+ // added a constructor to support a single data model => RulesApplier(DataModel model)
+
+ // We should deprecate the RulesApplier(XMLConfigParser parser) constructor,
+ // since a parser can have multiple data models (all the models found on the classpath)
+ // it implies that the rules apply to all the data models the parser holds
+ // which can be confusing to the user of this class.
+
+ List<DataModel> models = dataModel != null ?
+ Lists.newArrayList(dataModel) : parser.getDataModels();
+ for (DataModel model : models) {
// Step 1
final Map<DataTypeMapping, List> ruleMap = new HashMap<DataTypeMapping, List>();
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
index 1d1a7d0..125e0d7 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/SequentialIntegerDataGenerator.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.pherf.rules;
-import com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.pherf.configuration.Column;
import org.apache.phoenix.pherf.configuration.DataSequence;
import org.apache.phoenix.pherf.configuration.DataTypeMapping;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index 34f45b2..1b5ba33 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -18,16 +18,33 @@
package org.apache.phoenix.pherf.util;
+import com.google.gson.Gson;
import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
import org.apache.phoenix.pherf.PherfConstants;
-import org.apache.phoenix.pherf.configuration.*;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataTypeMapping;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
+import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
+import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.*;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,6 +57,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
public class PhoenixUtil {
+ public static final String ASYNC_KEYWORD = "ASYNC";
+ public static final Gson GSON = new Gson();
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixUtil.class);
private static String zookeeper;
private static int rowCountOverride = 0;
@@ -47,7 +66,6 @@ public class PhoenixUtil {
private static PhoenixUtil instance;
private static boolean useThinDriver;
private static String queryServerUrl;
- private static final String ASYNC_KEYWORD = "ASYNC";
private static final int ONE_MIN_IN_MS = 60000;
private static String CurrentSCN = null;
@@ -81,6 +99,10 @@ public class PhoenixUtil {
return PhoenixUtil.useThinDriver;
}
+ public static Gson getGSON() {
+ return GSON;
+ }
+
public Connection getConnection() throws Exception {
return getConnection(null);
}
@@ -262,6 +284,7 @@ public class PhoenixUtil {
column.setType(DataTypeMapping.valueOf(resultSet.getString("TYPE_NAME").replace(" ", "_")));
column.setLength(resultSet.getInt("COLUMN_SIZE"));
columnList.add(column);
+ LOGGER.debug(String.format("getColumnsMetaData for column name : %s", column.getName()));
}
} finally {
if (null != resultSet) {
@@ -330,7 +353,7 @@ public class PhoenixUtil {
* @param tableName
* @throws InterruptedException
*/
- private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
+ public void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
//Wait for up to 15 mins for ASYNC index build to start
boolean jobStarted = false;
for (int i=0; i<15; i++) {
@@ -450,4 +473,156 @@ public class PhoenixUtil {
}
return buf.toString();
}
+
+ public PreparedStatement buildStatement(RulesApplier rulesApplier, Scenario scenario, List<Column> columns,
+ PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
+
+ int count = 1;
+ for (Column column : columns) {
+ DataValue dataValue = rulesApplier.getDataForRule(scenario, column);
+ switch (column.getType()) {
+ case VARCHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.VARCHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case CHAR:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.CHAR);
+ } else {
+ statement.setString(count, dataValue.getValue());
+ }
+ break;
+ case DECIMAL:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DECIMAL);
+ } else {
+ statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
+ }
+ break;
+ case INTEGER:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.INTEGER);
+ } else {
+ statement.setInt(count, Integer.parseInt(dataValue.getValue()));
+ }
+ break;
+ case UNSIGNED_LONG:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.OTHER);
+ } else {
+ statement.setLong(count, Long.parseLong(dataValue.getValue()));
+ }
+ break;
+ case BIGINT:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.BIGINT);
+ } else {
+ statement.setLong(count, Long.parseLong(dataValue.getValue()));
+ }
+ break;
+ case TINYINT:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.TINYINT);
+ } else {
+ statement.setLong(count, Integer.parseInt(dataValue.getValue()));
+ }
+ break;
+ case DATE:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.DATE);
+ } else {
+ Date
+ date =
+ new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
+ statement.setDate(count, date);
+ }
+ break;
+ case VARCHAR_ARRAY:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.ARRAY);
+ } else {
+ Array
+ arr =
+ statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
+ statement.setArray(count, arr);
+ }
+ break;
+ case VARBINARY:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.VARBINARY);
+ } else {
+ statement.setBytes(count, dataValue.getValue().getBytes());
+ }
+ break;
+ case TIMESTAMP:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.TIMESTAMP);
+ } else {
+ java.sql.Timestamp
+ ts =
+ new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
+ statement.setTimestamp(count, ts);
+ }
+ break;
+ default:
+ break;
+ }
+ count++;
+ }
+ return statement;
+ }
+
+ public String buildSql(final List<Column> columns, final String tableName) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("upsert into ");
+ builder.append(tableName);
+ builder.append(" (");
+ int count = 1;
+ for (Column column : columns) {
+ builder.append(column.getName());
+ if (count < columns.size()) {
+ builder.append(",");
+ } else {
+ builder.append(")");
+ }
+ count++;
+ }
+ builder.append(" VALUES (");
+ for (int i = 0; i < columns.size(); i++) {
+ if (i < columns.size() - 1) {
+ builder.append("?,");
+ } else {
+ builder.append("?)");
+ }
+ }
+ return builder.toString();
+ }
+
+ public org.apache.hadoop.hbase.util.Pair<Long, Long> getResults(
+ Query query,
+ ResultSet rs,
+ String queryIteration,
+ boolean isSelectCountStatement,
+ Long queryStartTime) throws Exception {
+
+ Long resultRowCount = 0L;
+ while (rs.next()) {
+ if (isSelectCountStatement) {
+ resultRowCount = rs.getLong(1);
+ } else {
+ resultRowCount++;
+ }
+ long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime;
+ if (queryElapsedTime >= query.getTimeoutDuration()) {
+ LOGGER.error("Query " + queryIteration + " exceeded timeout of "
+ + query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms.");
+ return new org.apache.hadoop.hbase.util.Pair(resultRowCount, queryElapsedTime);
+ }
+ }
+ return new org.apache.hadoop.hbase.util.Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime);
+ }
+
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
index 64ee6ee..dd3c4fd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/ResourceList.java
@@ -29,7 +29,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
@@ -40,7 +42,7 @@ import org.apache.phoenix.pherf.exception.PherfException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
/**
* list resources available from the classpath @ *
@@ -76,7 +78,7 @@ public class ResourceList {
final String classPath = System.getProperty("java.class.path", ".");
final String[] classPathElements = classPath.split(":");
- List<String> strResources = new ArrayList<>();
+ Set<String> strResources = new HashSet<>();
Collection<Path> paths = new ArrayList<>();
// TODO Make getResourcesPaths() return the URLs directly instead of converting them
@@ -112,6 +114,7 @@ public class ResourceList {
paths.add(path);
}
+ Collections.sort((List<Path>)paths);
return paths;
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index c4c38bd..5d4b973 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,7 +25,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.pherf.result.DataModelResult;
import org.apache.phoenix.pherf.result.ResultManager;
@@ -161,7 +161,7 @@ class MultiThreadedRunner implements Callable<Void> {
conn.setAutoCommit(true);
final String statementString = query.getDynamicStatement(ruleApplier, scenario);
statement = conn.prepareStatement(statementString);
- LOGGER.info("Executing iteration: " + queryIteration + ": " + statementString);
+ LOGGER.debug("Executing iteration: " + queryIteration + ": " + statementString);
if (scenario.getWriteParams() != null) {
Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index 068acda..8dfcbf9 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -22,7 +22,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Query;
import org.apache.phoenix.pherf.result.RunTime;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index 1d38e3d..381751d 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.pherf.workload;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.pherf.PherfConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 613fb23..b6a5ac6 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -274,11 +274,11 @@ public class WriteWorkload implements Workload {
logPerNRows = Integer.valueOf(customizedLogPerNRows);
}
last = start = EnvironmentEdgeManager.currentTimeMillis();
- String sql = buildSql(columns, tableName);
+ String sql = pUtil.buildSql(columns, tableName);
stmt = connection.prepareStatement(sql);
for (long i = rowCount; (i > 0) && ((EnvironmentEdgeManager.currentTimeMillis() - logStartTime)
< maxDuration); i--) {
- stmt = buildStatement(scenario, columns, stmt, simpleDateFormat);
+ stmt = pUtil.buildStatement(rulesApplier, scenario, columns, stmt, simpleDateFormat);
if (useBatchApi) {
stmt.addBatch();
} else {
@@ -362,133 +362,6 @@ public class WriteWorkload implements Workload {
return future;
}
- private PreparedStatement buildStatement(Scenario scenario, List<Column> columns,
- PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
- int count = 1;
- for (Column column : columns) {
-
- DataValue dataValue = getRulesApplier().getDataForRule(scenario, column);
- switch (column.getType()) {
- case VARCHAR:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.VARCHAR);
- } else {
- statement.setString(count, dataValue.getValue());
- }
- break;
- case CHAR:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.CHAR);
- } else {
- statement.setString(count, dataValue.getValue());
- }
- break;
- case DECIMAL:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.DECIMAL);
- } else {
- statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
- }
- break;
- case INTEGER:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.INTEGER);
- } else {
- statement.setInt(count, Integer.parseInt(dataValue.getValue()));
- }
- break;
- case UNSIGNED_LONG:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.OTHER);
- } else {
- statement.setLong(count, Long.parseLong(dataValue.getValue()));
- }
- break;
- case BIGINT:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.BIGINT);
- } else {
- statement.setLong(count, Long.parseLong(dataValue.getValue()));
- }
- break;
- case TINYINT:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.TINYINT);
- } else {
- statement.setLong(count, Integer.parseInt(dataValue.getValue()));
- }
- break;
- case DATE:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.DATE);
- } else {
- Date
- date =
- new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
- statement.setDate(count, date);
- }
- break;
- case VARCHAR_ARRAY:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.ARRAY);
- } else {
- Array
- arr =
- statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
- statement.setArray(count, arr);
- }
- break;
- case VARBINARY:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.VARBINARY);
- } else {
- statement.setBytes(count, dataValue.getValue().getBytes());
- }
- break;
- case TIMESTAMP:
- if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.TIMESTAMP);
- } else {
- java.sql.Timestamp
- ts =
- new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
- statement.setTimestamp(count, ts);
- }
- break;
- default:
- break;
- }
- count++;
- }
- return statement;
- }
-
- private String buildSql(final List<Column> columns, final String tableName) {
- StringBuilder builder = new StringBuilder();
- builder.append("upsert into ");
- builder.append(tableName);
- builder.append(" (");
- int count = 1;
- for (Column column : columns) {
- builder.append(column.getName());
- if (count < columns.size()) {
- builder.append(",");
- } else {
- builder.append(")");
- }
- count++;
- }
- builder.append(" VALUES (");
- for (int i = 0; i < columns.size(); i++) {
- if (i < columns.size() - 1) {
- builder.append("?,");
- } else {
- builder.append("?)");
- }
- }
- return builder.toString();
- }
-
public XMLConfigParser getParser() {
return parser;
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java
new file mode 100644
index 0000000..c6ffeea
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/EventGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+/**
+ * An interface that implementers can use to generate events that can be consumed by
+ * @see {@link com.lmax.disruptor.WorkHandler} which provide event handling functionality for
+ * a given event.
+ *
+ * @param <T>
+ */
+public interface EventGenerator<T> {
+ T next();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java
new file mode 100644
index 0000000..bc7762f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/IdleTimeOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.IdleTime;
+
+/**
+ * Defines a no op operation, typically used to simulate idle time.
+ * @see {@link OperationType#IDLE_TIME}
+ */
+public interface IdleTimeOperation extends Operation {
+ IdleTime getIdleTime();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java
new file mode 100644
index 0000000..d2c19e8
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/MultiTenantWorkload.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+import java.util.Properties;
+
+public interface MultiTenantWorkload {
+ /**
+ * Initializes and readies the processor for continuous queue based workloads
+ */
+ void start();
+
+ /**
+ * Stop the processor and cleans up the workload queues.
+ */
+ void stop();
+
+
+ PhoenixUtil getPhoenixUtil();
+
+ Scenario getScenario();
+
+ DataModel getModel();
+
+ Properties getProperties();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java
new file mode 100644
index 0000000..8774ae5
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/Operation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+/**
+ * An interface that defines the type of operation included in the load profile.
+ * @see {@link org.apache.phoenix.pherf.configuration.LoadProfile}
+ */
+public interface Operation {
+ enum OperationType {
+ PRE_RUN, UPSERT, SELECT, IDLE_TIME, USER_DEFINED
+ }
+ String getId();
+ OperationType getType();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java
new file mode 100644
index 0000000..c032ba4
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/OperationStats.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.result.ResultValue;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds metrics + contextual info on the operation run.
+ */
+public class OperationStats {
+ private final String modelName;
+ private final String scenarioName;
+ private final String tableName;
+ private final String tenantId;
+ private final String tenantGroup;
+ private final String operationGroup;
+ private final Operation.OperationType opType;
+ private String handlerId;
+ private final int status;
+ private final long rowCount;
+ private final long durationInMs;
+ private final long startTime;
+
+ public OperationStats(
+ TenantOperationInfo input,
+ long startTime,
+ int status,
+ long rowCount,
+ long durationInMs) {
+ this.modelName = input.getModelName();
+ this.scenarioName = input.getScenarioName();
+ this.tableName = input.getTableName();
+ this.tenantId = input.getTenantId();
+ this.tenantGroup = input.getTenantGroupId();
+ this.operationGroup = input.getOperationGroupId();
+ this.opType = input.getOperation().getType();
+ this.startTime = startTime;
+ this.status = status;
+ this.rowCount = rowCount;
+ this.durationInMs = durationInMs;
+ }
+
+ public String getModelName() { return modelName; }
+
+ public String getScenarioName() { return scenarioName; }
+
+ public String getTenantId() { return tenantId; }
+
+ public Operation.OperationType getOpType() { return opType; }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getTenantGroup() {
+ return tenantGroup;
+ }
+
+ public String getOperationGroup() {
+ return operationGroup;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public String getHandlerId() { return handlerId; }
+
+ public long getStartTime() { return startTime; }
+
+ public long getDurationInMs() {
+ return durationInMs;
+ }
+
+ public List<ResultValue> getCsvRepresentation() {
+ List<ResultValue> rowValues = new ArrayList<>();
+ rowValues.add(new ResultValue(modelName));
+ rowValues.add(new ResultValue(scenarioName));
+ rowValues.add(new ResultValue(tableName));
+ rowValues.add(new ResultValue(tenantId));
+ rowValues.add(new ResultValue(handlerId));
+ rowValues.add(new ResultValue(tenantGroup));
+ rowValues.add(new ResultValue(operationGroup));
+ rowValues.add(new ResultValue(opType.name()));
+ rowValues.add(new ResultValue(String.valueOf(startTime)));
+ rowValues.add(new ResultValue(String.valueOf(status)));
+ rowValues.add(new ResultValue(String.valueOf(rowCount)));
+ rowValues.add(new ResultValue(String.valueOf(durationInMs)));
+ return rowValues;
+ }
+
+ public void setHandlerId(String handlerId) {
+ this.handlerId = handlerId;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java
new file mode 100644
index 0000000..e0a276c
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/PreScenarioOperation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Ddl;
+
+import java.util.List;
+
+/**
+ * Defines a pre scenario operation.
+ * @see {@link OperationType#PRE_RUN}
+ */
+public interface PreScenarioOperation extends Operation {
+ List<Ddl> getPreScenarioDdls();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java
new file mode 100644
index 0000000..8b7ee78
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/QueryOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Query;
+
+/**
+ * Defines a query operation.
+ * @see {@link OperationType#SELECT}
+ */
+public interface QueryOperation extends Operation {
+ Query getQuery();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java
new file mode 100644
index 0000000..cb6abd9
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UpsertOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.Upsert;
+
+/**
+ * Defines an upsert operation.
+ * @see {@link OperationType#UPSERT}
+ */
+public interface UpsertOperation extends Operation {
+ Upsert getUpsert();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java
new file mode 100644
index 0000000..04f4fd8
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/UserDefinedOperation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt;
+
+import org.apache.phoenix.pherf.configuration.UserDefined;
+
+/**
+ * Defines an user defined operation.
+ * @see {@link OperationType#USER_DEFINED}
+ */
+public interface UserDefinedOperation extends Operation {
+ UserDefined getUserFunction();
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java
new file mode 100644
index 0000000..cda4504
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/BaseOperationSupplier.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+
+/**
+ * An abstract base class for all OperationSuppliers
+ */
+abstract class BaseOperationSupplier implements Supplier<Function<TenantOperationInfo, OperationStats>> {
+
+ final PhoenixUtil phoenixUtil;
+ final DataModel model;
+ final Scenario scenario;
+ final RulesApplier rulesApplier;
+ final LoadProfile loadProfile;
+
+ public BaseOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ this.phoenixUtil = phoenixUtil;
+ this.model = model;
+ this.scenario = scenario;
+ this.rulesApplier = new RulesApplier(model);
+ this.loadProfile = this.scenario.getLoadProfile();
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java
new file mode 100644
index 0000000..ab45c27
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/IdleTimeOperationSupplier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.IdleTime;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.IdleTimeOperation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A supplier of {@link Function} that takes {@link IdleTimeOperation} as an input.
+ */
+class IdleTimeOperationSupplier extends BaseOperationSupplier {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeOperationSupplier.class);
+
+ public IdleTimeOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ super(phoenixUtil, model, scenario);
+ }
+
+ @Override
+ public Function<TenantOperationInfo, OperationStats> get() {
+
+ return new Function<TenantOperationInfo, OperationStats>() {
+
+ @Override
+ public OperationStats apply(final TenantOperationInfo input) {
+
+ final IdleTimeOperation operation = (IdleTimeOperation) input.getOperation();
+ final IdleTime idleTime = operation.getIdleTime();
+
+ final String tenantId = input.getTenantId();
+ final String tenantGroup = input.getTenantGroupId();
+ final String opGroup = input.getOperationGroupId();
+ final String tableName = input.getTableName();
+ final String scenarioName = input.getScenarioName();
+ final String opName = String.format("%s:%s:%s:%s:%s", scenarioName, tableName,
+ opGroup, tenantGroup, tenantId);
+
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ int status = 0;
+
+ // Sleep for the specified time to simulate idle time.
+ try {
+ TimeUnit.MILLISECONDS.sleep(idleTime.getIdleTime());
+ } catch (InterruptedException ie) {
+ LOGGER.error("Operation " + opName + " failed with exception ", ie);
+ status = -1;
+ }
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ return new OperationStats(input, startTime, status, 0, duration);
+ }
+ };
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java
new file mode 100644
index 0000000..4f1e3e3
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/PreScenarioOperationSupplier.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.PreScenarioOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A supplier of {@link Function} that takes {@link PreScenarioOperation} as an input
+ */
+class PreScenarioOperationSupplier extends BaseOperationSupplier {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PreScenarioOperationSupplier.class);
+
+ public PreScenarioOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ super(phoenixUtil, model, scenario);
+ }
+
+ @Override
+ public Function<TenantOperationInfo, OperationStats> get() {
+ return new Function<TenantOperationInfo, OperationStats>() {
+
+ @Override
+ public OperationStats apply(final TenantOperationInfo input) {
+ final PreScenarioOperation operation = (PreScenarioOperation) input.getOperation();
+ final String tenantId = input.getTenantId();
+ final String tenantGroup = input.getTenantGroupId();
+ final String opGroup = input.getOperationGroupId();
+ final String tableName = input.getTableName();
+ final String scenarioName = input.getScenarioName();
+ final String opName = String.format("%s:%s:%s:%s:%s",
+ scenarioName, tableName, opGroup, tenantGroup, tenantId);
+
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ int status = 0;
+ if (!operation.getPreScenarioDdls().isEmpty()) {
+ try (Connection conn = phoenixUtil.getConnection(tenantId)) {
+ for (Ddl ddl : operation.getPreScenarioDdls()) {
+ LOGGER.info("\nExecuting DDL:" + ddl + " on tenantId:" + tenantId);
+ phoenixUtil.executeStatement(ddl.toString(), conn);
+ if (ddl.getStatement().toUpperCase().contains(phoenixUtil.ASYNC_KEYWORD)) {
+ phoenixUtil.waitForAsyncIndexToFinish(ddl.getTableName());
+ }
+ }
+ } catch (SQLException sqle) {
+ LOGGER.error("Operation " + opName + " failed with exception ", sqle);
+ status = -1;
+ } catch (Exception e) {
+ LOGGER.error("Operation " + opName + " failed with exception ", e);
+ status = -1;
+ }
+ }
+ long totalDuration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ return new OperationStats(input, startTime, status, operation.getPreScenarioDdls().size(), totalDuration);
+ }
+ };
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java
new file mode 100644
index 0000000..88eec68
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/QueryOperationSupplier.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.QueryOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+/**
+ * A supplier of {@link Function} that takes {@link QueryOperation} as an input.
+ */
+public class QueryOperationSupplier extends BaseOperationSupplier {
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryOperationSupplier.class);
+
+ public QueryOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ super(phoenixUtil, model, scenario);
+ }
+
+ @Override
+ public Function<TenantOperationInfo, OperationStats> get() {
+ return new Function<TenantOperationInfo, OperationStats>() {
+
+ @Override
+ public OperationStats apply(final TenantOperationInfo input) {
+
+ final QueryOperation operation = (QueryOperation) input.getOperation();
+ final String tenantGroup = input.getTenantGroupId();
+ final String opGroup = input.getOperationGroupId();
+ final String tenantId = input.getTenantId();
+ final String scenarioName = input.getScenarioName();
+ final String tableName = input.getTableName();
+ final Query query = operation.getQuery();
+
+ String opName = String.format("%s:%s:%s:%s:%s", scenarioName, tableName,
+ opGroup, tenantGroup, tenantId);
+ LOGGER.info("\nExecuting query " + query.getStatement());
+
+ long startTime = 0;
+ int status = 0;
+ Long resultRowCount = 0L;
+ Long queryElapsedTime = 0L;
+ try (Connection connection = phoenixUtil.getConnection(tenantId)) {
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+
+ // TODO handle dynamic statements
+ try (PreparedStatement statement = connection.prepareStatement(query.getStatement())) {
+ try (ResultSet rs = statement.executeQuery()) {
+ boolean isSelectCountStatement = query.getStatement().toUpperCase().trim().contains("COUNT(") ? true : false;
+ Pair<Long, Long> r = phoenixUtil.getResults(query, rs, opName,
+ isSelectCountStatement, startTime);
+ resultRowCount = r.getFirst();
+ queryElapsedTime = r.getSecond();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Operation " + opName + " failed with exception ", e);
+ status = -1;
+ }
+ return new OperationStats(input, startTime, status, resultRowCount, queryElapsedTime);
+ }
+ };
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java
new file mode 100644
index 0000000..676c510
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGenerator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+import org.apache.phoenix.pherf.PherfConstants;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.OperationGroup;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.TenantGroup;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A perf load event generator based on the supplied load profile.
+ */
+
+public class TenantOperationEventGenerator
+ implements EventGenerator<TenantOperationInfo> {
+
+ private static class WeightedRandomSampler {
+ private final Random RANDOM = new Random();
+ private final LoadProfile loadProfile;
+ private final String modelName;
+ private final String scenarioName;
+ private final String tableName;
+ private final EnumeratedDistribution<String> distribution;
+
+ private final Map<String, TenantGroup> tenantGroupMap = Maps.newHashMap();
+ private final Map<String, Operation> operationMap = Maps.newHashMap();
+ private final Map<String, OperationGroup> operationGroupMap = Maps.newHashMap();
+
+ public WeightedRandomSampler(List<Operation> operationList, DataModel model, Scenario scenario) {
+ this.modelName = model.getName();
+ this.scenarioName = scenario.getName();
+ this.tableName = scenario.getTableName();
+ this.loadProfile = scenario.getLoadProfile();
+
+ // Track the individual tenant group sizes,
+ // so that given a generated sample we can get a random tenant for a group.
+ for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+ tenantGroupMap.put(tg.getId(), tg);
+ }
+ Preconditions.checkArgument(!tenantGroupMap.isEmpty(),
+ "Tenant group cannot be empty");
+
+ for (Operation op : operationList) {
+ for (OperationGroup og : loadProfile.getOpDistribution()) {
+ if (op.getId().compareTo(og.getId()) == 0) {
+ operationMap.put(op.getId(), op);
+ operationGroupMap.put(op.getId(), og);
+ }
+ }
+ }
+ Preconditions.checkArgument(!operationMap.isEmpty(),
+ "Operation list and load profile operation do not match");
+
+ double totalTenantGroupWeight = 0.0f;
+ double totalOperationGroupWeight = 0.0f;
+ // Sum the weights to find the total weight,
+ // so that the weights can be used in the total probability distribution.
+ for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+ totalTenantGroupWeight += tg.getWeight();
+ }
+ for (OperationGroup og : loadProfile.getOpDistribution()) {
+ totalOperationGroupWeight += og.getWeight();
+ }
+
+ Preconditions.checkArgument(totalTenantGroupWeight != 0.0f,
+ "Total tenant group weight cannot be zero");
+ Preconditions.checkArgument(totalOperationGroupWeight != 0.0f,
+ "Total operation group weight cannot be zero");
+
+ // Initialize the sample probability distribution
+ List<Pair<String, Double>> pmf = Lists.newArrayList();
+ double totalWeight = totalTenantGroupWeight * totalOperationGroupWeight;
+ for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+ for (String opId : operationMap.keySet()) {
+ String sampleName = String.format("%s:%s", tg.getId(), opId);
+ int opWeight = operationGroupMap.get(opId).getWeight();
+ double probability = (tg.getWeight() * opWeight)/totalWeight;
+ pmf.add(new Pair(sampleName, probability));
+ }
+ }
+ this.distribution = new EnumeratedDistribution(pmf);
+ }
+
+ public TenantOperationInfo nextSample() {
+ String sampleIndex = this.distribution.sample();
+ String[] parts = sampleIndex.split(":");
+ String tenantGroupId = parts[0];
+ String opId = parts[1];
+
+ Operation op = operationMap.get(opId);
+ int numTenants = tenantGroupMap.get(tenantGroupId).getNumTenants();
+ String tenantIdPrefix = Strings.padStart(tenantGroupId, loadProfile.getGroupIdLength(), '0');
+ String formattedTenantId = String.format(loadProfile.getTenantIdFormat(),
+ tenantIdPrefix.substring(0, loadProfile.getGroupIdLength()), RANDOM.nextInt(numTenants));
+ String paddedTenantId = Strings.padStart(formattedTenantId, loadProfile.getTenantIdLength(), '0');
+ String tenantId = paddedTenantId.substring(0, loadProfile.getTenantIdLength());
+
+ TenantOperationInfo sample = new TenantOperationInfo(modelName, scenarioName, tableName,
+ tenantGroupId, opId, tenantId, op);
+ return sample;
+ }
+ }
+
+ private final WeightedRandomSampler sampler;
+ private final Properties properties;
+
+ public TenantOperationEventGenerator(List<Operation> ops, DataModel model, Scenario scenario)
+ throws Exception {
+ this(ops, model, scenario,
+ PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, true));
+ }
+
+ public TenantOperationEventGenerator(List<Operation> ops, DataModel model, Scenario scenario,
+ Properties properties) {
+ this.properties = properties;
+ this.sampler = new WeightedRandomSampler(ops, model, scenario);
+ }
+
+ @Override public TenantOperationInfo next() {
+ return this.sampler.nextSample();
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java
new file mode 100644
index 0000000..365984f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactory.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.hash.BloomFilter;
+import org.apache.phoenix.thirdparty.com.google.common.hash.Funnel;
+import org.apache.phoenix.thirdparty.com.google.common.hash.PrimitiveSink;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Ddl;
+import org.apache.phoenix.pherf.configuration.IdleTime;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Query;
+import org.apache.phoenix.pherf.configuration.QuerySet;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.TenantGroup;
+import org.apache.phoenix.pherf.configuration.Upsert;
+import org.apache.phoenix.pherf.configuration.UserDefined;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.rules.RulesApplier;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+import org.apache.phoenix.pherf.workload.mt.IdleTimeOperation;
+import org.apache.phoenix.pherf.workload.mt.Operation;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory class for operation suppliers.
+ * The class is responsible for creating new instances of suppliers {@link Supplier}
+ * for operations {@link Operation}
+ *
+ * Operations that need to be executed for a given {@link Scenario} and {@link DataModel}
+ * are generated by {@link EventGenerator}
+ *
+ * These operation events are then published on to the {@link com.lmax.disruptor.RingBuffer}
+ * by the {@link TenantOperationWorkload} workload generator and
+ * handled by the {@link com.lmax.disruptor.WorkHandler} for eg {@link TenantOperationWorkHandler}
+ */
+public class TenantOperationFactory {
+
+ private static class TenantView {
+ private final String tenantId;
+ private final String viewName;
+
+ public TenantView(String tenantId, String viewName) {
+ this.tenantId = tenantId;
+ this.viewName = viewName;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public String getViewName() {
+ return viewName;
+ }
+ }
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationFactory.class);
+ private final PhoenixUtil phoenixUtil;
+ private final DataModel model;
+ private final Scenario scenario;
+ private final XMLConfigParser parser;
+
+ private final RulesApplier rulesApplier;
+ private final LoadProfile loadProfile;
+ private final List<Operation> operationList = Lists.newArrayList();
+ private final Map<Operation.OperationType, Supplier<Function<TenantOperationInfo, OperationStats>>> operationSuppliers =
+ Maps.newEnumMap(Operation.OperationType.class);
+
+ private final BloomFilter<TenantView> tenantsLoaded;
+
+ public TenantOperationFactory(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ this.phoenixUtil = phoenixUtil;
+ this.model = model;
+ this.scenario = scenario;
+ this.parser = null;
+ this.rulesApplier = new RulesApplier(model);
+ this.loadProfile = this.scenario.getLoadProfile();
+ this.tenantsLoaded = createTenantsLoadedFilter(loadProfile);
+
+ // Read the scenario definition and load the various operations.
+ // Case : Operation.OperationType.PRE_RUN
+ if (scenario.getPreScenarioDdls() != null && scenario.getPreScenarioDdls().size() > 0) {
+ operationSuppliers.put(Operation.OperationType.PRE_RUN,
+ new PreScenarioOperationSupplier(phoenixUtil, model, scenario));
+ }
+
+ // Case : Operation.OperationType.UPSERT
+ List<Operation> upsertOperations = getUpsertOperationsForScenario(scenario);
+ if (upsertOperations.size() > 0) {
+ operationList.addAll(upsertOperations);
+ operationSuppliers.put(Operation.OperationType.UPSERT,
+ new UpsertOperationSupplier(phoenixUtil, model, scenario));
+ }
+
+ // Case : Operation.OperationType.SELECT
+ List<Operation> queryOperations = getQueryOperationsForScenario(scenario);
+ if (queryOperations.size() > 0) {
+ operationList.addAll(queryOperations);
+ operationSuppliers.put(Operation.OperationType.SELECT,
+ new QueryOperationSupplier(phoenixUtil, model, scenario));
+ }
+
+ // Case : Operation.OperationType.IDLE_TIME
+ List<Operation> idleOperations = getIdleTimeOperationsForScenario(scenario);
+ if (idleOperations.size() > 0) {
+ operationList.addAll(idleOperations);
+ operationSuppliers.put(Operation.OperationType.IDLE_TIME,
+ new IdleTimeOperationSupplier(phoenixUtil, model, scenario));
+ }
+
+ // Case : Operation.OperationType.USER_DEFINED
+ List<Operation> udfOperations = getUDFOperationsForScenario(scenario);
+ if (udfOperations.size() > 0) {
+ operationList.addAll(udfOperations);
+ operationSuppliers.put(Operation.OperationType.USER_DEFINED,
+ new UserDefinedOperationSupplier(phoenixUtil, model, scenario));
+ }
+ }
+
+ private BloomFilter createTenantsLoadedFilter(LoadProfile loadProfile) {
+ Funnel<TenantView> tenantViewFunnel = new Funnel<TenantView>() {
+ @Override
+ public void funnel(TenantView tenantView, PrimitiveSink into) {
+ into.putString(tenantView.getTenantId(), Charsets.UTF_8)
+ .putString(tenantView.getViewName(), Charsets.UTF_8);
+ }
+ };
+
+ int numTenants = 0;
+ for (TenantGroup tg : loadProfile.getTenantDistribution()) {
+ numTenants += tg.getNumTenants();
+ }
+
+ // This holds the info whether the tenant view was created (initialized) or not.
+ return BloomFilter.create(tenantViewFunnel, numTenants, 0.01);
+ }
+
+ private List<Operation> getUpsertOperationsForScenario(Scenario scenario) {
+ List<Operation> opList = Lists.newArrayList();
+ for (final Upsert upsert : scenario.getUpserts()) {
+ Operation upsertOp = new org.apache.phoenix.pherf.workload.mt.UpsertOperation() {
+ @Override public Upsert getUpsert() {
+ return upsert;
+ }
+
+ @Override public String getId() {
+ return upsert.getId();
+ }
+
+ @Override public OperationType getType() {
+ return OperationType.UPSERT;
+ }
+ };
+ opList.add(upsertOp);
+ }
+ return opList;
+ }
+
+ private List<Operation> getQueryOperationsForScenario(Scenario scenario) {
+ List<Operation> opList = Lists.newArrayList();
+ for (final QuerySet querySet : scenario.getQuerySet()) {
+ for (final Query query : querySet.getQuery()) {
+ Operation queryOp = new org.apache.phoenix.pherf.workload.mt.QueryOperation() {
+ @Override public Query getQuery() {
+ return query;
+ }
+
+ @Override public String getId() {
+ return query.getId();
+ }
+
+ @Override public OperationType getType() {
+ return OperationType.SELECT;
+ }
+ };
+ opList.add(queryOp);
+ }
+ }
+ return opList;
+ }
+
+ private List<Operation> getIdleTimeOperationsForScenario(Scenario scenario) {
+ List<Operation> opList = Lists.newArrayList();
+ for (final IdleTime idleTime : scenario.getIdleTimes()) {
+ Operation idleTimeOperation = new IdleTimeOperation() {
+ @Override public IdleTime getIdleTime() {
+ return idleTime;
+ }
+ @Override public String getId() {
+ return idleTime.getId();
+ }
+
+ @Override public OperationType getType() {
+ return OperationType.IDLE_TIME;
+ }
+ };
+ opList.add(idleTimeOperation);
+ }
+ return opList;
+ }
+
+ private List<Operation> getUDFOperationsForScenario(Scenario scenario) {
+ List<Operation> opList = Lists.newArrayList();
+ for (final UserDefined udf : scenario.getUdfs()) {
+ Operation udfOperation = new org.apache.phoenix.pherf.workload.mt.UserDefinedOperation() {
+ @Override public UserDefined getUserFunction() {
+ return udf;
+ }
+
+ @Override public String getId() {
+ return udf.getId();
+ }
+
+ @Override public OperationType getType() {
+ return OperationType.USER_DEFINED;
+ }
+ };
+ opList.add(udfOperation);
+ }
+ return opList;
+ }
+
+ public PhoenixUtil getPhoenixUtil() {
+ return phoenixUtil;
+ }
+
+ public DataModel getModel() {
+ return model;
+ }
+
+ public Scenario getScenario() {
+ return scenario;
+ }
+
+ public List<Operation> getOperations() {
+ return operationList;
+ }
+
+ public Supplier<Function<TenantOperationInfo, OperationStats>> getOperationSupplier(
+ final TenantOperationInfo input) {
+ TenantView tenantView = new TenantView(input.getTenantId(), scenario.getTableName());
+
+ // Check if pre run ddls are needed.
+ if (!tenantsLoaded.mightContain(tenantView)) {
+
+ Supplier<Function<TenantOperationInfo, OperationStats>> preRunOpSupplier =
+ operationSuppliers.get(Operation.OperationType.PRE_RUN);
+ // Check if the scenario has a PRE_RUN operation.
+ if (preRunOpSupplier != null) {
+ // Initialize the tenant using the pre scenario ddls.
+ final org.apache.phoenix.pherf.workload.mt.PreScenarioOperation
+ operation = new org.apache.phoenix.pherf.workload.mt.PreScenarioOperation() {
+ @Override public List<Ddl> getPreScenarioDdls() {
+ List<Ddl> ddls = scenario.getPreScenarioDdls();
+ return ddls == null ? Lists.<Ddl>newArrayList() : ddls;
+ }
+
+ @Override public String getId() {
+ return OperationType.PRE_RUN.name();
+ }
+
+ @Override public OperationType getType() {
+ return OperationType.PRE_RUN;
+ }
+ };
+ // Initialize with the pre run operation.
+ TenantOperationInfo preRunSample = new TenantOperationInfo(
+ input.getModelName(),
+ input.getScenarioName(),
+ input.getTableName(),
+ input.getTenantGroupId(),
+ Operation.OperationType.PRE_RUN.name(),
+ input.getTenantId(), operation);
+
+ try {
+ // Run the initialization operation.
+ OperationStats stats = preRunOpSupplier.get().apply(preRunSample);
+ LOGGER.info(phoenixUtil.getGSON().toJson(stats));
+ } catch (Exception e) {
+ LOGGER.error(String.format("Failed to initialize tenant. [%s, %s] ",
+ tenantView.tenantId,
+ tenantView.viewName), e);
+ }
+ }
+
+ tenantsLoaded.put(tenantView);
+ }
+
+ Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+ operationSuppliers.get(input.getOperation().getType());
+ if (opSupplier == null) {
+ throw new IllegalArgumentException("Unknown operation type");
+ }
+ return opSupplier;
+ }
+
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java
new file mode 100644
index 0000000..2481dd1
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.workload.mt.Operation;
+
+/**
+ * Holds information on the tenant operation details.
+ */
+public class TenantOperationInfo {
+ private final String modelName;
+ private final String scenarioName;
+ private final String tableName;
+ private final String tenantId;
+ private final String tenantGroupId;
+ private final String operationGroupId;
+ private final Operation operation;
+
+ public TenantOperationInfo(String modelName, String scenarioName, String tableName,
+ String tenantGroupId, String operationGroupId,
+ String tenantId, Operation operation) {
+ this.modelName = modelName;
+ this.scenarioName = scenarioName;
+ this.tableName = tableName;
+ this.tenantGroupId = tenantGroupId;
+ this.operationGroupId = operationGroupId;
+ this.tenantId = tenantId;
+ this.operation = operation;
+ }
+
+ public String getModelName() { return modelName; }
+
+ public String getScenarioName() { return scenarioName; }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getTenantGroupId() {
+ return tenantGroupId;
+ }
+
+ public String getOperationGroupId() {
+ return operationGroupId;
+ }
+
+ public Operation getOperation() {
+ return operation;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java
new file mode 100644
index 0000000..0ae4273
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.WorkHandler;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationWorkload.TenantOperationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler {@link WorkHandler} for
+ * executing the operations {@link org.apache.phoenix.pherf.workload.mt.Operation}
+ * as and when they become available on the {@link com.lmax.disruptor.RingBuffer}
+ * when published by the workload generator {@link TenantOperationWorkload}
+ */
+
+public class TenantOperationWorkHandler implements WorkHandler<TenantOperationEvent>,
+ LifecycleAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationWorkHandler.class);
+ private final String handlerId;
+ private final TenantOperationFactory operationFactory;
+
+
+ public TenantOperationWorkHandler(TenantOperationFactory operationFactory,
+ String handlerId) {
+ this.handlerId = handlerId;
+ this.operationFactory = operationFactory;
+ }
+
+ @Override
+ public void onEvent(TenantOperationEvent event)
+ throws Exception {
+ TenantOperationInfo input = event.getTenantOperationInfo();
+ Supplier<Function<TenantOperationInfo, OperationStats>> opSupplier =
+ operationFactory.getOperationSupplier(input);
+ OperationStats stats = opSupplier.get().apply(input);
+ stats.setHandlerId(handlerId);
+ LOGGER.info(operationFactory.getPhoenixUtil().getGSON().toJson(stats));
+ }
+
+ @Override
+ public void onStart() {
+ Scenario scenario = operationFactory.getScenario();
+ LOGGER.info(String.format("TenantOperationWorkHandler started for %s:%s",
+ scenario.getName(), scenario.getTableName()));
+ }
+
+ @Override
+ public void onShutdown() {
+ Scenario scenario = operationFactory.getScenario();
+ LOGGER.info(String.format("TenantOperationWorkHandler stopped for %s:%s",
+ scenario.getName(), scenario.getTableName()));
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java
new file mode 100644
index 0000000..be255d3
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationWorkload.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WorkHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.Workload;
+import org.apache.phoenix.pherf.workload.mt.EventGenerator;
+import org.apache.phoenix.pherf.workload.mt.MultiTenantWorkload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+/**
+ * This class creates workload for tenant based load profiles.
+ * It uses @see {@link TenantOperationFactory} in conjunction with
+ * @see {@link TenantOperationEventGenerator} to generate the load events.
+ * It then publishes these events onto a RingBuffer based queue.
+ * The @see {@link TenantOperationWorkHandler} drains the events from the queue and executes them.
+ * Reference for RingBuffer based queue http://lmax-exchange.github.io/disruptor/
+ */
+
+public class TenantOperationWorkload implements MultiTenantWorkload, Workload {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationWorkload.class);
+ private static final int DEFAULT_NUM_HANDLER_PER_MODEL = 4;
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+ private static class ContinuousWorkloadExceptionHandler implements ExceptionHandler {
+ @Override public void handleEventException(Throwable ex, long sequence, Object event) {
+ LOGGER.error("Sequence=" + sequence + ", event=" + event, ex);
+ throw new RuntimeException(ex);
+ }
+
+ @Override public void handleOnStartException(Throwable ex) {
+ LOGGER.error("On Start", ex);
+ throw new RuntimeException(ex);
+ }
+
+ @Override public void handleOnShutdownException(Throwable ex) {
+ LOGGER.error("On Shutdown", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static class TenantOperationEvent {
+ TenantOperationInfo tenantOperationInfo;
+
+ public TenantOperationInfo getTenantOperationInfo() {
+ return tenantOperationInfo;
+ }
+
+ public void setTenantOperationInfo(TenantOperationInfo tenantOperationInfo) {
+ this.tenantOperationInfo = tenantOperationInfo;
+ }
+
+ public static final EventFactory<TenantOperationEvent> EVENT_FACTORY = new EventFactory<TenantOperationEvent>() {
+ public TenantOperationEvent newInstance() {
+ return new TenantOperationEvent();
+ }
+ };
+ }
+
+ private Disruptor<TenantOperationEvent> disruptor;
+ private final Properties properties;
+ private final TenantOperationFactory operationFactory;
+ private final EventGenerator<TenantOperationInfo> generator;
+ private final List<WorkHandler> handlers;
+ private final ExceptionHandler exceptionHandler;
+
+ public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+ List<WorkHandler> workers, Properties properties) throws Exception {
+ this(phoenixUtil, model, scenario, workers, new ContinuousWorkloadExceptionHandler(), properties);
+ }
+
+ public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+ Properties properties) throws Exception {
+
+ operationFactory = new TenantOperationFactory(phoenixUtil, model, scenario);
+ this.properties = properties;
+ this.handlers = Lists.newArrayListWithCapacity(DEFAULT_NUM_HANDLER_PER_MODEL);
+ for (int i = 0; i < DEFAULT_NUM_HANDLER_PER_MODEL; i++) {
+ String handlerId = String.format("%s.%d", InetAddress.getLocalHost().getHostName(), i+1);
+ handlers.add(new TenantOperationWorkHandler(
+ operationFactory,
+ handlerId));
+ }
+ this.generator = new TenantOperationEventGenerator(operationFactory.getOperations(),
+ model, scenario);
+ this.exceptionHandler = new ContinuousWorkloadExceptionHandler();
+ }
+
+ public TenantOperationWorkload(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario,
+ List<WorkHandler> workers,
+ ExceptionHandler exceptionHandler,
+ Properties properties) throws Exception {
+
+ operationFactory = new TenantOperationFactory(phoenixUtil, model, scenario);
+ this.properties = properties;
+ this.generator = new TenantOperationEventGenerator(operationFactory.getOperations(),
+ model, scenario);
+ this.handlers = workers;
+ this.exceptionHandler = exceptionHandler;
+ }
+
+
+ @Override public void start() {
+
+ Scenario scenario = operationFactory.getScenario();
+ String currentThreadName = Thread.currentThread().getName();
+ disruptor = new Disruptor<TenantOperationEvent>(TenantOperationEvent.EVENT_FACTORY, DEFAULT_BUFFER_SIZE,
+ Threads.getNamedThreadFactory(currentThreadName + "." + scenario.getName() ),
+ ProducerType.SINGLE, new BlockingWaitStrategy());
+
+ this.disruptor.setDefaultExceptionHandler(this.exceptionHandler);
+ this.disruptor.handleEventsWithWorkerPool(this.handlers.toArray(new WorkHandler[] {}));
+ RingBuffer<TenantOperationEvent> ringBuffer = this.disruptor.start();
+ long numOperations = scenario.getLoadProfile().getNumOperations();
+ while (numOperations > 0) {
+ TenantOperationInfo sample = generator.next();
+ --numOperations;
+ // Publishers claim events in sequence
+ long sequence = ringBuffer.next();
+ TenantOperationEvent event = ringBuffer.get(sequence);
+ event.setTenantOperationInfo(sample);
+ // make the event available to EventProcessors
+ ringBuffer.publish(sequence);
+ LOGGER.debug(String.format("published : %s:%s:%d",
+ scenario.getName(), scenario.getTableName(), numOperations));
+ }
+ }
+
+ @Override public void stop() {
+ this.disruptor.shutdown();
+ }
+
+ @Override public PhoenixUtil getPhoenixUtil() { return operationFactory.getPhoenixUtil(); }
+
+ @Override public Scenario getScenario() {
+ return operationFactory.getScenario();
+ }
+
+ @Override public DataModel getModel() {
+ return operationFactory.getModel();
+ }
+
+ @Override public Properties getProperties() {
+ return this.properties;
+ }
+
+ @Override public Callable<Void> execute() throws Exception {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ start();
+ return null;
+ }
+ };
+ }
+
+ @Override public void complete() {
+ stop();
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java
new file mode 100644
index 0000000..5b25c12
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UpsertOperationSupplier.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.Column;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.Upsert;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.UpsertOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+
+/**
+ * A supplier of {@link Function} that takes {@link UpsertOperation} as an input
+ */
+class UpsertOperationSupplier extends BaseOperationSupplier {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertOperationSupplier.class);
+
+ public UpsertOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ super(phoenixUtil, model, scenario);
+ }
+
+ @Override
+ public Function<TenantOperationInfo, OperationStats> get() {
+ return new Function<TenantOperationInfo, OperationStats>() {
+
+ @Override
+ public OperationStats apply(final TenantOperationInfo input) {
+
+ final int batchSize = loadProfile.getBatchSize();
+ final boolean useBatchApi = batchSize != 0;
+ final int rowCount = useBatchApi ? batchSize : 1;
+
+ final UpsertOperation operation = (UpsertOperation) input.getOperation();
+ final String tenantGroup = input.getTenantGroupId();
+ final String opGroup = input.getOperationGroupId();
+ final String tenantId = input.getTenantId();
+ final Upsert upsert = operation.getUpsert();
+ final String tableName = input.getTableName();
+ final String scenarioName = input.getScenarioName();
+ final List<Column> columns = upsert.getColumn();
+
+ final String opName = String.format("%s:%s:%s:%s:%s",
+ scenarioName, tableName, opGroup, tenantGroup, tenantId);
+
+ long rowsCreated = 0;
+ long startTime = 0, duration, totalDuration;
+ int status = 0;
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ try (Connection connection = phoenixUtil.getConnection(tenantId)) {
+ String sql = phoenixUtil.buildSql(columns, tableName);
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ PreparedStatement stmt = null;
+ try {
+ stmt = connection.prepareStatement(sql);
+ for (long i = rowCount; i > 0; i--) {
+ LOGGER.debug("Operation " + opName + " executing ");
+ stmt = phoenixUtil.buildStatement(rulesApplier, scenario, columns, stmt, simpleDateFormat);
+ if (useBatchApi) {
+ stmt.addBatch();
+ } else {
+ rowsCreated += stmt.executeUpdate();
+ }
+ }
+ } catch (SQLException e) {
+ throw e;
+ } finally {
+ // Need to keep the statement open to send the remaining batch of updates
+ if (!useBatchApi && stmt != null) {
+ stmt.close();
+ }
+ if (connection != null) {
+ if (useBatchApi && stmt != null) {
+ int[] results = stmt.executeBatch();
+ for (int x = 0; x < results.length; x++) {
+ int result = results[x];
+ if (result < 1) {
+ final String msg =
+ "Failed to write update in batch (update count="
+ + result + ")";
+ throw new RuntimeException(msg);
+ }
+ rowsCreated += result;
+ }
+ // Close the statement after our last batch execution.
+ stmt.close();
+ }
+
+ try {
+ connection.commit();
+ duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ LOGGER.info("Writer ( " + Thread.currentThread().getName()
+ + ") committed Final Batch. Duration (" + duration + ") Ms");
+ connection.close();
+ } catch (SQLException e) {
+ // Swallow since we are closing anyway
+ LOGGER.error("Error when closing/committing", e);
+ }
+ }
+ }
+ } catch (SQLException sqle) {
+ LOGGER.error("Operation " + opName + " failed with exception ", sqle);
+ status = -1;
+ } catch (Exception e) {
+ LOGGER.error("Operation " + opName + " failed with exception ", e);
+ status = -1;
+ }
+
+ totalDuration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ return new OperationStats(input, startTime, status, rowsCreated, totalDuration);
+ }
+ };
+ }
+}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java
new file mode 100644
index 0000000..ae8ce6f
--- /dev/null
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/UserDefinedOperationSupplier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Function;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.apache.phoenix.pherf.workload.mt.OperationStats;
+import org.apache.phoenix.pherf.workload.mt.UserDefinedOperation;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+/**
+ * A supplier of {@link Function} that takes {@link UserDefinedOperation} as an input
+ */
+class UserDefinedOperationSupplier extends BaseOperationSupplier {
+
+ public UserDefinedOperationSupplier(PhoenixUtil phoenixUtil, DataModel model, Scenario scenario) {
+ super(phoenixUtil, model, scenario);
+ }
+
+ @Override
+ public Function<TenantOperationInfo, OperationStats> get() {
+ return new Function<TenantOperationInfo, OperationStats>() {
+ @Override
+ public OperationStats apply(final TenantOperationInfo input) {
+ // TODO : implement user defined operation invocation.
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ return new OperationStats(input, startTime,0, 0, duration);
+ }
+ };
+ }
+}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
index 343285f..929b7fa 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ConfigurationParserTest.java
@@ -24,7 +24,9 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.rules.DataValue;
import org.junit.Test;
@@ -43,7 +45,8 @@ public class ConfigurationParserTest extends ResultBaseTest {
@Test
public void testReadWriteWorkloadReader() throws Exception {
String scenarioName = "testScenarioRW";
- List<Scenario> scenarioList = getScenarios();
+ String testResourceName = "/scenario/test_scenario.xml";
+ List<Scenario> scenarioList = getScenarios(testResourceName);
Scenario target = null;
for (Scenario scenario : scenarioList) {
if (scenarioName.equals(scenario.getName())) {
@@ -64,10 +67,10 @@ public class ConfigurationParserTest extends ResultBaseTest {
// TODO Break this into multiple smaller tests.
public void testConfigReader() {
try {
-
+ String testResourceName = "/scenario/test_scenario.xml";
LOGGER.debug("DataModel: " + writeXML());
- List<Scenario> scenarioList = getScenarios();
- List<Column> dataMappingColumns = getDataModel().getDataMappingColumns();
+ List<Scenario> scenarioList = getScenarios(testResourceName);
+ List<Column> dataMappingColumns = getDataModel(testResourceName).getDataMappingColumns();
assertTrue("Could not load the data columns from xml.",
(dataMappingColumns != null) && (dataMappingColumns.size() > 0));
assertTrue("Could not load the data DataValue list from xml.",
@@ -122,22 +125,61 @@ public class ConfigurationParserTest extends ResultBaseTest {
}
}
- private URL getResourceUrl() {
- URL resourceUrl = getClass().getResource("/scenario/test_scenario.xml");
+ @Test
+ public void testWorkloadWithLoadProfile() throws Exception {
+ String testResourceName = "/scenario/test_workload_with_load_profile.xml";
+ Set<String> scenarioNames = Sets.newHashSet("scenario_11", "scenario_12");
+ List<Scenario> scenarioList = getScenarios(testResourceName);
+ Scenario target = null;
+ for (Scenario scenario : scenarioList) {
+ if (scenarioNames.contains(scenario.getName())) {
+ target = scenario;
+ }
+ assertNotNull("Could not find scenario: " + scenario.getName(), target);
+ }
+
+ Scenario testScenarioWithLoadProfile = scenarioList.get(0);
+ LoadProfile loadProfile = testScenarioWithLoadProfile.getLoadProfile();
+ assertEquals("batch size not as expected: ",
+ 1, loadProfile.getBatchSize());
+ assertEquals("num operations not as expected: ",
+ 1000, loadProfile.getNumOperations());
+ assertEquals("tenant group size is not as expected: ",
+ 3, loadProfile.getTenantDistribution().size());
+ assertEquals("operation group size is not as expected: ",
+ 5,loadProfile.getOpDistribution().size());
+ assertEquals("UDFs size is not as expected ",
+ 1, testScenarioWithLoadProfile.getUdfs().size());
+ assertNotNull("UDFs clazzName cannot be null ",
+ testScenarioWithLoadProfile.getUdfs().get(0).getClazzName());
+ assertEquals("UDFs args size is not as expected ",
+ 2, testScenarioWithLoadProfile.getUdfs().get(0).getArgs().size());
+ assertEquals("UpsertSet size is not as expected ",
+ 1, testScenarioWithLoadProfile.getUpserts().size());
+ assertEquals("#Column within the first upsert is not as expected ",
+ 7, testScenarioWithLoadProfile.getUpserts().get(0).getColumn().size());
+ assertEquals("QuerySet size is not as expected ",
+ 1, testScenarioWithLoadProfile.getQuerySet().size());
+ assertEquals("#Queries within the first querySet is not as expected ",
+ 2, testScenarioWithLoadProfile.getQuerySet().get(0).getQuery().size());
+ }
+
+ private URL getResourceUrl(String resourceName) {
+ URL resourceUrl = getClass().getResource(resourceName);
assertNotNull("Test data XML file is missing", resourceUrl);
return resourceUrl;
}
- private List<Scenario> getScenarios() throws Exception {
- DataModel data = getDataModel();
+ private List<Scenario> getScenarios(String resourceName) throws Exception {
+ DataModel data = getDataModel(resourceName);
List<Scenario> scenarioList = data.getScenarios();
assertTrue("Could not load the scenarios from xml.",
(scenarioList != null) && (scenarioList.size() > 0));
return scenarioList;
}
- private DataModel getDataModel() throws Exception {
- Path resourcePath = Paths.get(getResourceUrl().toURI());
+ private DataModel getDataModel(String resourceName) throws Exception {
+ Path resourcePath = Paths.get(getResourceUrl(resourceName).toURI());
return XMLConfigParser.readDataModel(resourcePath);
}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
index 1853b67..531af26 100644
--- a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/ResultBaseTest.java
@@ -18,10 +18,15 @@
package org.apache.phoenix.pherf;
+import org.apache.commons.io.FileUtils;
import org.apache.phoenix.pherf.result.ResultUtil;
+import org.apache.phoenix.pherf.workload.mt.tenantoperation.TenantOperationIT;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Properties;
public class ResultBaseTest {
@@ -42,6 +47,10 @@ public class ResultBaseTest {
}
@AfterClass public static synchronized void tearDown() throws Exception {
- new ResultUtil().deleteDir(properties.getProperty("pherf.default.results.dir"));
+ try {
+ new ResultUtil().deleteDir(properties.getProperty("pherf.default.results.dir"));
+ } catch (Exception e) {
+ // swallow
+ }
}
}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java
new file mode 100644
index 0000000..46eaa80
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationEventGeneratorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the various event generation outcomes based on scenario, model and load profile.
+ */
+public class TenantOperationEventGeneratorTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationEventGeneratorTest.class);
+ private enum TestOperationGroup {
+ upsertOp, queryOp1, queryOp2, idleOp, udfOp
+ }
+
+ private enum TestTenantGroup {
+ tg1, tg2, tg3
+ }
+
+ public DataModel readTestDataModel(String resourceName) throws Exception {
+ URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+ assertNotNull(scenarioUrl);
+ Path p = Paths.get(scenarioUrl.toURI());
+ return XMLConfigParser.readDataModel(p);
+ }
+
+ /**
+ * Case 1 : where some operations have zero weight
+ * Case 2 : where some tenant groups have zero weight
+ * Case 3 : where no operations and tenant groups have zero weight
+ * Case 4 : where some combinations of operation and tenant groups have zero weight
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testVariousEventGeneration() throws Exception {
+ int numRuns = 10;
+ int numOperations = 100000;
+ int allowedVariance = 1500;
+ int normalizedOperations = (numOperations * numRuns) / 10000;
+ int numTenantGroups = 3;
+ int numOpGroups = 5;
+
+ PhoenixUtil pUtil = PhoenixUtil.create();
+ DataModel model = readTestDataModel("/scenario/test_evt_gen1.xml");
+ for (Scenario scenario : model.getScenarios()) {
+ LOGGER.debug(String.format("Testing %s", scenario.getName()));
+ LoadProfile loadProfile = scenario.getLoadProfile();
+ assertEquals("tenant group size is not as expected: ",
+ numTenantGroups, loadProfile.getTenantDistribution().size());
+ assertEquals("operation group size is not as expected: ",
+ numOpGroups, loadProfile.getOpDistribution().size());
+ // Calculate the expected distribution.
+ int[][] expectedDistribution = new int[numOpGroups][numTenantGroups];
+ for (int r = 0; r < numOpGroups; r++) {
+ for (int c = 0; c < numTenantGroups; c++) {
+ int tenantWeight = loadProfile.getTenantDistribution().get(c).getWeight();
+ int opWeight = loadProfile.getOpDistribution().get(r).getWeight();
+ expectedDistribution[r][c] = normalizedOperations * (tenantWeight * opWeight);
+ LOGGER.debug(String.format("Expected [%d,%d] = %d", r, c, expectedDistribution[r][c]));
+ }
+ }
+ TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+
+ // Calculate the actual distribution.
+ int[][] distribution = new int[numOpGroups][numTenantGroups];
+ for (int i = 0; i < numRuns; i++) {
+ int ops = numOperations;
+ loadProfile.setNumOperations(ops);
+ TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+ opFactory.getOperations(), model, scenario);
+ while (ops-- > 0) {
+ TenantOperationInfo info = evtGen.next();
+ int row = TestOperationGroup.valueOf(info.getOperationGroupId()).ordinal();
+ int col = TestTenantGroup.valueOf(info.getTenantGroupId()).ordinal();
+ distribution[row][col]++;
+ }
+ }
+
+ // Validate that the expected and actual distribution
+ // is within the margin of allowed variance.
+ for (int r = 0; r < numOpGroups; r++) {
+ for (int c = 0; c < numTenantGroups; c++) {
+ LOGGER.debug(String.format("Actual[%d,%d] = %d", r, c, distribution[r][c]));
+ int diff = Math.abs(expectedDistribution[r][c] - distribution[r][c]);
+ boolean isAllowed = diff < allowedVariance;
+ assertTrue(String.format("Difference is outside the allowed variance "
+ + "[expected = %d, actual = %d]", allowedVariance, diff), isAllowed);
+
+ }
+ }
+ }
+ }
+}
diff --git a/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java
new file mode 100644
index 0000000..eefee4e
--- /dev/null
+++ b/phoenix-pherf/src/test/java/org/apache/phoenix/pherf/workload/mt/tenantoperation/TenantOperationFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pherf.workload.mt.tenantoperation;
+
+import org.apache.phoenix.pherf.XMLConfigParserTest;
+import org.apache.phoenix.pherf.configuration.DataModel;
+import org.apache.phoenix.pherf.configuration.LoadProfile;
+import org.apache.phoenix.pherf.configuration.Scenario;
+import org.apache.phoenix.pherf.configuration.XMLConfigParser;
+import org.apache.phoenix.pherf.util.PhoenixUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests the various operation supplier outcomes based on scenario, model and load profile.
+ */
+public class TenantOperationFactoryTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TenantOperationFactoryTest.class);
+
+ private enum TestOperationGroup {
+ upsertOp, queryOp1, queryOp2, idleOp, udfOp
+ }
+
+ private enum TestTenantGroup {
+ tg1, tg2, tg3
+ }
+
+ public DataModel readTestDataModel(String resourceName) throws Exception {
+ URL scenarioUrl = XMLConfigParserTest.class.getResource(resourceName);
+ assertNotNull(scenarioUrl);
+ Path p = Paths.get(scenarioUrl.toURI());
+ return XMLConfigParser.readDataModel(p);
+ }
+
+ @Test public void testVariousOperations() throws Exception {
+ int numTenantGroups = 3;
+ int numOpGroups = 5;
+ int numRuns = 10;
+ int numOperations = 10;
+
+ PhoenixUtil pUtil = PhoenixUtil.create();
+ DataModel model = readTestDataModel("/scenario/test_evt_gen1.xml");
+ for (Scenario scenario : model.getScenarios()) {
+ LOGGER.debug(String.format("Testing %s", scenario.getName()));
+ LoadProfile loadProfile = scenario.getLoadProfile();
+ assertEquals("tenant group size is not as expected: ",
+ numTenantGroups, loadProfile.getTenantDistribution().size());
+ assertEquals("operation group size is not as expected: ",
+ numOpGroups, loadProfile.getOpDistribution().size());
+
+ TenantOperationFactory opFactory = new TenantOperationFactory(pUtil, model, scenario);
+ assertEquals("operation group size from the factory is not as expected: ",
+ numOpGroups, opFactory.getOperations().size());
+
+ for (int i = 0; i < numRuns; i++) {
+ int ops = numOperations;
+ loadProfile.setNumOperations(ops);
+ TenantOperationEventGenerator evtGen = new TenantOperationEventGenerator(
+ opFactory.getOperations(), model, scenario);
+ while (ops-- > 0) {
+ TenantOperationInfo info = evtGen.next();
+ switch (TestOperationGroup.valueOf(info.getOperationGroupId())) {
+ case upsertOp:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(UpsertOperationSupplier.class));
+ break;
+ case queryOp1:
+ case queryOp2:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(QueryOperationSupplier.class));
+ break;
+ case idleOp:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(IdleTimeOperationSupplier.class));
+ break;
+ case udfOp:
+ assertTrue(opFactory.getOperationSupplier(info).getClass()
+ .isAssignableFrom(UserDefinedOperationSupplier.class));
+ break;
+ default:
+ Assert.fail();
+
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
index fa9952b..a5a7274 100644
--- a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
+++ b/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
@@ -29,6 +29,7 @@ CREATE TABLE IF NOT EXISTS PHERF.TEST_TABLE (
DIVISION INTEGER,
OLDVAL_STRING VARCHAR,
NEWVAL_STRING VARCHAR,
+ CONNECTION_ID VARCHAR,
SOME_INT INTEGER
CONSTRAINT PK PRIMARY KEY
(
diff --git a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql b/phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
similarity index 63%
copy from phoenix-pherf/src/test/resources/datamodel/test_schema.sql
copy to phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
index fa9952b..ad25e9b 100644
--- a/phoenix-pherf/src/test/resources/datamodel/test_schema.sql
+++ b/phoenix-pherf/src/test/resources/datamodel/test_schema_mt_view.sql
@@ -15,25 +15,13 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
*/
-CREATE TABLE IF NOT EXISTS PHERF.TEST_TABLE (
- TENANT_ID CHAR(15) NOT NULL,
- PARENT_ID CHAR(15) NOT NULL,
- CREATED_DATE DATE NOT NULL,
- NOW_DATE DATE,
- TS_DATE TIMESTAMP,
- PRESENT_DATE DATE,
- OTHER_ID CHAR(15),
- FIELD VARCHAR,
- VAR_ARRAY VARCHAR ARRAY,
- VAR_BIN VARBINARY,
- DIVISION INTEGER,
- OLDVAL_STRING VARCHAR,
- NEWVAL_STRING VARCHAR,
- SOME_INT INTEGER
+
+CREATE VIEW IF NOT EXISTS PHERF.TEST_GLOBAL_VIEW (
+ GID CHAR(15) NOT NULL,
+ FIELD1 VARCHAR,
+ OTHER_INT INTEGER
CONSTRAINT PK PRIMARY KEY
(
- TENANT_ID,
- PARENT_ID,
- CREATED_DATE DESC
+ GID
)
-) VERSIONS=1,MULTI_TENANT=true
+) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE WHERE IDENTIFIER = 'EV1'
diff --git a/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml b/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml
new file mode 100644
index 0000000..c1c6f8e
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/test_evt_gen1.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<datamodel name="model_1">
+ <datamapping>
+ <column>
+ <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+ <type>VARCHAR</type>
+ <dataSequence>RANDOM</dataSequence>
+ <length>15</length>
+ <name>GENERAL_VARCHAR</name>
+ </column>
+ </datamapping>
+ <scenarios>
+ <scenario tableName="PHERF.EVT_GEN1" name="EVT_GEN1">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>1000</numOperations>
+ <!-- Case 1 : where some operations have zero weight -->
+ <tenantDistribution id="tg1" weight="10" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg2" weight="10" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg3" weight="80" numTenants="10"></tenantDistribution>
+ <opDistribution id="upsertOp" weight="50"></opDistribution>
+ <opDistribution id="queryOp1" weight="0"></opDistribution>
+ <opDistribution id="queryOp2" weight="0"></opDistribution>
+ <opDistribution id="idleOp" weight="50"></opDistribution>
+ <opDistribution id="udfOp" weight="0"></opDistribution>
+ </loadProfile>
+ <upserts>
+ <upsert id="upsertOp">
+ <column>
+ <type>CHAR</type>
+ <name>COLUMN1</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <querySet>
+ <query id="queryOp1" statement="select count(*) from PHERF.Z11"/>
+ <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z11"/>
+ </querySet>
+ <idleTimes>
+ <idleTime id="idleOp" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="udfOp" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ </scenario>
+ <scenario tableName="PHERF.EVT_GEN2" name="EVT_GEN2">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>1000</numOperations>
+ <!-- Case 3 : where some tenant groups have zero weight -->
+ <tenantDistribution id="tg1" weight="0" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg2" weight="0" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg3" weight="100" numTenants="10"></tenantDistribution>
+ <opDistribution id="upsertOp" weight="20"></opDistribution>
+ <opDistribution id="queryOp1" weight="20"></opDistribution>
+ <opDistribution id="queryOp2" weight="20"></opDistribution>
+ <opDistribution id="idleOp" weight="20"></opDistribution>
+ <opDistribution id="udfOp" weight="20"></opDistribution>
+ </loadProfile>
+ <upserts>
+ <upsert id="upsertOp">
+ <column>
+ <type>CHAR</type>
+ <name>COLUMN1</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <querySet>
+ <query id="queryOp1" statement="select count(*) from PHERF.Z12"/>
+ <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z12"/>
+ </querySet>
+ <idleTimes>
+ <idleTime id="idleOp" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="udfOp" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ </scenario>
+ <scenario tableName="PHERF.EVT_GEN3" name="EVT_GEN3">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>1000</numOperations>
+ <!-- Case 2 : where no operations and tenant groups have zero weight -->
+ <tenantDistribution id="tg1" weight="30" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg2" weight="30" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg3" weight="40" numTenants="10"></tenantDistribution>
+ <opDistribution id="upsertOp" weight="20"></opDistribution>
+ <opDistribution id="queryOp1" weight="20"></opDistribution>
+ <opDistribution id="queryOp2" weight="20"></opDistribution>
+ <opDistribution id="idleOp" weight="20"></opDistribution>
+ <opDistribution id="udfOp" weight="20"></opDistribution>
+ </loadProfile>
+ <upserts>
+ <upsert id="upsertOp">
+ <column>
+ <type>CHAR</type>
+ <name>COLUMN1</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <querySet>
+ <query id="queryOp1" statement="select count(*) from PHERF.Z13"/>
+ <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z13"/>
+ </querySet>
+ <idleTimes>
+ <idleTime id="idleOp" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="udfOp" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ </scenario>
+ <scenario tableName="PHERF.EVT_GEN4" name="EVT_GEN4">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>1000</numOperations>
+ <!-- Case 4 : where some combinations of operation and tenant groups have zero weight -->
+ <tenantDistribution id="tg1" weight="25" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg2" weight="0" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="tg3" weight="75" numTenants="10"></tenantDistribution>
+ <opDistribution id="upsertOp" weight="40"></opDistribution>
+ <opDistribution id="queryOp1" weight="0"></opDistribution>
+ <opDistribution id="queryOp2" weight="40"></opDistribution>
+ <opDistribution id="idleOp" weight="20"></opDistribution>
+ <opDistribution id="udfOp" weight="0"></opDistribution>
+ </loadProfile>
+ <upserts>
+ <upsert id="upsertOp">
+ <column>
+ <type>CHAR</type>
+ <name>COLUMN1</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <querySet>
+ <query id="queryOp1" statement="select count(*) from PHERF.Z14"/>
+ <query id="queryOp2" statement="select sum(SOME_INT) from PHERF.Z14"/>
+ </querySet>
+ <idleTimes>
+ <idleTime id="idleOp" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="udfOp" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ </scenario>
+ </scenarios>
+</datamodel>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml b/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml
new file mode 100644
index 0000000..d3b83a2
--- /dev/null
+++ b/phoenix-pherf/src/test/resources/scenario/test_mt_workload.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+ <datamodel name="model_1">
+ <datamapping>
+ <column>
+ <!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
+ <type>VARCHAR</type>
+ <dataSequence>RANDOM</dataSequence>
+ <length>15</length>
+ <name>GENERAL_VARCHAR</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>RANDOM</dataSequence>
+ <length>15</length>
+ <name>GENERAL_CHAR</name>
+ </column>
+ <column>
+ <type>INTEGER</type>
+ <dataSequence>RANDOM</dataSequence>
+ <minValue>1</minValue>
+ <maxValue>50000000</maxValue>
+ <!-- Number [0-100] that represents the probability of creating a null value -->
+ <!-- The higher the number, the more like the value will returned will be null -->
+ <!-- Leaving this tag out is equivalent to having a 0 probability. i.e. never null -->
+ <nullChance>0</nullChance>
+ <name>GENERAL_INTEGER</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <length>3</length>
+ <userDefined>true</userDefined>
+ <dataSequence>LIST</dataSequence>
+ <name>TYPE</name>
+ <valuelist>
+ <!-- Distributes according to specified values. These must total 100 -->
+ <datavalue distribution="60">
+ <value>ABC</value>
+ </datavalue>
+ <datavalue distribution="20">
+ <value>XYZ</value>
+ </datavalue>
+ <datavalue distribution="20">
+ <value>LMN</value>
+ </datavalue>
+ </valuelist>
+ </column>
+ </datamapping>
+ <scenarios>
+ <scenario tableName="PHERF.EVT_1" name="EVT_1">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>10</numOperations>
+ <!-- Case 1 : Upsert Operation test -->
+ <tenantDistribution id="tg1" weight="100" numTenants="1"></tenantDistribution>
+ <tenantDistribution id="tg2" weight="0" numTenants="0"></tenantDistribution>
+ <tenantDistribution id="tg3" weight="0" numTenants="0"></tenantDistribution>
+ <opDistribution id="upsertOp" weight="20"></opDistribution>
+ <opDistribution id="queryOp1" weight="20"></opDistribution>
+ <opDistribution id="queryOp2" weight="20"></opDistribution>
+ <opDistribution id="idleOp" weight="20"></opDistribution>
+ <opDistribution id="udfOp" weight="20"></opDistribution>
+ </loadProfile>
+ <preScenarioDdls>
+ <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.EVT_1 (ZID CHAR(15), TYPE VARCHAR) AS SELECT * FROM PHERF.TEST_GLOBAL_VIEW" />
+ </preScenarioDdls>
+
+ <upserts>
+ <upsert id="upsertOp">
+ <column>
+ <type>CHAR</type>
+ <name>ID</name>
+ </column>
+ <column>
+ <type>INTEGER</type>
+ <name>SOME_INT</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <name>GID</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>FIELD1</name>
+ </column>
+ <column>
+ <type>INTEGER</type>
+ <name>OTHER_INT</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <name>ZID</name>
+ </column>
+ <column>
+ <type>CHAR</type>
+ <name>TYPE</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <querySet>
+ <query id="queryOp1" statement="select count(*) from PHERF.EVT_1"/>
+ <query id="queryOp2" statement="select * from PHERF.EVT_1"/>
+ </querySet>
+ <idleTimes>
+ <idleTime id="idleOp" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="udfOp" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ </scenario>
+ </scenarios>
+</datamodel>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
index 8b4762e..853b857 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
@@ -262,7 +262,7 @@
-->
<threadSleepDuration>10</threadSleepDuration>
- <batchSize>1000</batchSize>
+ <batchSize>1</batchSize>
</writeParams>
<querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
<query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
diff --git a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml b/phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
similarity index 60%
copy from phoenix-pherf/src/test/resources/scenario/test_scenario.xml
copy to phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
index 8b4762e..855c1fe 100644
--- a/phoenix-pherf/src/test/resources/scenario/test_scenario.xml
+++ b/phoenix-pherf/src/test/resources/scenario/test_workload_with_load_profile.xml
@@ -17,7 +17,7 @@
~ limitations under the License.
-->
-<datamodel name="test_scenario">
+<datamodel name="model_1">
<datamapping>
<column>
<!-- This column type defines what will generally happen to VARCHAR fields unless they are explicitly defined or overridden elsewhere -->
@@ -196,7 +196,7 @@
<datavalue distribution="20">
<value>LMN</value>
</datavalue>
- </valuelist>
+ </valuelist>
</column>
<column>
<type>CHAR</type>
@@ -215,12 +215,12 @@
<prefix>VBOxx00</prefix>
</column>
<column>
- <type>VARCHAR</type>
- <userDefined>true</userDefined>
- <dataSequence>SEQUENTIAL</dataSequence>
- <length>1</length>
- <name>FIELD</name>
- </column>
+ <type>VARCHAR</type>
+ <userDefined>true</userDefined>
+ <dataSequence>SEQUENTIAL</dataSequence>
+ <length>1</length>
+ <name>FIELD</name>
+ </column>
<column>
<type>INTEGER</type>
<dataSequence>SEQUENTIAL</dataSequence>
@@ -230,123 +230,141 @@
</column>
</datamapping>
<scenarios>
- <scenario tableName="PHERF.TEST_TABLE" rowCount="100" name="testScenarioRW">
- <!-- Scenario level rule overrides will be unsupported in V1.
- You can use the general datamappings in the mean time-->
- <dataOverride>
- <column>
- <type>VARCHAR</type>
- <userDefined>true</userDefined>
- <dataSequence>RANDOM</dataSequence>
- <length>5</length>
- <name>FIELD</name>
- </column>
- </dataOverride>
+ <scenario tableName="PHERF.Z11" name="scenario_11">
+ <loadProfile>
+ <batchSize>1</batchSize>
+ <numOperations>1000</numOperations>
+ <tenantDistribution id="t111" weight="10" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="t112" weight="10" numTenants="10"></tenantDistribution>
+ <tenantDistribution id="t113" weight="80" numTenants="1"></tenantDistribution>
+ <opDistribution id="op111" weight="50"></opDistribution>
+ <opDistribution id="op112" weight="0"></opDistribution>
+ <opDistribution id="op113" weight="0"></opDistribution>
+ <opDistribution id="op114" weight="50"></opDistribution>
+ <opDistribution id="op115" weight="0"></opDistribution>
+ </loadProfile>
- <!--
- This is used to add mixed R/W workloads.
- If this tag exists, a writer pool will be created based on the below properties.
- These props will override the default values in pherf.properties, but only for this
- scenario.The write jobs will run in conjunction with the querySet below.
- -->
- <writeParams executionDurationInMs="10000">
- <!--
- Number of writer it insert into the threadpool
- -->
- <writerThreadCount>2</writerThreadCount>
+ <preScenarioDdls>
+ <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.Z11 (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_TABLE" />
+ </preScenarioDdls>
- <!--
- Time in Ms that each thread will sleep between batch writes. This helps to
- throttle writers.
- -->
- <threadSleepDuration>10</threadSleepDuration>
+ <upserts>
+ <upsert id="op111">
+ <column>
+ <type>CHAR</type>
+ <name>PARENT_ID</name>
+ </column>
+ <column>
+ <type>DATE</type>
+ <name>CREATED_DATE</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>FIELD</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>OTHER_ID</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>OLDVAL_STRING</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>NEWVAL_STRING</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>FIELD1</name>
+ </column>
+ </upsert>
+ </upserts>
- <batchSize>1000</batchSize>
- </writeParams>
- <querySet concurrency="1" executionType="PARALLEL" executionDurationInMs="10000">
- <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
- <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
+ <idleTimes>
+ <idleTime id="op114" idleTime="50"></idleTime>
+ </idleTimes>
+ <udfs>
+ <udf id="op115" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+ <querySet>
+ <query id="op112" statement="select count(*) from PHERF.Z11"/>
+ <query id="op113" statement="select sum(SOME_INT) from PHERF.Z11"/>
</querySet>
</scenario>
+ <scenario tableName="PHERF.Z12" name="scenario_12">
+ <loadProfile>
+ <batchSize>5</batchSize>
+ <numOperations>1000</numOperations>
+ <tenantDistribution id="t121" weight="10" numTenants="5"></tenantDistribution>
+ <tenantDistribution id="t122" weight="10" numTenants="5"></tenantDistribution>
+ <tenantDistribution id="t123" weight="80" numTenants="5"></tenantDistribution>
+ <opDistribution id="op121" weight="50"></opDistribution>
+ <opDistribution id="op122" weight="5"></opDistribution>
+ <opDistribution id="op123" weight="5"></opDistribution>
+ <opDistribution id="op124" weight="40"></opDistribution>
+ <opDistribution id="op125" weight="0"></opDistribution>
+ </loadProfile>
- <scenario tableName="PHERF.TEST_TABLE" rowCount="30" name="testScenario">
- <!-- Scenario level rule overrides will be unsupported in V1.
- You can use the general datamappings in the mean time-->
- <dataOverride>
- <column>
- <type>VARCHAR</type>
- <userDefined>true</userDefined>
- <dataSequence>RANDOM</dataSequence>
- <length>10</length>
- <name>FIELD</name>
- </column>
- </dataOverride>
-
- <!--Note: 1. Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first
- 2. DDL included in query are executed only once on start of querySet execution.
- -->
- <querySet concurrency="1-3" executionType="SERIAL" executionDurationInMs="5000"
- numberOfExecutions="100">
- <query id="q1" tenantId="123456789012345" expectedAggregateRowCount="0"
- statement="select count(*) from PHERF.TEST_TABLE"/>
- <!-- queryGroup is a way to organize queries across tables or scenario files.
- The value will be dumped to results. This gives a value to group by on reporting to compare queries -->
- <query id="q2" queryGroup="g1"
- statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
- </querySet>
- <!--Minimum of executionDurationInMs or numberOfExecutions. Which ever is reached first -->
- <querySet concurrency="2-3" executionType="PARALLEL" executionDurationInMs="10000"
- numberOfExecutions="10">
- <query id="q3" statement="select count(*) from PHERF.TEST_TABLE"/>
- <query id="q4" statement="select sum(SOME_INT) from PHERF.TEST_TABLE"/>
- </querySet>
- </scenario>
-
- <scenario tableName="PHERF.TEST_TABLE" rowCount="99" name="testPreAndPostDdls">
<preScenarioDdls>
- <ddl statement="CREATE INDEX IDX_DIVISION ON ? (DIVISION)" tableName="PHERF.TEST_TABLE"/>
+ <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.Z12 (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_TABLE" />
</preScenarioDdls>
-
- <postScenarioDdls>
- <ddl statement="CREATE INDEX IDX_OLDVAL_STRING ON ? (OLDVAL_STRING)" tableName="PHERF.TEST_TABLE"/>
- <ddl statement="CREATE INDEX IDX_CONNECTION_ID ON ? (CONNECTION_ID)" tableName="PHERF.TEST_TABLE"/>
- </postScenarioDdls>
-
- <querySet concurrency="1" executionType="SERIAL" executionDurationInMs="5000"
- numberOfExecutions="1">
- <query id="q1" expectedAggregateRowCount="99" statement="select count(*) from PHERF.TEST_TABLE"/>
+
+ <upserts>
+ <upsert id="op121">
+ <column>
+ <type>CHAR</type>
+ <name>PARENT_ID</name>
+ </column>
+ <column>
+ <type>DATE</type>
+ <name>CREATED_DATE</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>FIELD</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>OTHER_ID</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>OLDVAL_STRING</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>NEWVAL_STRING</name>
+ </column>
+ <column>
+ <type>VARCHAR</type>
+ <name>FIELD1</name>
+ </column>
+ </upsert>
+ </upserts>
+
+ <idleTimes>
+ <idleTime id="op124" idleTime="100"></idleTime>
+ </idleTimes>
+ <querySet>
+ <query id="op122" statement="select count(*) from PHERF.Z12"/>
+ <query id="op123" statement="select sum(SOME_INT) from PHERF.Z12"/>
</querySet>
+ <udfs>
+ <udf id="op125" >
+ <clazzName>org.apache.phoenix.pherf.ConfigurationParserTest.TestUDF</clazzName>
+ <args>Hello</args>
+ <args>World</args>
+ </udf>
+ </udfs>
+
</scenario>
-
- <!-- To configure a Write Workload to write to a tenant specific view users need to
- specify the tenantId attribute on the scenario, specifying the tenant they
- want to write data for as the attribute value. This tells Pherf to take out a
- tenant-specific connection for executing the write workload.
- The name of the tenant specific view to write to can then be specified as the value of
- the tablename attribute. This assumes the tenant specific view has been created. To
- dynamically create the view see comments below with regard to the ddl attribute.
- -->
- <scenario tableName="PHERF.TEST_VIEW" tenantId="xyzdefghijklmno"
- rowCount="100" name="testMTWriteScenario">
- <preScenarioDdls>
- <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_VIEW (field1 VARCHAR, field2 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" />
- </preScenarioDdls>
- </scenario>
- <!-- Scenario level DDL that is dynamically executed before the Write Workload is run.
- This pattern is really useful when you want to write data to multi-tenant view and the tenant id is
- tightly bound to the scenario. In such cases you can't create the view through the data model flow.
- The value of the tableName attribute is name of the view that is dynamically created based on the DDL
- in the ddl attribute. Queries accessing the View will need to manually make sure Pherf was run with the -l option at
- least once.
- -->
- <scenario tableName="PHERF.TEST_MT_VIEW" tenantId="abcdefghijklmno"
- rowCount="100" name="testMTDdlWriteScenario">
- <preScenarioDdls>
- <ddl statement="CREATE VIEW IF NOT EXISTS PHERF.TEST_MT_VIEW (field1 VARCHAR) AS SELECT * FROM PHERF.TEST_MULTI_TENANT_TABLE" />
- </preScenarioDdls>
- </scenario>
-
+
</scenarios>
</datamodel>