You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2016/12/09 14:13:15 UTC

[GitHub] nifi pull request #1217: NIFI-3031 - Multi-Statement Script support for PutH...

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1217#discussion_r91722035
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---
    @@ -135,42 +197,100 @@
             return relationships;
         }
     
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
    +        if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
    +            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
    +                    + "providing flowfile(s) containing a SQL select query";
    +            getLogger().error(errorString);
    +            throw new ProcessException(errorString);
    +        }
    +    }
    +
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    -        FlowFile fileToProcess = null;
    -        if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +        final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null);
    +        FlowFile flowfile = null;
     
    -            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    -            // However, if we have no FlowFile and we have connections coming from other Processors, then
    -            // we know that we should run only if we have a FlowFile.
    +        // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +        // However, if we have no FlowFile and we have connections coming from other Processors, then
    +        // we know that we should run only if we have a FlowFile.
    +        if (context.hasIncomingConnection()) {
                 if (fileToProcess == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
             final ComponentLog logger = getLogger();
             final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
    -        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
    +
    +        // Source the SQL
    +        final String selectQuery;
    +
    +        if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
    +            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        } else {
    +            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
    +            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
    +            final StringBuilder queryContents = new StringBuilder();
    +            session.read(fileToProcess, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    queryContents.append(IOUtils.toString(in));
    +                }
    +            });
    +            selectQuery = queryContents.toString();
    +        }
    +
    +
             final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
             final StopWatch stopWatch = new StopWatch(true);
    +        final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
    +        final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
    +        final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
     
             try (final Connection con = dbcpService.getConnection();
    -             final Statement st = con.createStatement()) {
    +             final Statement st = ( flowbased ? con.prepareStatement(selectQuery): con.createStatement())
    --- End diff --
    
    Isn't it possible to specify a parameterized query in the Select Query property, expecting that each flow file has the appropriate attributes set? I'm wondering if there's a better check to know when to call prepareStatement vs createStatement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---