You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/15 20:43:09 UTC

nifi git commit: NIFI-5044: Applied changes in SelectHiveQL to SelectHive3QL

Repository: nifi
Updated Branches:
  refs/heads/master 187417d07 -> 97f71fd6c


NIFI-5044: Applied changes in SelectHiveQL to SelectHive3QL

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2799.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97f71fd6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97f71fd6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97f71fd6

Branch: refs/heads/master
Commit: 97f71fd6c72d7053ecf13d4c58be56f04fa2064d
Parents: 187417d
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Jun 15 13:39:28 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 15 22:42:59 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/hive/SelectHive3QL.java     | 129 ++++++++++---
 .../nifi/processors/hive/TestSelectHive3QL.java | 188 +++++++++++++++----
 2 files changed, 256 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/97f71fd6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
index cb0b000..3bda931 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -100,10 +102,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
             .build();
     static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
+            .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship.")
             .build();
 
 
+    public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder()
+            .name("hive-pre-query")
+            .displayName("HiveQL Pre-Query")
+            .description("HiveQL pre-query to execute. Semicolon-delimited list of queries. "
+                    + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. "
+                    + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder()
             .name("hive-query")
             .displayName("HiveQL Select Query")
@@ -113,6 +126,16 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder()
+            .name("hive-post-query")
+            .displayName("HiveQL Post-Query")
+            .description("HiveQL post-query to execute. Semicolon-delimited list of queries. "
+                    + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("hive-fetch-size")
             .displayName("Fetch Size")
@@ -214,7 +237,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+        _propertyDescriptors.add(HIVEQL_PRE_QUERY);
         _propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+        _propertyDescriptors.add(HIVEQL_POST_QUERY);
         _propertyDescriptors.add(FETCH_SIZE);
         _propertyDescriptors.add(QUERY_TIMEOUT);
         _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
@@ -278,19 +303,22 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
         final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
         final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
 
+        List<String> preQueries = getQueries(context.getProperty(HIVEQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+        List<String> postQueries = getQueries(context.getProperty(HIVEQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+
         final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
 
         // Source the SQL
-        final String selectQuery;
+        String hqlStatement;
 
         if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
-            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+            hqlStatement = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         } else {
             // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
             // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
             final StringBuilder queryContents = new StringBuilder();
             session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset)));
-            selectQuery = queryContents.toString();
+            hqlStatement = queryContents.toString();
         }
 
 
@@ -309,10 +337,17 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
         final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
-        try (final Connection con = dbcpService.getConnection();
-             final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
+        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+             final Statement st = (flowbased ? con.prepareStatement(hqlStatement) : con.createStatement())
         ) {
-
+            Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
+            if (failure != null) {
+                // On failure, assign the failed config query to "hqlStatement" so the existing error handling reports it
+                hqlStatement = failure.getLeft();
+                flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
+                fileToProcess = null;
+                throw failure.getRight();
+            }
             st.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asInteger());
 
             if (fetchSize != null && fetchSize > 0) {
@@ -326,14 +361,14 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
 
             final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
             try {
-                logger.debug("Executing query {}", new Object[]{selectQuery});
+                logger.debug("Executing query {}", new Object[]{hqlStatement});
                 if (flowbased) {
                     // Hive JDBC Doesn't Support this yet:
                     // ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData();
                     // int paramCount = pmd.getParameterCount();
 
                     // Alternate way to determine number of params in SQL.
-                    int paramCount = StringUtils.countMatches(selectQuery, "?");
+                    int paramCount = StringUtils.countMatches(hqlStatement, "?");
 
                     if (paramCount > 0) {
                         setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes());
@@ -343,7 +378,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                 final ResultSet resultSet;
 
                 try {
-                    resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
+                    resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(hqlStatement));
                 } catch (SQLException se) {
                     // If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here
                     flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
@@ -355,7 +390,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                 String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
                 while (true) {
                     final AtomicLong nrOfRows = new AtomicLong(0L);
-                    flowfile = (flowfile == null) ? session.create() : session.create(flowfile);
+                    flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                     if (baseFilename == null) {
                         baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
                     }
@@ -388,10 +423,10 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
 
                         try {
                             // Set input/output table names by parsing the query
-                            attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery)));
+                            attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
                         } catch (Exception e) {
                             // If failed to parse the query, just log a warning message, but continue.
-                            getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e);
+                            getLogger().warn("Failed to parse query: {} due to {}", new Object[]{hqlStatement, e}, e);
                         }
 
                         // Set MIME type on output document and add extension to filename
@@ -410,14 +445,13 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
 
                         flowfile = session.putAllAttributes(flowfile, attributes);
 
-                        logger.info("{} contains {} Avro records; transferring to 'success'",
+                        logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",
                                 new Object[]{flowfile, nrOfRows.get()});
 
                         if (context.hasIncomingConnection()) {
-                            // If the flow file came from an incoming connection, issue a Modify Content provenance event
-
-                            session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
-                                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                            // If the flow file came from an incoming connection, issue a Fetch provenance event
+                            session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(),
+                                    "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         } else {
                             // If we created a flow file from rows received from Hive, issue a Receive provenance event
                             session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@@ -426,6 +460,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                     } else {
                         // If there were no rows returned (and the first flow file has been sent, we're done processing, so remove the flowfile and carry on
                         session.remove(flowfile);
+                        if (resultSetFlowFiles != null && resultSetFlowFiles.size() > 0) {
+                            flowfile = resultSetFlowFiles.get(resultSetFlowFiles.size() - 1);
+                        }
                         break;
                     }
 
@@ -447,31 +484,73 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
                 throw e;
             }
 
+            failure = executeConfigStatements(con, postQueries);
+            if (failure != null) {
+                hqlStatement = failure.getLeft();
+                if (resultSetFlowFiles != null) {
+                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
+                }
+                flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
+                fileToProcess = null;
+                throw failure.getRight();
+            }
+
             session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            if (fileToProcess != null) {
+                session.remove(fileToProcess);
+            }
 
         } catch (final ProcessException | SQLException e) {
-            logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e});
+            logger.error("Issue processing SQL {} due to {}.", new Object[]{hqlStatement, e});
             if (flowfile == null) {
                 // This can happen if any exceptions occur while setting up the connection, statement, etc.
                 logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
+                        new Object[]{hqlStatement, e});
                 context.yield();
             } else {
                 if (context.hasIncomingConnection()) {
                     logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure",
-                            new Object[]{selectQuery, flowfile, e});
+                            new Object[]{hqlStatement, flowfile, e});
                     flowfile = session.penalize(flowfile);
                 } else {
                     logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure",
-                            new Object[]{selectQuery, e});
+                            new Object[]{hqlStatement, e});
                     context.yield();
                 }
                 session.transfer(flowfile, REL_FAILURE);
             }
-        } finally {
-            if (fileToProcess != null) {
-                session.remove(fileToProcess);
+        }
+    }
+
+    /*
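+     * Executes the given queries in order on the supplied connection, stopping at the first failure.
+     * Returns null if there are no queries or all succeed, or a pair of the failed query string and its SQLException.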
+     */
+    protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){
+        if (configQueries == null || configQueries.isEmpty()) {
+            return null;
+        }
+
+        for (String confSQL : configQueries) {
+            try(final Statement st = con.createStatement()){
+                st.execute(confSQL);
+            } catch (SQLException e) {
+                return Pair.of(confSQL, e);
+            }
+        }
+        return null;
+    }
+
+    protected List<String> getQueries(final String value) {
+        if (value == null || value.length() == 0 || value.trim().length() == 0) {
+            return null;
+        }
+        final List<String> queries = new LinkedList<>();
+        for (String query : value.split(";")) {
+            if (query.trim().length() > 0) {
+                queries.add(query.trim());
             }
         }
+        return queries;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/97f71fd6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
index 50e83ac..67f0dd4 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.dbcp.hive.Hive3DBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -64,6 +66,8 @@ public class TestSelectHive3QL {
 
     private static final Logger LOGGER;
     private final static String MAX_ROWS_KEY = "maxRows";
+    private final int NUM_OF_ROWS = 100;
+
 
     static {
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -118,11 +122,26 @@ public class TestSelectHive3QL {
     public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
         runner.setIncomingConnection(false);
         invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType());
+        assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri());
     }
 
     @Test
     public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
         invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
+
+        final ProvenanceEventRecord provenance1 = provenanceEvents.get(1);
+        assertEquals(ProvenanceEventType.FETCH, provenance1.getEventType());
+        assertEquals("jdbc:derby:target/db;create=true", provenance1.getTransitUri());
     }
 
 
@@ -182,6 +201,51 @@ public class TestSelectHive3QL {
     }
 
     @Test
+    public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                "select 'no exception' from persons; select exception from persons",
+                null);
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+                "select 'no exception' from persons; select exception from persons",
+                null);
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                null,
+                "select 'no exception' from persons; select exception from persons");
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+                null,
+                "select 'no exception' from persons; select exception from persons");
+
+        // with incoming connections, it should be rolled back
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testWithBadSQL() throws SQLException {
         final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
 
@@ -218,45 +282,45 @@ public class TestSelectHive3QL {
         invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
     }
 
-    public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+    @Test
+    public void invokeOnTriggerWithValidPreQueries()
             throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                "select '1' from persons; select '2' from persons", //should not be 'select'. But Derby driver doesn't support "set param=val" format.
+                null);
+    }
 
-        // remove previous test database, if any
-        final File dbLocation = new File(DB_LOCATION);
-        dbLocation.delete();
-
-        // load test data to database
-        final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
-        final Statement stmt = con.createStatement();
-        try {
-            stmt.execute("drop table persons");
-        } catch (final SQLException sqle) {
-            // Nothing to do here, the table didn't exist
-        }
+    @Test
+    public void invokeOnTriggerWithValidPostQueries()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                null,
+                // Ideally these would not be 'select' statements, but the Derby driver doesn't support
+                // the "set param=val" format, so any "compilable" query is used instead.
+                " select '4' from persons; \nselect '5' from persons");
+    }
 
-        stmt.execute("create table persons (id integer, name varchar(100), code integer)");
-        Random rng = new Random(53496);
-        final int nrOfRows = 100;
-        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
-        for (int i = 2; i < nrOfRows; i++) {
-            stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
-        }
-        stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)");
+    @Test
+    public void invokeOnTriggerWithValidPrePostQueries()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                // Ideally these would not be 'select' statements, but the Derby driver doesn't support
+                // the "set param=val" format, so any "compilable" query is used instead.
+                "select '1' from persons; select '2' from persons",
+                " select '4' from persons; \nselect '5' from persons");
+    }
 
-        LOGGER.info("test data loaded");
 
-        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
-        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+    public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(query, incomingFlowFile, outputFormat, null, null);
+    }
 
-        if (incomingFlowFile) {
-            // incoming FlowFile content is not used, but attributes are used
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("person.id", "10");
-            runner.enqueue("Hello".getBytes(), attributes);
-        }
+    public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
+                                String preQueries, String postQueries)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
 
-        runner.setIncomingConnection(incomingFlowFile);
-        runner.run();
+        TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
 
         final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
@@ -289,7 +353,7 @@ public class TestSelectHive3QL {
             while ((line = br.readLine()) != null) {
                 recordsFromStream++;
                 String[] values = line.split(",");
-                if (recordsFromStream < (nrOfRows - 10)) {
+                if (recordsFromStream < (NUM_OF_ROWS - 10)) {
                     assertEquals(3, values.length);
                     assertTrue(values[1].startsWith("\""));
                     assertTrue(values[1].endsWith("\""));
@@ -298,11 +362,60 @@ public class TestSelectHive3QL {
                 }
             }
         }
-        assertEquals(nrOfRows - 10, recordsFromStream);
+        assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
         assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
         flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
     }
 
+    public TestRunner doOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
+                                  String preQueries, String postQueries)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        try {
+            stmt.execute("drop table persons");
+        } catch (final SQLException sqle) {
+            // Nothing to do here, the table didn't exist
+        }
+
+        stmt.execute("create table persons (id integer, name varchar(100), code integer)");
+        Random rng = new Random(53496);
+        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
+        for (int i = 2; i < NUM_OF_ROWS; i++) {
+            stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
+        }
+        stmt.executeUpdate("insert into persons values (" + NUM_OF_ROWS + ", 'Last Person', NULL)");
+
+        LOGGER.info("test data loaded");
+
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
+        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+        if (preQueries != null) {
+            runner.setProperty(SelectHive3QL.HIVEQL_PRE_QUERY, preQueries);
+        }
+        if (postQueries != null) {
+            runner.setProperty(SelectHive3QL.HIVEQL_POST_QUERY, postQueries);
+        }
+
+        if (incomingFlowFile) {
+            // incoming FlowFile content is not used, but attributes are used
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("person.id", "10");
+            runner.enqueue("Hello".getBytes(), attributes);
+        }
+
+        runner.setIncomingConnection(incomingFlowFile);
+        runner.run();
+
+        return runner;
+    }
+
     @Test
     public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
@@ -388,6 +501,10 @@ public class TestSelectHive3QL {
 
         runner.run();
         runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+        // Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
+        flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
+        flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
         runner.clearTransferState();
     }
 
@@ -535,5 +652,4 @@ public class TestSelectHive3QL {
             return "jdbc:derby:" + DB_LOCATION + ";create=true";
         }
     }
-
-}
+}
\ No newline at end of file