You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/11/11 14:48:13 UTC
[phoenix] branch 4.x updated: PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 1d92f32 PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)
1d92f32 is described below
commit 1d92f32ca312ccecf97390f3b5327a186fd811f2
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Thu Nov 11 20:18:08 2021 +0530
PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)
Signed-off-by: jpisaac <ja...@gmail.com>
---
.../org/apache/phoenix/pherf/DataIngestIT.java | 17 +++++++++---
.../main/java/org/apache/phoenix/pherf/Pherf.java | 30 ++++++++++++++-------
.../org/apache/phoenix/pherf/util/PhoenixUtil.java | 26 +++++++++++++-----
.../pherf/workload/MultiThreadedRunner.java | 5 +++-
.../phoenix/pherf/workload/WriteWorkload.java | 31 +++++++++++-----------
5 files changed, 72 insertions(+), 37 deletions(-)
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
index 1eb2d9b..23807ca 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
@@ -30,6 +30,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
@@ -54,6 +55,8 @@ import com.jcabi.jdbc.Outcome;
@Category(NeedsOwnMiniClusterTest.class)
public class DataIngestIT extends ResultBaseTestIT {
+ private Properties properties;
+
@Before
public void applySchema() throws Exception {
reader.applySchema();
@@ -61,6 +64,8 @@ public class DataIngestIT extends ResultBaseTestIT {
assertTrue("Could not pull list of schema files.", resources.size() > 0);
assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
+ properties = PherfConstants.create().
+ getProperties(PherfConstants.PHERF_PROPERTIES, true);
}
@Test
@@ -75,7 +80,8 @@ public class DataIngestIT extends ResultBaseTestIT {
scenario.getTableNameWithoutSchemaName(), util.getConnection());
assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
- WriteWorkload loader = new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO);
+ WriteWorkload loader = new WriteWorkload(util, parser, properties, scenario,
+ GeneratePhoenixStats.NO);
WorkloadExecutor executor = new WorkloadExecutor();
executor.add(loader);
executor.get();
@@ -122,7 +128,8 @@ public class DataIngestIT extends ResultBaseTestIT {
public void testPreAndPostDataLoadDdls() throws Exception {
Scenario scenario = parser.getScenarioByName("testPreAndPostDdls");
WorkloadExecutor executor = new WorkloadExecutor();
- executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+ executor.add(new WriteWorkload(util, parser, properties,
+ scenario, GeneratePhoenixStats.NO));
try {
executor.get();
@@ -185,7 +192,8 @@ public class DataIngestIT extends ResultBaseTestIT {
// Arrange
Scenario scenario = parser.getScenarioByName("testMTWriteScenario");
WorkloadExecutor executor = new WorkloadExecutor();
- executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+ executor.add(new WriteWorkload(util, parser, properties,
+ scenario, GeneratePhoenixStats.NO));
// Act
try {
@@ -205,7 +213,8 @@ public class DataIngestIT extends ResultBaseTestIT {
// Arrange
Scenario scenario = parser.getScenarioByName("testMTDdlWriteScenario");
WorkloadExecutor executor = new WorkloadExecutor();
- executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+ executor.add(new WriteWorkload(util, parser, properties,
+ scenario, GeneratePhoenixStats.NO));
// Act
try {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 7b0a5e8..4853bc3 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -110,7 +110,7 @@ public class Pherf {
private final String scenarioName;
private final String schemaFile;
private final String queryHint;
- private final Properties properties;
+ private final Properties globalProperties;
private final boolean preLoadData;
private final boolean multiTenantWorkload;
private final String dropPherfTablesRegEx;
@@ -127,10 +127,18 @@ public class Pherf {
private final CompareType compareType;
private final boolean thinDriver;
private final String queryServerUrl;
+ private Properties properties = new Properties();
@VisibleForTesting
WorkloadExecutor workloadExecutor;
+ public Pherf(String[] args, Properties connProperties) throws Exception {
+ this(args);
+ //merging global and connection properties into properties.
+ if (connProperties != null)
+ this.properties.putAll(connProperties);
+ }
+
public Pherf(String[] args) throws Exception {
CommandLineParser parser = new DefaultParser(false, false);
CommandLine command = null;
@@ -144,19 +152,19 @@ public class Pherf {
System.exit(1);
}
- properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, false);
+ globalProperties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, false);
dropPherfTablesRegEx = command.getOptionValue("drop", null);
monitor = command.hasOption("m");
String
monitorFrequency =
(command.hasOption("m") && command.hasOption("monitorFrequency")) ?
command.getOptionValue("monitorFrequency") :
- properties.getProperty("pherf.default.monitorFrequency");
- properties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
+ globalProperties.getProperty("pherf.default.monitorFrequency");
+ globalProperties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
LOGGER.debug("Using Monitor: " + monitor);
LOGGER.debug("Monitor Frequency Ms:" + monitorFrequency);
- properties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
+ globalProperties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
preLoadData = command.hasOption("l");
multiTenantWorkload = command.hasOption("mt");
@@ -177,8 +185,8 @@ public class Pherf {
String
writerThreadPoolSize =
command.getOptionValue("writerThreadSize",
- properties.getProperty("pherf.default.dataloader.threadpool"));
- properties.setProperty("pherf.default.dataloader.threadpool", writerThreadPoolSize);
+ globalProperties.getProperty("pherf.default.dataloader.threadpool"));
+ globalProperties.setProperty("pherf.default.dataloader.threadpool", writerThreadPoolSize);
label = command.getOptionValue("label", null);
compareResults = command.getOptionValue("compare", null);
compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM;
@@ -209,13 +217,14 @@ public class Pherf {
PhoenixUtil.useThinDriver(queryServerUrl);
}
ResultUtil.setFileSuffix(label);
+ this.properties.putAll(globalProperties);
}
private String getLogPerNRow(CommandLine command) {
try {
String logPerNRows = (command.hasOption("log_per_nrows")) ?
command.getOptionValue("log_per_nrows") :
- properties.getProperty(
+ globalProperties.getProperty(
PherfConstants.LOG_PER_NROWS_NAME,
String.valueOf(PherfConstants.LOG_PER_NROWS)
);
@@ -303,7 +312,7 @@ public class Pherf {
if (monitor) {
monitorManager =
new MonitorManager(Integer.parseInt(
- properties.getProperty("pherf.default.monitorFrequency")));
+ globalProperties.getProperty("pherf.default.monitorFrequency")));
workloadExecutor.add(monitorManager);
}
@@ -324,7 +333,8 @@ public class Pherf {
}
}
} else {
- newWorkloads.add(new WriteWorkload(parser, generateStatistics));
+ newWorkloads.add(new WriteWorkload(parser, properties,
+ generateStatistics));
}
if (newWorkloads.isEmpty()) {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index fd53b97..78f3117 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -114,12 +114,18 @@ public class PhoenixUtil {
public Connection getConnection(String tenantId) throws Exception {
return getConnection(tenantId, testEnabled, null);
}
+
+ public Connection getConnection(String tenantId,
+ Properties properties) throws Exception {
+ Map<String, String> propertyHashMap = getPropertyHashMap(properties);
+ return getConnection(tenantId, testEnabled, propertyHashMap);
+ }
- public Connection getConnection(String tenantId, Map<String, String> phoenixProperty) throws Exception {
- return getConnection(tenantId, testEnabled, phoenixProperty);
+ public Connection getConnection(String tenantId, Map<String, String> propertyHashMap) throws Exception {
+ return getConnection(tenantId, testEnabled, propertyHashMap);
}
- public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> phoenixProperty) throws Exception {
+ public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> propertyHashMap) throws Exception {
if (useThinDriver) {
if (null == queryServerUrl) {
throw new IllegalArgumentException("QueryServer URL must be set before" +
@@ -143,10 +149,10 @@ public class PhoenixUtil {
LOGGER.debug("\nSetting tenantId to " + tenantId);
}
- if (phoenixProperty != null) {
- for (Map.Entry<String, String> phxProperty: phoenixProperty.entrySet()) {
+ if (propertyHashMap != null) {
+ for (Map.Entry<String, String> phxProperty: propertyHashMap.entrySet()) {
props.setProperty(phxProperty.getKey(), phxProperty.getValue());
- System.out.println("Setting connection property "
+ LOGGER.debug("Setting connection property "
+ phxProperty.getKey() + " to "
+ phxProperty.getValue());
}
@@ -157,6 +163,14 @@ public class PhoenixUtil {
}
}
+ private Map<String, String> getPropertyHashMap(Properties props) {
+ Map<String, String> propsMaps = new HashMap<>();
+ for (String prop : props.stringPropertyNames()) {
+ propsMaps.put(prop, props.getProperty(prop));
+ }
+ return propsMaps;
+ }
+
public boolean executeStatement(String sql, Scenario scenario) throws Exception {
Connection connection = null;
boolean result = false;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 5d4b973..e944116 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,6 +25,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
+import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.pherf.result.DataModelResult;
@@ -164,7 +165,9 @@ class MultiThreadedRunner implements Callable<Void> {
LOGGER.debug("Executing iteration: " + queryIteration + ": " + statementString);
if (scenario.getWriteParams() != null) {
- Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
+ Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, PherfConstants.create().
+ getProperties(PherfConstants.PHERF_PROPERTIES, true),
+ scenario, GeneratePhoenixStats.NO);
workloadExecutor.add(writes);
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index b6a5ac6..4dce55e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -65,6 +65,7 @@ public class WriteWorkload implements Workload {
private final WriteParams writeParams;
private final Scenario scenario;
private final long threadSleepDuration;
+ private final Properties properties;
private final int threadPoolSize;
private final int batchSize;
@@ -72,22 +73,18 @@ public class WriteWorkload implements Workload {
private final boolean useBatchApi;
public WriteWorkload(XMLConfigParser parser) throws Exception {
- this(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO);
+ this(PhoenixUtil.create(), parser, PherfConstants.create().
+ getProperties(PherfConstants.PHERF_PROPERTIES, true), GeneratePhoenixStats.NO);
}
- public WriteWorkload(XMLConfigParser parser, GeneratePhoenixStats generateStatistics) throws Exception {
- this(PhoenixUtil.create(), parser, generateStatistics);
+ public WriteWorkload(XMLConfigParser parser, Properties properties,
+ GeneratePhoenixStats generateStatistics) throws Exception {
+ this(PhoenixUtil.create(), parser, properties, generateStatistics);
}
- public WriteWorkload(PhoenixUtil util, XMLConfigParser parser, GeneratePhoenixStats generateStatistics) throws Exception {
- this(util, parser, null, generateStatistics);
- }
-
- public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario, GeneratePhoenixStats generateStatistics)
- throws Exception {
- this(phoenixUtil,
- PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, true),
- parser, scenario, generateStatistics);
+ public WriteWorkload(PhoenixUtil util, XMLConfigParser parser, Properties properties,
+ GeneratePhoenixStats generateStatistics) throws Exception {
+ this(util, parser, properties, null, generateStatistics);
}
/**
@@ -97,19 +94,21 @@ public class WriteWorkload implements Workload {
* TODO extract notion of the scenario list and have 1 write workload per scenario
*
* @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
- * @param properties {@link java.util.Properties} default properties to use
* @param parser {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+ * @param properties {@link java.util.Properties} default properties to use
* @param scenario {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
* it will run against all scenarios in the parsers list.
* @throws Exception
*/
- public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
- Scenario scenario, GeneratePhoenixStats generateStatistics) throws Exception {
+ public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser,
+ Properties properties, Scenario scenario,
+ GeneratePhoenixStats generateStatistics) throws Exception {
this.pUtil = phoenixUtil;
this.parser = parser;
this.rulesApplier = new RulesApplier(parser);
this.resultUtil = new ResultUtil();
this.generateStatistics = generateStatistics;
+ this.properties = properties;
int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
// Overwrite defaults properties with those given in the configuration. This indicates the
@@ -262,7 +261,7 @@ public class WriteWorkload implements Workload {
Connection connection = null;
PreparedStatement stmt = null;
try {
- connection = pUtil.getConnection(scenario.getTenantId());
+ connection = pUtil.getConnection(scenario.getTenantId(), properties);
long logStartTime = EnvironmentEdgeManager.currentTimeMillis();
long maxDuration = (WriteWorkload.this.writeParams == null) ? Long.MAX_VALUE :
WriteWorkload.this.writeParams.getExecutionDurationInMs();