You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by aglotero <gi...@git.apache.org> on 2018/11/01 17:05:53 UTC
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user aglotero commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3051#discussion_r230119046
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile inputFlowFile = null;
FlowFile fileToProcess = null;
+
+ Map<String, String> attributes = null;
+
if (context.hasIncomingConnection()) {
- fileToProcess = session.get();
+ inputFlowFile = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
- if (fileToProcess == null && context.hasNonLoopConnection()) {
+ if (inputFlowFile == null && context.hasNonLoopConnection()) {
return;
}
+
+ attributes = inputFlowFile.getAttributes();
}
final ComponentLog logger = getLogger();
- final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
- final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+ final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
- final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+ final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
final StopWatch stopWatch = new StopWatch(true);
- if (fileToProcess == null) {
- fileToProcess = session.create();
+ if(inputFlowFile != null){
+ session.transfer(inputFlowFile, REL_ORIGINAL);
}
try {
// The documentation for the driver recommends the session remain open the entire time the processor is running
// and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
- final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+ final ResultSet resultSet;
+
+ if (queryTimeout > 0) {
+ resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+ }else{
+ resultSet = connectionSession.execute(selectQuery);
+ }
+
final AtomicLong nrOfRows = new AtomicLong(0L);
- fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- try {
- logger.debug("Executing CQL query {}", new Object[]{selectQuery});
- final ResultSet resultSet;
- if (queryTimeout > 0) {
- resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
- if (AVRO_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
- } else if (JSON_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
- }
- } else {
- resultSet = queryFuture.getUninterruptibly();
- if (AVRO_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
- } else if (JSON_FORMAT.equals(outputFormat)) {
- nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
+ do {
+ fileToProcess = session.create();
+
+ // Assuring that if we have an input FlowFile
+ // the generated output inherit the attributes
+ if(attributes != null){
+ fileToProcess = session.putAllAttributes(fileToProcess, attributes);
+ }
+
+ fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ try {
+ logger.debug("Executing CQL query {}", new Object[]{selectQuery});
+ if (queryTimeout > 0) {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+ nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, queryTimeout, TimeUnit.MILLISECONDS));
+ } else if (JSON_FORMAT.equals(outputFormat)) {
+ nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+ }
+ } else {
+ if (AVRO_FORMAT.equals(outputFormat)) {
+ nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+ out, 0, null));
+ } else if (JSON_FORMAT.equals(outputFormat)) {
+ nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+ out, charset, 0, null));
+ }
}
- }
- } catch (final TimeoutException | InterruptedException | ExecutionException e) {
- throw new ProcessException(e);
+ } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+ throw new ProcessException(e);
+ }
}
- }
- });
+ });
+
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ // set attribute how many rows were selected
+ fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- // set mime.type based on output format
- fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
- JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+ // set mime.type based on output format
+ fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+ JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ logger.info("{} contains {} records; transferring to 'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(fileToProcess, REL_SUCCESS);
+ session.commit();
--- End diff --
Hi @mattyb149! Did you have a chance to look at my comments? What do you think?
---