You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/29 12:59:37 UTC
nifi git commit: NIFI-5339 - Better Time Tracking for ExecuteSQL
Durations
Repository: nifi
Updated Branches:
refs/heads/master 6508c191b -> c1083dfb6
NIFI-5339 - Better Time Tracking for ExecuteSQL Durations
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2817.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1083dfb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1083dfb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1083dfb
Branch: refs/heads/master
Commit: c1083dfb62185567dca0b3bcd6b1202cd34093dd
Parents: 6508c19
Author: patricker <pa...@gmail.com>
Authored: Tue Jun 26 13:27:11 2018 -0600
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 29 14:59:27 2018 +0200
----------------------------------------------------------------------
.../nifi/processors/standard/ExecuteSQL.java | 18 ++++++++++++++----
.../nifi/processors/standard/TestExecuteSQL.java | 8 ++++++++
2 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c1083dfb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index ac93feb..af22fc3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -100,6 +100,8 @@ public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+ public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
+ public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
// Relationships
@@ -203,7 +205,6 @@ public class ExecuteSQL extends AbstractProcessor {
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
- final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
@@ -224,7 +225,12 @@ public class ExecuteSQL extends AbstractProcessor {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
+ final StopWatch executionTime = new StopWatch(true);
+
boolean hasResults = st.execute();
+
+ long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
boolean hasUpdateCount = st.getUpdateCount() != -1;
while(hasResults || hasUpdateCount) {
@@ -238,6 +244,8 @@ public class ExecuteSQL extends AbstractProcessor {
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
}
+ final StopWatch fetchTime = new StopWatch(true);
+
final AtomicLong nrOfRows = new AtomicLong(0L);
resultSetFF = session.write(resultSetFF, out -> {
try {
@@ -255,17 +263,19 @@ public class ExecuteSQL extends AbstractProcessor {
}
});
- long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+ long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
+ session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
session.transfer(resultSetFF, REL_SUCCESS);
resultCount++;
http://git-wip-us.apache.org/repos/asf/nifi/blob/c1083dfb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index b4a7c69..a5d8f45 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -312,9 +312,17 @@ public class TestExecuteSQL {
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION);
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_FETCH_TIME);
runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+ final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
+ final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_FETCH_TIME));
+ final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_DURATION));
+
+ assertEquals(durationTime, fetchTime + executionTime);
+
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {