You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2019/06/07 00:42:20 UTC
[phoenix] branch master updated: PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail
This is an automated email from the ASF dual-hosted git repository.
tdsilva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 13fd777 PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail
13fd777 is described below
commit 13fd7776d7629c222af63d9a4f38a426fc5ed163
Author: Thomas D'Silva <td...@apache.org>
AuthorDate: Tue Jun 4 16:46:43 2019 -0700
PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail
---
.../java/org/apache/phoenix/pherf/PherfMainIT.java | 17 +++++--
.../main/java/org/apache/phoenix/pherf/Pherf.java | 7 ++-
.../pherf/configuration/DataTypeMapping.java | 4 +-
.../phoenix/pherf/configuration/Scenario.java | 2 +
.../pherf/configuration/XMLConfigParser.java | 3 +-
.../apache/phoenix/pherf/jmx/MonitorManager.java | 9 ++--
.../apache/phoenix/pherf/rules/RulesApplier.java | 13 +++--
.../pherf/workload/MultiThreadedRunner.java | 27 ++++++-----
.../pherf/workload/MultithreadedDiffer.java | 6 ++-
.../phoenix/pherf/workload/QueryExecutor.java | 56 ++++++++++------------
.../apache/phoenix/pherf/workload/Workload.java | 4 +-
.../phoenix/pherf/workload/WorkloadExecutor.java | 5 +-
.../phoenix/pherf/workload/WriteWorkload.java | 53 ++++++++++++--------
.../scenario/prod_test_unsalted_scenario.xml | 6 +--
14 files changed, 129 insertions(+), 83 deletions(-)
diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 2407ef4..6dc900e 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -22,15 +22,24 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
+import java.util.concurrent.Future;
+
public class PherfMainIT extends ResultBaseTestIT {
@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
@Test
- public void testPherfMain() {
- String[] args = { "-q",
- "--scenarioFile", ".*prod_test_unsalted_scenario.*",
+ public void testPherfMain() throws Exception {
+ String[] args = { "-q", "-l",
+ "--schemaFile", ".*create_prod_test_unsalted.sql",
+ "--scenarioFile", ".*prod_test_unsalted_scenario.xml",
"-m", "--monitorFrequency", "10" };
- Pherf.main(args);
+ Pherf pherf = new Pherf(args);
+ pherf.run();
+
+ // verify that none of the scenarios threw any exceptions
+ for (Future<Void> future : pherf.workloadExecutor.jobs.values()) {
+ future.get();
+ }
}
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index d92ffde..51d6743 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import com.google.common.annotations.VisibleForTesting;
+import jline.internal.TestAccessible;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -115,6 +117,9 @@ public class Pherf {
private final boolean thinDriver;
private final String queryServerUrl;
+ @VisibleForTesting
+ WorkloadExecutor workloadExecutor;
+
public Pherf(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
CommandLine command = null;
@@ -201,7 +206,7 @@ public class Pherf {
public void run() throws Exception {
MonitorManager monitorManager = null;
List<Workload> workloads = new ArrayList<>();
- WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
+ workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
try {
if (listFiles) {
ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index 0476df2..129bdc2 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -29,7 +29,9 @@ public enum DataTypeMapping {
UNSIGNED_LONG("UNSIGNED_LONG", Types.LONGVARCHAR),
VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
VARBINARY("VARBINARY", Types.VARBINARY),
- TIMESTAMP("TIMESTAMP", Types.TIMESTAMP);
+ TIMESTAMP("TIMESTAMP", Types.TIMESTAMP),
+ BIGINT("BIGINT", Types.BIGINT),
+ TINYINT("TINYINT", Types.TINYINT);
private final String sType;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index c867ae1..513445e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.phoenix.pherf.util.PhoenixUtil;
@@ -161,6 +162,7 @@ public class Scenario {
*/
@XmlAttribute()
public String getName() {
+ Preconditions.checkNotNull(name);
return name;
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
index a0ee471..8f2a1d8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
@@ -96,7 +96,8 @@ public class XMLConfigParser {
scenarios.add(scenario);
}
} catch (JAXBException e) {
- e.printStackTrace();
+ logger.error("Unable to parse scenario file "+path, e);
+ throw e;
}
}
return scenarios;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
index bb29902..5c434d8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
@@ -35,6 +35,7 @@ import javax.management.StandardMBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -91,9 +92,9 @@ public class MonitorManager implements Workload {
this.shouldStop.set(true);
}
- @Override public Runnable execute() {
- return new Runnable() {
- @Override public void run() {
+ @Override public Callable<Void> execute() {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
try {
while (!shouldStop()) {
isRunning.set(true);
@@ -131,6 +132,7 @@ public class MonitorManager implements Workload {
} catch (Exception e) {
Thread.currentThread().interrupt();
e.printStackTrace();
+ throw e;
}
}
}
@@ -144,6 +146,7 @@ public class MonitorManager implements Workload {
throw new FileLoaderRuntimeException("Could not close monitor results.", e);
}
}
+ return null;
}
};
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index da0a172..662037e 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
public class RulesApplier {
@@ -228,24 +229,30 @@ public class RulesApplier {
data = new DataValue(column.getType(), String.valueOf(dbl));
}
break;
+ case TINYINT:
case INTEGER:
if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
data = pickDataValueFromList(dataValues);
} else {
int minInt = (int) column.getMinValue();
int maxInt = (int) column.getMaxValue();
- Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration for integers " + column.getName());
- int intVal = RandomUtils.nextInt(minInt, maxInt);
+ if (column.getType() == DataTypeMapping.TINYINT) {
+ Preconditions.checkArgument((minInt >= -128) && (minInt <= 128), "min value need to be set in configuration for tinyints " + column.getName());
+ Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128), "max value need to be set in configuration for tinyints " + column.getName());
+ }
+ int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1);
data = new DataValue(column.getType(), String.valueOf(intVal));
}
break;
+ case BIGINT:
case UNSIGNED_LONG:
if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
data = pickDataValueFromList(dataValues);
} else {
long minLong = column.getMinValue();
long maxLong = column.getMaxValue();
- Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
+ if (column.getType() == DataTypeMapping.UNSIGNED_LONG)
+ Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
long longVal = RandomUtils.nextLong(minLong, maxLong);
data = new DataValue(column.getType(), String.valueOf(longVal));
}
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 7b9313f..4423bbd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -23,6 +23,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Calendar;
import java.util.Date;
+import java.util.concurrent.Callable;
import org.apache.phoenix.pherf.result.DataModelResult;
import org.apache.phoenix.pherf.result.ResultManager;
@@ -38,7 +39,7 @@ import org.apache.phoenix.pherf.configuration.WriteParams;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.util.PhoenixUtil;
-class MultiThreadedRunner implements Runnable {
+class MultiThreadedRunner implements Callable<Void> {
private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
private Query query;
private ThreadTime threadTime;
@@ -85,29 +86,28 @@ class MultiThreadedRunner implements Runnable {
* Executes run for a minimum of number of execution or execution duration
*/
@Override
- public void run() {
+ public Void call() throws Exception {
logger.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+ numberOfExecutions + "times\n\n");
Long start = System.currentTimeMillis();
for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
< executionDurationInMs)); i--) {
- try {
- synchronized (resultManager) {
- timedQuery();
- if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
- resultManager.write(dataModelResult, ruleApplier);
- lastResultWritten = System.currentTimeMillis();
- }
+ synchronized (workloadExecutor) {
+ timedQuery();
+ if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
+ resultManager.write(dataModelResult, ruleApplier);
+ lastResultWritten = System.currentTimeMillis();
}
- } catch (Exception e) {
- e.printStackTrace();
}
}
// Make sure all result have been dumped before exiting
- resultManager.flush();
+ synchronized (workloadExecutor) {
+ resultManager.flush();
+ }
logger.info("\n\nThread exiting." + threadName + "\n\n");
+ return null;
}
private synchronized ThreadTime getThreadTime() {
@@ -165,8 +165,9 @@ class MultiThreadedRunner implements Runnable {
conn.commit();
}
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("Exception while executing query", e);
exception = e.getMessage();
+ throw e;
} finally {
getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
(int) (System.currentTimeMillis() - start)));
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index decff51..6e828bd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.pherf.workload;
import java.util.Calendar;
import java.util.Date;
+import java.util.concurrent.Callable;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Query;
@@ -29,7 +30,7 @@ import org.apache.phoenix.pherf.util.PhoenixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class MultithreadedDiffer implements Runnable {
+class MultithreadedDiffer implements Callable<Void> {
private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
private Thread t;
private Query query;
@@ -80,7 +81,7 @@ class MultithreadedDiffer implements Runnable {
/**
* Executes verification runs for a minimum of number of execution or execution duration
*/
- public void run() {
+ public Void call() throws Exception {
logger.info("\n\nThread Starting " + t.getName() + " ; " + query.getStatement() + " for "
+ numberOfExecutions + "times\n\n");
Long start = System.currentTimeMillis();
@@ -93,5 +94,6 @@ class MultithreadedDiffer implements Runnable {
}
}
logger.info("\n\nThread exiting." + t.getName() + "\n\n");
+ return null;
}
}
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 8d0ced5..c4a3517 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -78,16 +79,16 @@ public class QueryExecutor implements Workload {
* @throws Exception
*/
@Override
- public Runnable execute() throws Exception {
- Runnable runnable = null;
+ public Callable<Void> execute() throws Exception {
+ Callable<Void> callable = null;
for (DataModel dataModel : dataModels) {
if (exportCSV) {
- runnable = exportAllScenarios(dataModel);
+ callable = exportAllScenarios(dataModel);
} else {
- runnable = executeAllScenarios(dataModel);
+ callable = executeAllScenarios(dataModel);
}
}
- return runnable;
+ return callable;
}
/**
@@ -96,12 +97,11 @@ public class QueryExecutor implements Workload {
* @param dataModel
* @throws Exception
*/
- protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception {
- return new Runnable() {
+ protected Callable<Void> exportAllScenarios(final DataModel dataModel) throws Exception {
+ return new Callable<Void>() {
@Override
- public void run() {
+ public Void call() throws Exception {
try {
-
List<Scenario> scenarios = dataModel.getScenarios();
QueryVerifier exportRunner = new QueryVerifier(false);
for (Scenario scenario : scenarios) {
@@ -113,8 +113,10 @@ public class QueryExecutor implements Workload {
}
}
} catch (Exception e) {
- logger.warn("", e);
+ logger.error("Scenario throws exception", e);
+ throw e;
}
+ return null;
}
};
}
@@ -125,9 +127,9 @@ public class QueryExecutor implements Workload {
* @param dataModel
* @throws Exception
*/
- protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception {
- return new Runnable() {
- @Override public void run() {
+ protected Callable<Void> executeAllScenarios(final DataModel dataModel) throws Exception {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
List<DataModelResult> dataModelResults = new ArrayList<>();
DataModelResult
dataModelResult =
@@ -163,8 +165,10 @@ public class QueryExecutor implements Workload {
resultManager.write(dataModelResults, ruleApplier);
resultManager.flush();
} catch (Exception e) {
- logger.warn("", e);
+ logger.error("Scenario throws exception", e);
+ throw e;
}
+ return null;
}
};
}
@@ -179,7 +183,7 @@ public class QueryExecutor implements Workload {
* @throws InterruptedException
*/
protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
- QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
+ QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException {
for (Query query : querySet.getQuery()) {
QueryResult queryResult = new QueryResult(query);
querySetResult.getQueryResults().add(queryResult);
@@ -190,7 +194,7 @@ public class QueryExecutor implements Workload {
for (int i = 0; i < cr; i++) {
- Runnable
+ Callable
thread =
executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
querySetResult, scenario);
@@ -198,11 +202,7 @@ public class QueryExecutor implements Workload {
}
for (Future thread : threads) {
- try {
- thread.get();
- } catch (ExecutionException e) {
- logger.error("", e);
- }
+ thread.get();
}
}
}
@@ -217,7 +217,7 @@ public class QueryExecutor implements Workload {
* @throws InterruptedException
*/
protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
- QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
+ QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException {
for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
List<Future> threads = new ArrayList<>();
for (int i = 0; i < cr; i++) {
@@ -225,7 +225,7 @@ public class QueryExecutor implements Workload {
QueryResult queryResult = new QueryResult(query);
querySetResult.getQueryResults().add(queryResult);
- Runnable
+ Callable<Void>
thread =
executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
querySetResult, scenario);
@@ -233,11 +233,7 @@ public class QueryExecutor implements Workload {
}
for (Future thread : threads) {
- try {
- thread.get();
- } catch (ExecutionException e) {
- logger.error("", e);
- }
+ thread.get();
}
}
}
@@ -253,14 +249,14 @@ public class QueryExecutor implements Workload {
* @param scenario
* @return
*/
- protected Runnable executeRunner(String name, DataModelResult dataModelResult,
+ protected Callable<Void> executeRunner(String name, DataModelResult dataModelResult,
QueryResult queryResult, QuerySet querySet, Scenario scenario) {
ThreadTime threadTime = new ThreadTime();
queryResult.getThreadTimes().add(threadTime);
threadTime.setThreadName(name);
queryResult.setHint(this.queryHint);
logger.info("\nExecuting query " + queryResult.getStatement());
- Runnable thread;
+ Callable<Void> thread;
if (workloadExecutor.isPerformance()) {
thread =
new MultiThreadedRunner(threadTime.getThreadName(), queryResult,
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
index 882fa50..0532201 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -17,8 +17,10 @@
*/
package org.apache.phoenix.pherf.workload;
+import java.util.concurrent.Callable;
+
public interface Workload {
- public Runnable execute() throws Exception;
+ public Callable<Void> execute() throws Exception;
/**
* Use this method to perform any cleanup or forced shutdown of the thread.
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index 3cde7ae..4abb574 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.pherf.workload;
+import com.google.common.annotations.VisibleForTesting;
+import jline.internal.TestAccessible;
import org.apache.phoenix.pherf.PherfConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +36,8 @@ public class WorkloadExecutor {
private final boolean isPerformance;
// Jobs can be accessed by multiple threads
- private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ public final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
private final ExecutorService pool;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index b340a2b..cae223c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -151,9 +151,9 @@ public class WriteWorkload implements Workload {
pool.shutdownNow();
}
- public Runnable execute() throws Exception {
- return new Runnable() {
- @Override public void run() {
+ public Callable<Void> execute() throws Exception {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
try {
DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
@@ -169,8 +169,10 @@ public class WriteWorkload implements Workload {
resultUtil.write(dataLoadThreadTime);
} catch (Exception e) {
- logger.warn("", e);
+ logger.error("WriteWorkLoad failed", e);
+ throw e;
}
+ return null;
}
};
}
@@ -292,21 +294,17 @@ public class WriteWorkload implements Workload {
rowsCreated += result;
}
}
- try {
- connection.commit();
- duration = System.currentTimeMillis() - last;
- logger.info("Writer (" + Thread.currentThread().getName()
- + ") committed Batch. Total " + getBatchSize()
- + " rows for this thread (" + this.hashCode() + ") in ("
- + duration + ") Ms");
-
- if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
- dataLoadThreadTime.add(tableName,
- Thread.currentThread().getName(), i,
- System.currentTimeMillis() - logStartTime);
- }
- } catch (SQLException e) {
- logger.warn("SQLException in commit operation", e);
+ connection.commit();
+ duration = System.currentTimeMillis() - last;
+ logger.info("Writer (" + Thread.currentThread().getName()
+ + ") committed Batch. Total " + getBatchSize()
+ + " rows for this thread (" + this.hashCode() + ") in ("
+ + duration + ") Ms");
+
+ if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+ dataLoadThreadTime.add(tableName,
+ Thread.currentThread().getName(), i,
+ System.currentTimeMillis() - logStartTime);
}
logStartTime = System.currentTimeMillis();
@@ -317,6 +315,7 @@ public class WriteWorkload implements Workload {
}
}
} catch (SQLException e) {
+ logger.error("Scenario " + scenario.getName() + " failed with exception ", e);
throw e;
} finally {
// Need to keep the statement open to send the remaining batch of updates
@@ -396,11 +395,25 @@ public class WriteWorkload implements Workload {
break;
case UNSIGNED_LONG:
if (dataValue.getValue().equals("")) {
- statement.setNull(count, Types.LONGVARCHAR);
+ statement.setNull(count, Types.OTHER);
+ } else {
+ statement.setLong(count, Long.parseLong(dataValue.getValue()));
+ }
+ break;
+ case BIGINT:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.BIGINT);
} else {
statement.setLong(count, Long.parseLong(dataValue.getValue()));
}
break;
+ case TINYINT:
+ if (dataValue.getValue().equals("")) {
+ statement.setNull(count, Types.TINYINT);
+ } else {
+ statement.setLong(count, Integer.parseInt(dataValue.getValue()));
+ }
+ break;
case DATE:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.DATE);
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 1c32b75..fb89ef3 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -316,12 +316,12 @@
</dataOverride>
<preScenarioDdls>
- <ddl>CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)</ddl>
+ <ddl statement="CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)"/>
</preScenarioDdls>
<postScenarioDdls>
- <ddl>CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)</ddl>
- <ddl>CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)</ddl>
+ <ddl statement="CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)"/>
+ <ddl statement="CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)"/>
</postScenarioDdls>
<writeParams executionDurationInMs="10000">