You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by aglotero <gi...@git.apache.org> on 2018/11/01 17:05:53 UTC

[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r230119046
  
    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
     
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile inputFlowFile = null;
             FlowFile fileToProcess = null;
    +
    +        Map<String, String> attributes = null;
    +
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
     
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +
    +            attributes = inputFlowFile.getAttributes();
             }
     
             final ComponentLog logger = getLogger();
    -        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    -        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
    +        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
             final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
    -        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
    +        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
             final StopWatch stopWatch = new StopWatch(true);
     
    -        if (fileToProcess == null) {
    -            fileToProcess = session.create();
    +        if(inputFlowFile != null){
    +            session.transfer(inputFlowFile, REL_ORIGINAL);
             }
     
             try {
                 // The documentation for the driver recommends the session remain open the entire time the processor is running
                 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
                 final Session connectionSession = cassandraSession.get();
    -            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
    +            final ResultSet resultSet;
    +
    +            if (queryTimeout > 0) {
    +                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
    +            }else{
    +                resultSet = connectionSession.execute(selectQuery);
    +            }
    +
                 final AtomicLong nrOfRows = new AtomicLong(0L);
     
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet;
    -                        if (queryTimeout > 0) {
    -                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    -                            }
    -                        } else {
    -                            resultSet = queryFuture.getUninterruptibly();
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
    +            do {
    +                fileToProcess = session.create();
    +
    +                // Assuring that if we have an input FlowFile
    +                // the generated output inherit the attributes
    +                if(attributes != null){
    +                    fileToProcess = session.putAllAttributes(fileToProcess, attributes);
    +                }
    +
    +                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        try {
    +                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    +                            if (queryTimeout > 0) {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, queryTimeout, TimeUnit.MILLISECONDS));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    +                                }
    +                            } else {
    +                                if (AVRO_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
    +                                            out, 0, null));
    +                                } else if (JSON_FORMAT.equals(outputFormat)) {
    +                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
    +                                            out, charset, 0, null));
    +                                }
                                 }
    -                        }
     
    -                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    -                        throw new ProcessException(e);
    +                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
    +                            throw new ProcessException(e);
    +                        }
                         }
    -                }
    -            });
    +                });
    +
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                // set attribute how many rows were selected
    +                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
     
    -            // set mime.type based on output format
    -            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    -                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
    +                // set mime.type based on output format
    +                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
    +                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
     
    -            logger.info("{} contains {} Avro records; transferring to 'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    -                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +                logger.info("{} contains {} records; transferring to 'success'",
    +                        new Object[]{fileToProcess, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    +                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                session.transfer(fileToProcess, REL_SUCCESS);
    +                session.commit();
    --- End diff --
    
    Hi @mattyb149! Did you have a chance to look at my comments? What do you think?


---