You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/11/11 14:48:13 UTC

[phoenix] branch 4.x updated: PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 1d92f32  PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)
1d92f32 is described below

commit 1d92f32ca312ccecf97390f3b5327a186fd811f2
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Thu Nov 11 20:18:08 2021 +0530

    PHOENIX-6561 : Allow pherf to intake phoenix Connection properties as argument. (#1346)
    
    Signed-off-by: jpisaac <ja...@gmail.com>
---
 .../org/apache/phoenix/pherf/DataIngestIT.java     | 17 +++++++++---
 .../main/java/org/apache/phoenix/pherf/Pherf.java  | 30 ++++++++++++++-------
 .../org/apache/phoenix/pherf/util/PhoenixUtil.java | 26 +++++++++++++-----
 .../pherf/workload/MultiThreadedRunner.java        |  5 +++-
 .../phoenix/pherf/workload/WriteWorkload.java      | 31 +++++++++++-----------
 5 files changed, 72 insertions(+), 37 deletions(-)

diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
index 1eb2d9b..23807ca 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java
@@ -30,6 +30,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
@@ -54,6 +55,8 @@ import com.jcabi.jdbc.Outcome;
 @Category(NeedsOwnMiniClusterTest.class)
 public class DataIngestIT extends ResultBaseTestIT {
 
+    private Properties properties;
+
     @Before
     public void applySchema() throws Exception {
         reader.applySchema();
@@ -61,6 +64,8 @@ public class DataIngestIT extends ResultBaseTestIT {
 
         assertTrue("Could not pull list of schema files.", resources.size() > 0);
         assertNotNull("Could not read schema file.", reader.resourceToString(resources.get(0)));
+        properties = PherfConstants.create().
+                getProperties(PherfConstants.PHERF_PROPERTIES, true);
     }
 
     @Test
@@ -75,7 +80,8 @@ public class DataIngestIT extends ResultBaseTestIT {
                             scenario.getTableNameWithoutSchemaName(), util.getConnection());
             assertTrue("Could not get phoenix columns.", columnListFromPhoenix.size() > 0);
 
-            WriteWorkload loader = new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO);
+            WriteWorkload loader = new WriteWorkload(util, parser, properties, scenario,
+                    GeneratePhoenixStats.NO);
             WorkloadExecutor executor = new WorkloadExecutor();
             executor.add(loader);
             executor.get();
@@ -122,7 +128,8 @@ public class DataIngestIT extends ResultBaseTestIT {
     public void testPreAndPostDataLoadDdls() throws Exception {
         Scenario scenario = parser.getScenarioByName("testPreAndPostDdls");
         WorkloadExecutor executor = new WorkloadExecutor();
-        executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+        executor.add(new WriteWorkload(util, parser, properties,
+                scenario, GeneratePhoenixStats.NO));
         
         try {
             executor.get();
@@ -185,7 +192,8 @@ public class DataIngestIT extends ResultBaseTestIT {
         // Arrange
         Scenario scenario = parser.getScenarioByName("testMTWriteScenario");
         WorkloadExecutor executor = new WorkloadExecutor();
-        executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+        executor.add(new WriteWorkload(util, parser, properties,
+                scenario, GeneratePhoenixStats.NO));
         
         // Act
         try {
@@ -205,7 +213,8 @@ public class DataIngestIT extends ResultBaseTestIT {
         // Arrange
         Scenario scenario = parser.getScenarioByName("testMTDdlWriteScenario");
         WorkloadExecutor executor = new WorkloadExecutor();
-        executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));
+        executor.add(new WriteWorkload(util, parser, properties,
+                scenario, GeneratePhoenixStats.NO));
         
         // Act
         try {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index 7b0a5e8..4853bc3 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -110,7 +110,7 @@ public class Pherf {
     private final String scenarioName;
     private final String schemaFile;
     private final String queryHint;
-    private final Properties properties;
+    private final Properties globalProperties;
     private final boolean preLoadData;
     private final boolean multiTenantWorkload;
     private final String dropPherfTablesRegEx;
@@ -127,10 +127,18 @@ public class Pherf {
     private final CompareType compareType;
     private final boolean thinDriver;
     private final String queryServerUrl;
+    private Properties properties = new Properties();
 
     @VisibleForTesting
     WorkloadExecutor workloadExecutor;
 
+    public Pherf(String[] args, Properties connProperties) throws Exception {
+        this(args);
+        // Merging global and connection properties into properties.
+        if (connProperties != null)
+            this.properties.putAll(connProperties);
+    }
+
     public Pherf(String[] args) throws Exception {
         CommandLineParser parser = new DefaultParser(false, false);
         CommandLine command = null;
@@ -144,19 +152,19 @@ public class Pherf {
             System.exit(1);
         }
 
-        properties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, false);
+        globalProperties = PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, false);
         dropPherfTablesRegEx = command.getOptionValue("drop", null);
         monitor = command.hasOption("m");
         String
                 monitorFrequency =
                 (command.hasOption("m") && command.hasOption("monitorFrequency")) ?
                         command.getOptionValue("monitorFrequency") :
-                        properties.getProperty("pherf.default.monitorFrequency");
-        properties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
+                        globalProperties.getProperty("pherf.default.monitorFrequency");
+        globalProperties.setProperty("pherf.default.monitorFrequency", monitorFrequency);
 
         LOGGER.debug("Using Monitor: " + monitor);
         LOGGER.debug("Monitor Frequency Ms:" + monitorFrequency);
-        properties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
+        globalProperties.setProperty(PherfConstants.LOG_PER_NROWS_NAME, getLogPerNRow(command));
 
         preLoadData = command.hasOption("l");
         multiTenantWorkload = command.hasOption("mt");
@@ -177,8 +185,8 @@ public class Pherf {
         String
                 writerThreadPoolSize =
                 command.getOptionValue("writerThreadSize",
-                        properties.getProperty("pherf.default.dataloader.threadpool"));
-        properties.setProperty("pherf.default.dataloader.threadpool", writerThreadPoolSize);
+                        globalProperties.getProperty("pherf.default.dataloader.threadpool"));
+        globalProperties.setProperty("pherf.default.dataloader.threadpool", writerThreadPoolSize);
         label = command.getOptionValue("label", null);
         compareResults = command.getOptionValue("compare", null);
         compareType = command.hasOption("useAverageCompareType") ? CompareType.AVERAGE : CompareType.MINIMUM;
@@ -209,13 +217,14 @@ public class Pherf {
             PhoenixUtil.useThinDriver(queryServerUrl);
         }
         ResultUtil.setFileSuffix(label);
+        this.properties.putAll(globalProperties);
     }
 
     private String getLogPerNRow(CommandLine command) {
         try {
             String logPerNRows = (command.hasOption("log_per_nrows")) ?
                     command.getOptionValue("log_per_nrows") :
-                    properties.getProperty(
+                    globalProperties.getProperty(
                             PherfConstants.LOG_PER_NROWS_NAME,
                             String.valueOf(PherfConstants.LOG_PER_NROWS)
                     );
@@ -303,7 +312,7 @@ public class Pherf {
             if (monitor) {
                 monitorManager =
                         new MonitorManager(Integer.parseInt(
-                                properties.getProperty("pherf.default.monitorFrequency")));
+                                globalProperties.getProperty("pherf.default.monitorFrequency")));
                 workloadExecutor.add(monitorManager);
             }
 
@@ -324,7 +333,8 @@ public class Pherf {
                             }
                         }
                     } else {
-                        newWorkloads.add(new WriteWorkload(parser, generateStatistics));
+                        newWorkloads.add(new WriteWorkload(parser, properties,
+                                generateStatistics));
                     }
 
                     if (newWorkloads.isEmpty()) {
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
index fd53b97..78f3117 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java
@@ -114,12 +114,18 @@ public class PhoenixUtil {
     public Connection getConnection(String tenantId) throws Exception {
         return getConnection(tenantId, testEnabled, null);
     }
+
+    public Connection getConnection(String tenantId,
+                                    Properties properties) throws Exception {
+        Map<String, String> propertyHashMap = getPropertyHashMap(properties);
+        return getConnection(tenantId, testEnabled, propertyHashMap);
+    }
     
-    public Connection getConnection(String tenantId, Map<String, String> phoenixProperty) throws Exception {
-        return getConnection(tenantId, testEnabled, phoenixProperty);
+    public Connection getConnection(String tenantId, Map<String, String> propertyHashMap) throws Exception {
+        return getConnection(tenantId, testEnabled, propertyHashMap);
     }
 
-    public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> phoenixProperty) throws Exception {
+    public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> propertyHashMap) throws Exception {
         if (useThinDriver) {
             if (null == queryServerUrl) {
                 throw new IllegalArgumentException("QueryServer URL must be set before" +
@@ -143,10 +149,10 @@ public class PhoenixUtil {
                 LOGGER.debug("\nSetting tenantId to " + tenantId);
             }
             
-            if (phoenixProperty != null) {
-            	for (Map.Entry<String, String> phxProperty: phoenixProperty.entrySet()) {
+            if (propertyHashMap != null) {
+            	for (Map.Entry<String, String> phxProperty: propertyHashMap.entrySet()) {
             		props.setProperty(phxProperty.getKey(), phxProperty.getValue());
-					System.out.println("Setting connection property "
+					LOGGER.debug("Setting connection property "
 							+ phxProperty.getKey() + " to "
 							+ phxProperty.getValue());
             	}
@@ -157,6 +163,14 @@ public class PhoenixUtil {
         }
     }
 
+    private Map<String, String> getPropertyHashMap(Properties props) {
+        Map<String, String> propsMaps = new HashMap<>();
+        for (String prop : props.stringPropertyNames()) {
+            propsMaps.put(prop, props.getProperty(prop));
+        }
+        return propsMaps;
+    }
+
     public boolean executeStatement(String sql, Scenario scenario) throws Exception {
         Connection connection = null;
         boolean result = false;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 5d4b973..e944116 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -25,6 +25,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.concurrent.Callable;
 
+import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.pherf.result.DataModelResult;
@@ -164,7 +165,9 @@ class MultiThreadedRunner implements Callable<Void> {
             LOGGER.debug("Executing iteration: " + queryIteration + ": " + statementString);
             
             if (scenario.getWriteParams() != null) {
-            	Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
+            	Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, PherfConstants.create().
+                                getProperties(PherfConstants.PHERF_PROPERTIES, true),
+                        scenario, GeneratePhoenixStats.NO);
             	workloadExecutor.add(writes);
             }
             
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index b6a5ac6..4dce55e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -65,6 +65,7 @@ public class WriteWorkload implements Workload {
     private final WriteParams writeParams;
     private final Scenario scenario;
     private final long threadSleepDuration;
+    private final Properties properties;
 
     private final int threadPoolSize;
     private final int batchSize;
@@ -72,22 +73,18 @@ public class WriteWorkload implements Workload {
     private final boolean useBatchApi;
 
     public WriteWorkload(XMLConfigParser parser) throws Exception {
-        this(PhoenixUtil.create(), parser, GeneratePhoenixStats.NO);
+        this(PhoenixUtil.create(), parser, PherfConstants.create().
+                getProperties(PherfConstants.PHERF_PROPERTIES, true), GeneratePhoenixStats.NO);
     }
     
-    public WriteWorkload(XMLConfigParser parser, GeneratePhoenixStats generateStatistics) throws Exception {
-        this(PhoenixUtil.create(), parser, generateStatistics);
+    public WriteWorkload(XMLConfigParser parser, Properties properties,
+                         GeneratePhoenixStats generateStatistics) throws Exception {
+        this(PhoenixUtil.create(), parser, properties, generateStatistics);
     }
 
-    public WriteWorkload(PhoenixUtil util, XMLConfigParser parser, GeneratePhoenixStats generateStatistics) throws Exception {
-        this(util, parser, null, generateStatistics);
-    }
-
-    public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser, Scenario scenario, GeneratePhoenixStats generateStatistics)
-            throws Exception {
-        this(phoenixUtil,
-                PherfConstants.create().getProperties(PherfConstants.PHERF_PROPERTIES, true),
-                parser, scenario, generateStatistics);
+    public WriteWorkload(PhoenixUtil util, XMLConfigParser parser, Properties properties,
+                         GeneratePhoenixStats generateStatistics) throws Exception {
+        this(util, parser, properties, null, generateStatistics);
     }
 
     /**
@@ -97,19 +94,21 @@ public class WriteWorkload implements Workload {
      * TODO extract notion of the scenario list and have 1 write workload per scenario
      *
      * @param phoenixUtil {@link org.apache.phoenix.pherf.util.PhoenixUtil} Query helper
-     * @param properties  {@link java.util.Properties} default properties to use
      * @param parser      {@link org.apache.phoenix.pherf.configuration.XMLConfigParser}
+     * @param properties  {@link java.util.Properties} default properties to use
      * @param scenario    {@link org.apache.phoenix.pherf.configuration.Scenario} If null is passed
      *                    it will run against all scenarios in the parsers list.
      * @throws Exception
      */
-    public WriteWorkload(PhoenixUtil phoenixUtil, Properties properties, XMLConfigParser parser,
-            Scenario scenario, GeneratePhoenixStats generateStatistics) throws Exception {
+    public WriteWorkload(PhoenixUtil phoenixUtil, XMLConfigParser parser,
+                         Properties properties, Scenario scenario,
+                         GeneratePhoenixStats generateStatistics) throws Exception {
         this.pUtil = phoenixUtil;
         this.parser = parser;
         this.rulesApplier = new RulesApplier(parser);
         this.resultUtil = new ResultUtil();
         this.generateStatistics = generateStatistics;
+        this.properties = properties;
         int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool"));
         
         // Overwrite defaults properties with those given in the configuration. This indicates the
@@ -262,7 +261,7 @@ public class WriteWorkload implements Workload {
                 Connection connection = null;
                 PreparedStatement stmt = null;
                 try {
-                    connection = pUtil.getConnection(scenario.getTenantId());
+                    connection = pUtil.getConnection(scenario.getTenantId(), properties);
                     long logStartTime = EnvironmentEdgeManager.currentTimeMillis();
                     long maxDuration = (WriteWorkload.this.writeParams == null) ? Long.MAX_VALUE :
                         WriteWorkload.this.writeParams.getExecutionDurationInMs();