Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/29 12:59:37 UTC

nifi git commit: NIFI-5339 - Better Time Tracking for ExecuteSQL Durations

Repository: nifi
Updated Branches:
  refs/heads/master 6508c191b -> c1083dfb6


NIFI-5339 - Better Time Tracking for ExecuteSQL Durations

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2817.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1083dfb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1083dfb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1083dfb

Branch: refs/heads/master
Commit: c1083dfb62185567dca0b3bcd6b1202cd34093dd
Parents: 6508c19
Author: patricker <pa...@gmail.com>
Authored: Tue Jun 26 13:27:11 2018 -0600
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 29 14:59:27 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java      | 18 ++++++++++++++----
 .../nifi/processors/standard/TestExecuteSQL.java  |  8 ++++++++
 2 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
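In short, the change splits the single StopWatch that previously timed the whole query lifecycle into two timers: one around Statement.execute() (execution time) and one around streaming the ResultSet into the result FlowFile (fetch time). Each is written to its own attribute, and the existing executesql.query.duration attribute is kept as the sum of the two. Below is a minimal standalone sketch of the same timing pattern, assuming plain JDBC with a placeholder connection URL and query, and none of ExecuteSQL's session or error handling:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.concurrent.TimeUnit;

    import org.apache.nifi.util.StopWatch;

    public class SplitTimingSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL and query; in ExecuteSQL these come from the DBCP service
            // and the SQL select query property / incoming FlowFile.
            try (Connection con = DriverManager.getConnection("jdbc:h2:mem:sketch");
                 Statement st = con.createStatement()) {

                // Time only the execution of the statement.
                final StopWatch executionTime = new StopWatch(true);
                final boolean hasResults = st.execute("SELECT 1");
                final long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);

                // Time only the retrieval of the rows.
                long fetchTimeElapsed = 0L;
                if (hasResults) {
                    final StopWatch fetchTime = new StopWatch(true);
                    try (ResultSet rs = st.getResultSet()) {
                        while (rs.next()) {
                            // ExecuteSQL writes each row out as Avro at this point.
                        }
                    }
                    fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
                }

                // executesql.query.duration now reports the sum of the two new values.
                System.out.println("executesql.query.executiontime = " + executionTimeElapsed);
                System.out.println("executesql.query.fetchtime     = " + fetchTimeElapsed);
                System.out.println("executesql.query.duration      = " + (executionTimeElapsed + fetchTimeElapsed));
            }
        }
    }

With a slow query and a small result set the execution time dominates; with a fast query that returns many rows the fetch time dominates. That distinction is what the two new attributes make visible.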


http://git-wip-us.apache.org/repos/asf/nifi/blob/c1083dfb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index ac93feb..af22fc3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -100,6 +100,8 @@ public class ExecuteSQL extends AbstractProcessor {
 
     public static final String RESULT_ROW_COUNT = "executesql.row.count";
     public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
+    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
     public static final String RESULTSET_INDEX = "executesql.resultset.index";
 
     // Relationships
@@ -203,7 +205,6 @@ public class ExecuteSQL extends AbstractProcessor {
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
-        final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
             selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
@@ -224,7 +225,12 @@ public class ExecuteSQL extends AbstractProcessor {
                 JdbcCommon.setParameters(st, fileToProcess.getAttributes());
             }
             logger.debug("Executing query {}", new Object[]{selectQuery});
+            final StopWatch executionTime = new StopWatch(true);
+
             boolean hasResults = st.execute();
+
+            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
             boolean hasUpdateCount = st.getUpdateCount() != -1;
 
             while(hasResults || hasUpdateCount) {
@@ -238,6 +244,8 @@ public class ExecuteSQL extends AbstractProcessor {
                         resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                     }
 
+                    final StopWatch fetchTime = new StopWatch(true);
+
                     final AtomicLong nrOfRows = new AtomicLong(0L);
                     resultSetFF = session.write(resultSetFF, out -> {
                         try {
@@ -255,17 +263,19 @@ public class ExecuteSQL extends AbstractProcessor {
                         }
                     });
 
-                    long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
 
                     // set attribute how many rows were selected
                     resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
                     resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                     resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
 
                     logger.info("{} contains {} Avro records; transferring to 'success'",
                             new Object[]{resultSetFF, nrOfRows.get()});
-                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
+                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                     session.transfer(resultSetFF, REL_SUCCESS);
 
                     resultCount++;
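For flows consuming the result FlowFiles, the new attributes are millisecond values stored as strings. A hypothetical downstream processor (the class name, relationship, and log message are illustrative; only the ExecuteSQL attribute constants come from this commit) could read them like so:

    import java.util.Collections;
    import java.util.Set;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processors.standard.ExecuteSQL;

    // Hypothetical example processor; not part of this commit.
    public class LogQueryTimings extends AbstractProcessor {

        public static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .build();

        @Override
        public Set<Relationship> getRelationships() {
            return Collections.singleton(REL_SUCCESS);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }

            // Attributes added by NIFI-5339; the duration attribute is the sum of the other two.
            final long executionMs = Long.parseLong(flowFile.getAttribute(ExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
            final long fetchMs = Long.parseLong(flowFile.getAttribute(ExecuteSQL.RESULT_QUERY_FETCH_TIME));
            final long durationMs = Long.parseLong(flowFile.getAttribute(ExecuteSQL.RESULT_QUERY_DURATION));

            getLogger().info("Query executed in {} ms, rows fetched in {} ms ({} ms total)",
                    new Object[]{executionMs, fetchMs, durationMs});

            session.transfer(flowFile, REL_SUCCESS);
        }
    }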

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1083dfb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index b4a7c69..a5d8f45 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -312,9 +312,17 @@ public class TestExecuteSQL {
         runner.run();
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
         runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_FETCH_TIME);
         runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT);
 
         final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(ExecuteSQL.RESULT_QUERY_DURATION));
+
+        assertEquals(durationTime, fetchTime + executionTime);
+
         final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {