You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/15 20:43:09 UTC
nifi git commit: NIFI-5044: Applied changes in SelectHiveQL to
SelectHive3QL
Repository: nifi
Updated Branches:
refs/heads/master 187417d07 -> 97f71fd6c
NIFI-5044: Applied changes in SelectHiveQL to SelectHive3QL
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2799.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97f71fd6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97f71fd6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97f71fd6
Branch: refs/heads/master
Commit: 97f71fd6c72d7053ecf13d4c58be56f04fa2064d
Parents: 187417d
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Jun 15 13:39:28 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 15 22:42:59 2018 +0200
----------------------------------------------------------------------
.../nifi/processors/hive/SelectHive3QL.java | 129 ++++++++++---
.../nifi/processors/hive/TestSelectHive3QL.java | 188 +++++++++++++++----
2 files changed, 256 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/97f71fd6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
index cb0b000..3bda931 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -100,10 +102,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
+ .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship.")
.build();
+ public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder()
+ .name("hive-pre-query")
+ .displayName("HiveQL Pre-Query")
+ .description("HiveQL pre-query to execute. Semicolon-delimited list of queries. "
+ + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. "
+ + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("hive-query")
.displayName("HiveQL Select Query")
@@ -113,6 +126,16 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder()
+ .name("hive-post-query")
+ .displayName("HiveQL Post-Query")
+ .description("HiveQL post-query to execute. Semicolon-delimited list of queries. "
+ + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("hive-fetch-size")
.displayName("Fetch Size")
@@ -214,7 +237,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(HIVE_DBCP_SERVICE);
+ _propertyDescriptors.add(HIVEQL_PRE_QUERY);
_propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+ _propertyDescriptors.add(HIVEQL_POST_QUERY);
_propertyDescriptors.add(FETCH_SIZE);
_propertyDescriptors.add(QUERY_TIMEOUT);
_propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
@@ -278,19 +303,22 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ List<String> preQueries = getQueries(context.getProperty(HIVEQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+ List<String> postQueries = getQueries(context.getProperty(HIVEQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+
final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
// Source the SQL
- final String selectQuery;
+ String hqlStatement;
if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
- selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+ hqlStatement = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset)));
- selectQuery = queryContents.toString();
+ hqlStatement = queryContents.toString();
}
@@ -309,10 +337,17 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString();
- try (final Connection con = dbcpService.getConnection();
- final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
+ try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+ final Statement st = (flowbased ? con.prepareStatement(hqlStatement) : con.createStatement())
) {
-
+ Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
+ if (failure != null) {
+ // In case of failure, assigning config query to "hqlStatement" to follow current error handling
+ hqlStatement = failure.getLeft();
+ flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
+ fileToProcess = null;
+ throw failure.getRight();
+ }
st.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asInteger());
if (fetchSize != null && fetchSize > 0) {
@@ -326,14 +361,14 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
try {
- logger.debug("Executing query {}", new Object[]{selectQuery});
+ logger.debug("Executing query {}", new Object[]{hqlStatement});
if (flowbased) {
// Hive JDBC Doesn't Support this yet:
// ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData();
// int paramCount = pmd.getParameterCount();
// Alternate way to determine number of params in SQL.
- int paramCount = StringUtils.countMatches(selectQuery, "?");
+ int paramCount = StringUtils.countMatches(hqlStatement, "?");
if (paramCount > 0) {
setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes());
@@ -343,7 +378,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final ResultSet resultSet;
try {
- resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
+ resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(hqlStatement));
} catch (SQLException se) {
// If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here
flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
@@ -355,7 +390,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
while (true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
- flowfile = (flowfile == null) ? session.create() : session.create(flowfile);
+ flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
if (baseFilename == null) {
baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
}
@@ -388,10 +423,10 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
try {
// Set input/output table names by parsing the query
- attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery)));
+ attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
} catch (Exception e) {
// If failed to parse the query, just log a warning message, but continue.
- getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e);
+ getLogger().warn("Failed to parse query: {} due to {}", new Object[]{hqlStatement, e}, e);
}
// Set MIME type on output document and add extension to filename
@@ -410,14 +445,13 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
flowfile = session.putAllAttributes(flowfile, attributes);
- logger.info("{} contains {} Avro records; transferring to 'success'",
+ logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",
new Object[]{flowfile, nrOfRows.get()});
if (context.hasIncomingConnection()) {
- // If the flow file came from an incoming connection, issue a Modify Content provenance event
-
- session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ // If the flow file came from an incoming connection, issue a Fetch provenance event
+ session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(),
+ "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} else {
// If we created a flow file from rows received from Hive, issue a Receive provenance event
session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@@ -426,6 +460,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
} else {
// If there were no rows returned (and the first flow file has been sent, we're done processing, so remove the flowfile and carry on
session.remove(flowfile);
+ if (resultSetFlowFiles != null && resultSetFlowFiles.size() > 0) {
+ flowfile = resultSetFlowFiles.get(resultSetFlowFiles.size() - 1);
+ }
break;
}
@@ -447,31 +484,73 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
throw e;
}
+ failure = executeConfigStatements(con, postQueries);
+ if (failure != null) {
+ hqlStatement = failure.getLeft();
+ if (resultSetFlowFiles != null) {
+ resultSetFlowFiles.forEach(ff -> session.remove(ff));
+ }
+ flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
+ fileToProcess = null;
+ throw failure.getRight();
+ }
+
session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ }
} catch (final ProcessException | SQLException e) {
- logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e});
+ logger.error("Issue processing SQL {} due to {}.", new Object[]{hqlStatement, e});
if (flowfile == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure",
- new Object[]{selectQuery, e});
+ new Object[]{hqlStatement, e});
context.yield();
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure",
- new Object[]{selectQuery, flowfile, e});
+ new Object[]{hqlStatement, flowfile, e});
flowfile = session.penalize(flowfile);
} else {
logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure",
- new Object[]{selectQuery, e});
+ new Object[]{hqlStatement, e});
context.yield();
}
session.transfer(flowfile, REL_FAILURE);
}
- } finally {
- if (fileToProcess != null) {
- session.remove(fileToProcess);
+ }
+ }
+
+ /*
+ * Executes given queries using pre-defined connection.
+ * Returns null on success, or a Pair of the failed query string and its SQLException on failure.
+ */
+ protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){
+ if (configQueries == null || configQueries.isEmpty()) {
+ return null;
+ }
+
+ for (String confSQL : configQueries) {
+ try(final Statement st = con.createStatement()){
+ st.execute(confSQL);
+ } catch (SQLException e) {
+ return Pair.of(confSQL, e);
+ }
+ }
+ return null;
+ }
+
+ protected List<String> getQueries(final String value) {
+ if (value == null || value.length() == 0 || value.trim().length() == 0) {
+ return null;
+ }
+ final List<String> queries = new LinkedList<>();
+ for (String query : value.split(";")) {
+ if (query.trim().length() > 0) {
+ queries.add(query.trim());
}
}
+ return queries;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/97f71fd6/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
index 50e83ac..67f0dd4 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -64,6 +66,8 @@ public class TestSelectHive3QL {
private static final Logger LOGGER;
private final static String MAX_ROWS_KEY = "maxRows";
+ private final int NUM_OF_ROWS = 100;
+
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -118,11 +122,26 @@ public class TestSelectHive3QL {
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false);
invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
+
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType());
+ assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri());
}
@Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
+
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(2, provenanceEvents.size());
+
+ final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
+
+ final ProvenanceEventRecord provenance1 = provenanceEvents.get(1);
+ assertEquals(ProvenanceEventType.FETCH, provenance1.getEventType());
+ assertEquals("jdbc:derby:target/db;create=true", provenance1.getTransitUri());
}
@@ -182,6 +201,51 @@ public class TestSelectHive3QL {
}
@Test
+ public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+ "select 'no exception' from persons; select exception from persons",
+ null);
+
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+ "select 'no exception' from persons; select exception from persons",
+ null);
+
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+ null,
+ "select 'no exception' from persons; select exception from persons");
+
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+ null,
+ "select 'no exception' from persons; select exception from persons");
+
+ // with incoming connections, it should be rolled back
+ runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+ }
+
+ @Test
public void testWithBadSQL() throws SQLException {
final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
@@ -218,45 +282,45 @@ public class TestSelectHive3QL {
invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
}
- public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+ @Test
+ public void invokeOnTriggerWithValidPreQueries()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
+ invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+ "select '1' from persons; select '2' from persons", //should not be 'select'. But Derby driver doesn't support "set param=val" format.
+ null);
+ }
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- dbLocation.delete();
-
- // load test data to database
- final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
- final Statement stmt = con.createStatement();
- try {
- stmt.execute("drop table persons");
- } catch (final SQLException sqle) {
- // Nothing to do here, the table didn't exist
- }
+ @Test
+ public void invokeOnTriggerWithValidPostQueries()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+ invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+ null,
+ //should not be 'select'. But Derby driver doesn't support "set param=val" format,
+ //so just providing any "compilable" query.
+ " select '4' from persons; \nselect '5' from persons");
+ }
- stmt.execute("create table persons (id integer, name varchar(100), code integer)");
- Random rng = new Random(53496);
- final int nrOfRows = 100;
- stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
- for (int i = 2; i < nrOfRows; i++) {
- stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
- }
- stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)");
+ @Test
+ public void invokeOnTriggerWithValidPrePostQueries()
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+ invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+ //should not be 'select'. But Derby driver doesn't support "set param=val" format,
+ //so just providing any "compilable" query.
+ "select '1' from persons; select '2' from persons",
+ " select '4' from persons; \nselect '5' from persons");
+ }
- LOGGER.info("test data loaded");
- runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
- runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+ public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+ invokeOnTrigger(query, incomingFlowFile, outputFormat, null, null);
+ }
- if (incomingFlowFile) {
- // incoming FlowFile content is not used, but attributes are used
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("person.id", "10");
- runner.enqueue("Hello".getBytes(), attributes);
- }
+ public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
+ String preQueries, String postQueries)
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
- runner.setIncomingConnection(incomingFlowFile);
- runner.run();
+ TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
@@ -289,7 +353,7 @@ public class TestSelectHive3QL {
while ((line = br.readLine()) != null) {
recordsFromStream++;
String[] values = line.split(",");
- if (recordsFromStream < (nrOfRows - 10)) {
+ if (recordsFromStream < (NUM_OF_ROWS - 10)) {
assertEquals(3, values.length);
assertTrue(values[1].startsWith("\""));
assertTrue(values[1].endsWith("\""));
@@ -298,11 +362,60 @@ public class TestSelectHive3QL {
}
}
}
- assertEquals(nrOfRows - 10, recordsFromStream);
+ assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
}
+ public TestRunner doOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
+ String preQueries, String postQueries)
+ throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
+ final Statement stmt = con.createStatement();
+ try {
+ stmt.execute("drop table persons");
+ } catch (final SQLException sqle) {
+ // Nothing to do here, the table didn't exist
+ }
+
+ stmt.execute("create table persons (id integer, name varchar(100), code integer)");
+ Random rng = new Random(53496);
+ stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
+ for (int i = 2; i < NUM_OF_ROWS; i++) {
+ stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
+ }
+ stmt.executeUpdate("insert into persons values (" + NUM_OF_ROWS + ", 'Last Person', NULL)");
+
+ LOGGER.info("test data loaded");
+
+ runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
+ runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+ if (preQueries != null) {
+ runner.setProperty(SelectHive3QL.HIVEQL_PRE_QUERY, preQueries);
+ }
+ if (postQueries != null) {
+ runner.setProperty(SelectHive3QL.HIVEQL_POST_QUERY, postQueries);
+ }
+
+ if (incomingFlowFile) {
+ // incoming FlowFile content is not used, but attributes are used
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("person.id", "10");
+ runner.enqueue("Hello".getBytes(), attributes);
+ }
+
+ runner.setIncomingConnection(incomingFlowFile);
+ runner.run();
+
+ return runner;
+ }
+
@Test
public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException {
@@ -388,6 +501,10 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
+ // Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
+ flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
+ flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
runner.clearTransferState();
}
@@ -535,5 +652,4 @@ public class TestSelectHive3QL {
return "jdbc:derby:" + DB_LOCATION + ";create=true";
}
}
-
-}
+}
\ No newline at end of file