Posted to issues@nifi.apache.org by vivekmuniyandi <gi...@git.apache.org> on 2018/05/03 03:28:53 UTC

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

GitHub user vivekmuniyandi opened a pull request:

    https://github.com/apache/nifi/pull/2671

    NiFi-5102 - Adding Processors for MarkLogic DB

    …tMarkLogic
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    Yes, there is a JIRA ticket - https://issues.apache.org/jira/browse/NIFI-5102
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    Yes
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    Yes
    
    - [x] Is your initial contribution a single, squashed commit?
    No
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    Yes, but certain tests from other processors fail; for example, the gRPC processor tests were failing with an unapproved-license error for auto-generated files in the target folder.
    
    - [x] Have you written or updated unit tests to verify your changes?
    Yes
    
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    Yes
    
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    No
    - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    Yes
    
    Have added a NOTICE file and a LICENSE file under the nifi-marklogic-bundle project
    
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    Yes
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    Yes
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marklogic/nifi nifi-5102

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2671.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2671
    
----
commit 16f8ffef0e0a853535c5af63b7e28441b4674c47
Author: Vivek Siddharthan Muniyandi <vm...@...>
Date:   2018-05-03T03:01:25Z

    NIFI-5102 Adding MarkLogic DB NiFi processors - QueryMarkLogic and PutMarkLogic

commit e7b4bee69b6bd6a5274613ff884625a9d30b5ffb
Author: Vivek Siddharthan Muniyandi <vm...@...>
Date:   2018-05-03T03:26:14Z

    NIFI-5102 Added displaynames for Properties

----


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191615452
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec™ 1.7 
    +Attribution Statements
    +http://commons.apache.org/codec/
    +
    +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
    +contains test data from http://aspell.net/test/orig/batch0.tab.
    +Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
    +
    +The content of package org.apache.commons.codec.language.bm has been translated
    +from the original php source code available at http://stevemorse.org/phoneticinfo.htm
    +with permission from the original authors.
    +Original source copyright:
    +Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
    +
    +Copyright Statements
    +Copyright 2002-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: http://commons.apache.org/proper/commons-codec/source-repository.html 
    +
    +
    +Apache Commons Lang™ 3.4 
    +Attribution Statements
    +http://commons.apache.org/proper/commons-lang/
    +
    +This product includes software from the Spring Framework,
    +under the Apache License 2.0 (see: StringUtils.containsWhitespace())
    +
    +Copyright Statements
    +Copyright 2001-2017 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-lang 
    +
    +
    +Apache Commons Logging™ 1.1.1 
    +Attribution Statements
    +http://commons.apache.org/logging
    +
    +Copyright Statements
    +Copyright 2003-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-logging 
    +
    +
    +Apache Derby 10.13.1.1 
    --- End diff --
    
    I don't see Apache Derby in your NARs.  Do you?


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    I followed this link - https://cwiki.apache.org/confluence/display/NIFI/Contributor+Guide#ContributorGuide-Keepingyourfeaturebranchcurrent
    
    and it looks like this pulled others' commits in as well.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    If you take a look here https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#core-properties-br and check out the 'nifi.nar.library.directory' section, you can see how someone could be guided to create a directory for custom NARs, add that in there, and start up.
    
    Once we have support for extensions in the NiFi Registry this will be beautiful/easy.
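    
    For illustration, a minimal sketch of what that guidance could look like; the directory path and property suffix are hypothetical, and this assumes a NiFi release that supports additional nifi.nar.library.directory.* entries as described in that guide:
    
        # conf/nifi.properties
        nifi.nar.library.directory=./lib
        nifi.nar.library.directory.marklogic=/opt/nifi/custom-nars
    
        # drop the custom NAR into that directory and restart NiFi
        cp nifi-marklogic-nar-<version>.nar /opt/nifi/custom-nars/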


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @ryanjdew We have addressed the Travis-CI related test issues and the build has been stable since.  It could break again as people add timing-dependent tests that behave wildly differently on slow environments, but we'll see.
    
    This PR, assuming the L&N and such are sorted now, is just tricky because we need a committer with time to devote to learning MarkLogic well enough to set up an environment and verify function, or to leverage a provided instance (not a sustainable model).  It is a good example of where the limits of what the community can reasonably support come into play.
    
    Now, that said, this is probably a really cool and useful thing to offer folks, and beneficial to both the NiFi user base and the MarkLogic user base.  This is why I suggested the MarkLogic folks just hang onto this code in some public GitHub repo and have it be ALv2.  They can publish their NARs to Maven Central or wherever they publish, and provide instructions for them.  I'd be supportive, and I'd assume the community at large would be, of having links to such extensions on the Apache website.  This feels to me like the best tradeoff right now for all parties.



---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191615429
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec™ 1.7 
    +Attribution Statements
    +http://commons.apache.org/codec/
    +
    +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
    +contains test data from http://aspell.net/test/orig/batch0.tab.
    +Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
    +
    +The content of package org.apache.commons.codec.language.bm has been translated
    +from the original php source code available at http://stevemorse.org/phoneticinfo.htm
    +with permission from the original authors.
    +Original source copyright:
    +Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
    +
    +Copyright Statements
    +Copyright 2002-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: http://commons.apache.org/proper/commons-codec/source-repository.html 
    +
    +
    +Apache Commons Lang™ 3.4 
    +Attribution Statements
    +http://commons.apache.org/proper/commons-lang/
    +
    +This product includes software from the Spring Framework,
    +under the Apache License 2.0 (see: StringUtils.containsWhitespace())
    +
    +Copyright Statements
    +Copyright 2001-2017 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-lang 
    +
    +
    +Apache Commons Logging™ 1.1.1 
    --- End diff --
    
    I don't see commons-logging in your NARs.  Where do you see it?


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @vivekmuniyandi looks like you pulled a few other folks' commits in with your last push. Do this to clear that up:
    
    1. git checkout master
    2. git pull <apache remote here> master
    3. git checkout nifi-5102
    4. git rebase master
    5. git push marklogic --force nifi-5102
    
    You probably did a pull on master into nifi-5102. You want to avoid that for your own sanity's sake and use a rebase instead.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @vivekmuniyandi you have a merge conflict now.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185740310
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/QueryMarkLogic.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.ExportListener;
    +import com.marklogic.client.ext.datamovement.job.SimpleQueryBatcherJob;
    +import com.marklogic.client.io.BytesHandle;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Creates FlowFiles from batches of documents, matching the given criteria," +
    +    " retrieved from a MarkLogic server using the MarkLogic Data Movement SDK (DMSDK)")
    +public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    public static final PropertyDescriptor CONSISTENT_SNAPSHOT = new PropertyDescriptor.Builder()
    +        .name("Consistent snapshot")
    +        .displayName("Consistent snapshot")
    +        .defaultValue("true")
    +        .description("Boolean used to indicate that the matching documents were retrieved from a " +
    +            "consistent snapshot")
    +        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-separated list of collections to query from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URIS_QUERY = new PropertyDescriptor.Builder()
    +        .name("URIs query")
    +        .displayName("URIs query")
    +        .description("CTS URI Query for retrieving documents from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PATTERN = new PropertyDescriptor.Builder()
    +        .name("URI pattern")
    +        .displayName("URI pattern")
    +        .description("URI pattern for retrieving documents from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are created from documents read from MarkLogic are routed to" +
    +            " this success relationship")
    +        .build();
    +
    +    @Override
    +    public void init(ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        List<PropertyDescriptor> list = new ArrayList<>();
    +        list.addAll(properties);
    +        list.add(CONSISTENT_SNAPSHOT);
    +        list.add(COLLECTIONS);
    +        list.add(URIS_QUERY);
    +        list.add(URI_PATTERN);
    +        properties = Collections.unmodifiableList(list);
    +        Set<Relationship> set = new HashSet<>();
    +        set.add(SUCCESS);
    +        relationships = Collections.unmodifiableSet(set);
    +    }
    +
    +    @Override
    +    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        SimpleQueryBatcherJob job = new SimpleQueryBatcherJob();
    +        job.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
    +        job.setThreadCount(context.getProperty(THREAD_COUNT).asInteger());
    +        job.setConsistentSnapshot(context.getProperty(CONSISTENT_SNAPSHOT).asBoolean());
    +
    +        String value = context.getProperty(COLLECTIONS).getValue();
    +        if (value != null) {
    +            job.setWhereCollections(value.split(","));
    +        }
    +
    +        job.setWhereUriPattern(context.getProperty(URI_PATTERN).getValue());
    +        job.setWhereUrisQuery(context.getProperty(URIS_QUERY).getValue());
    +
    +        // TODO Replace ExportListener with custom listener so that we can do a commit per batch
    --- End diff --
    
    I would strongly recommend doing this TODO. `GetMongo` follows a similar pattern, and it really saves the user time and resources to have a batch of documents written per FlowFile instead of a 1:1 document-to-FlowFile mapping. For the loads MarkLogic is known to handle, that could be a significant volume of FlowFiles, especially if a user uses this to do the equivalent of a full table fetch.
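    
    For illustration, a minimal hedged sketch (not the PR's code) of that per-batch pattern, assuming a hypothetical custom listener has already collected one query batch's documents into batchContents; the DMSDK wiring is omitted and the imports are the ones already present in the processor classes:
    
        private void transferBatch(final ProcessSessionFactory sessionFactory, final List<byte[]> batchContents) {
            final ProcessSession session = sessionFactory.createSession();
            try {
                FlowFile flowFile = session.create();
                // Write the whole batch into a single FlowFile; a real implementation would
                // add delimiters or wrap the documents in an array/archive format.
                flowFile = session.write(flowFile, out -> {
                    for (byte[] document : batchContents) {
                        out.write(document);
                    }
                });
                flowFile = session.putAttribute(flowFile, "marklogic.batch.size", String.valueOf(batchContents.size()));
                session.transfer(flowFile, SUCCESS);
                session.commit(); // one commit per batch instead of one per document
            } catch (final Throwable t) {
                session.rollback(true);
                throw new ProcessException("Failed to transfer batch", t);
            }
        }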


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185738926
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/AbstractMarkLogicProcessor.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.DatabaseClient;
    +import com.marklogic.nifi.controller.DatabaseClientService;
    +import com.marklogic.nifi.controller.DatabaseClientService;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Defines common properties for MarkLogic processors.
    + */
    +public abstract class AbstractMarkLogicProcessor extends AbstractSessionFactoryProcessor {
    +
    +    protected List<PropertyDescriptor> properties;
    +    protected Set<Relationship> relationships;
    +
    +    // NiFi requires a validator for every property, even those that don't need any validation
    +    protected static Validator NO_VALIDATION_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(String subject, String input, ValidationContext context) {
    +            return new ValidationResult.Builder().valid(true).build();
    +        }
    +    };
    +
    +    public static final PropertyDescriptor DATABASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("DatabaseClient Service")
    +        .displayName("DatabaseClient Service")
    +        .required(true)
    +        .description("The DatabaseClient Controller Service that provides the MarkLogic connection")
    +        .identifiesControllerService(DatabaseClientService.class)
    +        .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +        .name("Batch size")
    +        .displayName("Batch size")
    +        .required(true)
    +        .defaultValue("100")
    +        .description("The number of documents per batch - sets the batch size on the Batcher")
    +        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor THREAD_COUNT = new PropertyDescriptor.Builder()
    +        .name("Thread count")
    +        .displayName("Thread count")
    +        .required(true)
    +        .defaultValue("16")
    --- End diff --
    
    NiFi uses a lot of threads, so you might want to think about lowering this default so it doesn't risk getting greedy.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185741806
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    --- End diff --
    
    Rule of thumb for all of these:
    
    1. Add an explicit call to `required(boolean)` so it's obvious what your intent is there.
    2. Switch `NO_VALIDATION_VALIDATOR` to `Validator.VALID` (see the sketch below).
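    
    For illustration, a hedged sketch of both points applied to one of the existing descriptors (requires org.apache.nifi.components.Validator):
    
        public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
            .name("Collections")
            .displayName("Collections")
            .description("Comma-delimited sequence of collections to add to each document")
            .required(false)               // explicit, so the intent is obvious
            .addValidator(Validator.VALID) // replaces the custom NO_VALIDATION_VALIDATOR
            .build();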


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185738577
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/AbstractMarkLogicProcessor.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.DatabaseClient;
    +import com.marklogic.nifi.controller.DatabaseClientService;
    +import com.marklogic.nifi.controller.DatabaseClientService;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Defines common properties for MarkLogic processors.
    + */
    +public abstract class AbstractMarkLogicProcessor extends AbstractSessionFactoryProcessor {
    +
    +    protected List<PropertyDescriptor> properties;
    +    protected Set<Relationship> relationships;
    +
    +    // NiFi requires a validator for every property, even those that don't need any validation
    +    protected static Validator NO_VALIDATION_VALIDATOR = new Validator() {
    --- End diff --
    
    `Validator.VALID` does the same thing.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    As a good example of where we should have thought it through more, we also have some cool work done by the InfluxDB folks.  They're now wanting to improve it in https://github.com/apache/nifi/pull/2743
    
    But the reality is we just don't have people knowledgeable enough to do reliable code reviews/testing of this.



---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191612335
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    --- End diff --
    
    This NOTICE file should be at the same location under META-INF, just like the NOTICE file for the Azure NAR:
    
    https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
    
    Also, it isn't Copyright MarkLogic if you're submitting this as a pull request to the Apache Software Foundation.  The NOTICE header then should read
    ---
    nifi-marklogic-nar
    Copyright 2015-2018 The Apache Software Foundation
    
    This product includes software developed at
    The Apache Software Foundation (http://www.apache.org/).
    --
    
    That should then be followed by whatever is legally required by the binary dependencies of this NAR bundle - no more, no less.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Team: given https://github.com/marklogic/nifi/releases, could we consider closing this PR and keeping MarkLogic artifact creation/maintenance something MarkLogic takes care of at this time? It is a perfectly fine model.  We could even create a NiFi web page that points at vendor- or other community-managed/supported extensions.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by ryanjdew <gi...@git.apache.org>.
Github user ryanjdew commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @joewitt I understand the concern. We would like to simplify the user experience of using NiFi with MarkLogic processors. We have a current model for creating and releasing NARs via a GitHub releases page, but would you happen to have a good example of a process using Maven publishing with NARs in NiFi?


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185739906
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/QueryMarkLogic.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.ExportListener;
    +import com.marklogic.client.ext.datamovement.job.SimpleQueryBatcherJob;
    +import com.marklogic.client.io.BytesHandle;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Creates FlowFiles from batches of documents, matching the given criteria," +
    +    " retrieved from a MarkLogic server using the MarkLogic Data Movement SDK (DMSDK)")
    +public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    --- End diff --
    
    You might want to think about setting `@InputRequirement` to ALLOWED so that you can add an incoming relationship for having the queries provided by flowfile properties or content. There are examples of how to do this in `GetMongo` and `ExecuteSQL`. When looking there, take note of the top of their `onTrigger` methods to see how the presence of a connection is detected.
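    
    For illustration, a hedged sketch of how that pattern could look here, adapted to this processor's session-factory signature (requires org.apache.nifi.annotation.behavior.InputRequirement; the query handling itself is omitted):
    
        @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
        public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    
            @Override
            public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
                final ProcessSession session = sessionFactory.createSession();
                FlowFile input = null;
                if (context.hasIncomingConnection()) {
                    input = session.get();
                    // Nothing queued, and the incoming connections are not just self-loops:
                    // wait until a FlowFile carrying the query arrives.
                    if (input == null && context.hasNonLoopConnection()) {
                        return;
                    }
                }
                // ... build the query from the incoming FlowFile when input != null,
                // otherwise fall back to the processor properties ...
            }
        }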


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @MikeThomsen Sure, will add that to our backlog. Thanks!


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185740990
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    --- End diff --
    
    Change to Validator.VALID


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191613968
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    --- End diff --
    
    This line can be removed.  The NOTICE needs to reflect what is actually bundled when this NAR is built and should account for everything in it that requires NOTICE entries - no more, no less.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185740483
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/QueryMarkLogic.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.ExportListener;
    +import com.marklogic.client.ext.datamovement.job.SimpleQueryBatcherJob;
    +import com.marklogic.client.io.BytesHandle;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Creates FlowFiles from batches of documents, matching the given criteria," +
    +    " retrieved from a MarkLogic server using the MarkLogic Data Movement SDK (DMSDK)")
    +public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    public static final PropertyDescriptor CONSISTENT_SNAPSHOT = new PropertyDescriptor.Builder()
    +        .name("Consistent snapshot")
    +        .displayName("Consistent snapshot")
    +        .defaultValue("true")
    +        .description("Boolean used to indicate that the matching documents were retrieved from a " +
    +            "consistent snapshot")
    +        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-separated list of collections to query from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URIS_QUERY = new PropertyDescriptor.Builder()
    +        .name("URIs query")
    +        .displayName("URIs query")
    +        .description("CTS URI Query for retrieving documents from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PATTERN = new PropertyDescriptor.Builder()
    +        .name("URI pattern")
    +        .displayName("URI pattern")
    +        .description("URI pattern for retrieving documents from a MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are created from documents read from MarkLogic are routed to" +
    +            " this success relationship")
    +        .build();
    +
    +    @Override
    +    public void init(ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        List<PropertyDescriptor> list = new ArrayList<>();
    +        list.addAll(properties);
    +        list.add(CONSISTENT_SNAPSHOT);
    +        list.add(COLLECTIONS);
    +        list.add(URIS_QUERY);
    +        list.add(URI_PATTERN);
    +        properties = Collections.unmodifiableList(list);
    +        Set<Relationship> set = new HashSet<>();
    +        set.add(SUCCESS);
    +        relationships = Collections.unmodifiableSet(set);
    +    }
    +
    +    @Override
    +    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        SimpleQueryBatcherJob job = new SimpleQueryBatcherJob();
    +        job.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
    +        job.setThreadCount(context.getProperty(THREAD_COUNT).asInteger());
    +        job.setConsistentSnapshot(context.getProperty(CONSISTENT_SNAPSHOT).asBoolean());
    +
    +        String value = context.getProperty(COLLECTIONS).getValue();
    +        if (value != null) {
    +            job.setWhereCollections(value.split(","));
    +        }
    +
    +        job.setWhereUriPattern(context.getProperty(URI_PATTERN).getValue());
    +        job.setWhereUrisQuery(context.getProperty(URIS_QUERY).getValue());
    +
    +        // TODO Replace ExportListener with custom listener so that we can do a commit per batch
    +        ExportListener exportListener = new ExportListener();
    +        exportListener.onDocumentReady(documentRecord -> {
    +            final ProcessSession session = sessionFactory.createSession();
    +            try {
    +                FlowFile flowFile = session.create();
    +                flowFile = session.write(flowFile, out -> out.write(documentRecord.getContent(new BytesHandle()).get()));
    +                session.putAttribute(flowFile, "uri", documentRecord.getUri());
    +                session.transfer(flowFile, SUCCESS);
    +                session.commit();
    +                if (getLogger().isDebugEnabled()) {
    +                    getLogger().debug("Routing " + documentRecord.getUri() + " to " + SUCCESS.getName());
    +                }
    +            } catch (final Throwable t) {
    +                getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
    +                session.rollback(true);
    +                throw t;
    --- End diff --
    
    You should throw a `ProcessException` here instead.
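    
    For reference, a minimal sketch of that change to the catch block shown above:
    
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            session.rollback(true);
            throw new ProcessException("Failed to process " + documentRecord.getUri(), t);
        }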


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by ryanjdew <gi...@git.apache.org>.
Github user ryanjdew commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @joewitt I appreciate the positive feedback on the process we've put in place for releasing our NiFi bundle.  I would still like to see this bundle make its way into an Apache NiFi release. I'll be taking on the responsibility for addressing issues in this PR.  After a review of the PR history, I believe the previously referenced issues have been addressed. Please let me know if there are any other changes you'd like to see made.
    
    Also, I've noticed that the Travis CI tests can be inconsistent at times. Is this a known issue, and is there anything I can do to help the matter?



---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @vivekmuniyandi Unrelated, but I don't have your email: consider adding a MarkLogicLookupService in a future sprint. You can look at HBaseLookupService and MongoDBLookupService as examples. Might be highly useful to users to be able to enrich a record set using MarkLogic. I have PRs open for some LookupService-related tasks that add some additional schema-related capabilities and those might be useful to your team on that issue.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Thanks @MikeThomsen! I have made the changes.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Thanks @joewitt for the comment. I am addressing all the changes you have mentioned. I will address the SSLContext and remove the Kerberos and Certificate auth for now. 
    
    ``` The other thing that needs to happen is the nar bundles need their LICENSE/NOTICE file(s) added if necessary. I looked at one of the nars and there would definitely need to be entries.```
    
    Wrt this, I have a [LICENSE](https://github.com/apache/nifi/pull/2671/files#diff-53deed39bf31085fbecf77ea6a2382dc) and a [NOTICE](https://github.com/apache/nifi/pull/2671/files#diff-a2f6b487a7a70d5f43fa320730b2c87a) file prepared by our legal team in the root directory of the nifi-marklogic-bundle (to account for the contents of the root directory and the subdirectories). That covers all the dependencies added in our MarkLogic bundle. Should we do something more? Can you explain a bit more about what is required?
    
    Thanks for all the help!


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @vivekmuniyandi I'll try to find time to set up a MarkLogic node for testing sometime in the next few days (the day job and such is getting in the way). In the meantime, I would suggest reaching out to @joewitt directly to see if he or any of his folks have any time they can spare to jump in and help you out. Also, just a suggestion: you might want to think about setting up a secure cluster that you can privately allow reviewers to access, so we/they can work with you to confirm everything works the way you expect.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191615698
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec™ 1.7 
    +Attribution Statements
    +http://commons.apache.org/codec/
    +
    +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
    +contains test data from http://aspell.net/test/orig/batch0.tab.
    +Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
    +
    +The content of package org.apache.commons.codec.language.bm has been translated
    +from the original php source code available at http://stevemorse.org/phoneticinfo.htm
    +with permission from the original authors.
    +Original source copyright:
    +Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
    +
    +Copyright Statements
    +Copyright 2002-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: http://commons.apache.org/proper/commons-codec/source-repository.html 
    +
    +
    +Apache Commons Lang™ 3.4 
    +Attribution Statements
    +http://commons.apache.org/proper/commons-lang/
    +
    +This product includes software from the Spring Framework,
    +under the Apache License 2.0 (see: StringUtils.containsWhitespace())
    +
    +Copyright Statements
    +Copyright 2001-2017 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-lang 
    +
    +
    +Apache Commons Logging™ 1.1.1 
    +Attribution Statements
    +http://commons.apache.org/logging
    +
    +Copyright Statements
    +Copyright 2003-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-logging 
    +
    +
    +Apache Derby 10.13.1.1 
    +Attribution Statements
    +https://db.apache.org/derby/releases/release-10.13.1.1.cgi
    +
    +Portions of Derby were originally developed by
    +International Business Machines Corporation and are
    +licensed to the Apache Software Foundation under the
    +"Software Grant and Corporate Contribution License Agreement",
    +informally known as the "Derby CLA".
    +The following copyright notice(s) were affixed to portions of the code
    +with which this file is now or was at one time distributed
    +and are placed here unaltered.
    +
    +(C) Copyright 1997,2004 International Business Machines Corporation.  All rights reserved.
    +
    +(C) Copyright IBM Corp. 2003. 
    +
    +
    +=========================================================================
    +
    +
    +The portion of the functionTests under 'nist' was originally 
    +developed by the National Institute of Standards and Technology (NIST), 
    +an agency of the United States Department of Commerce, and adapted by
    +International Business Machines Corporation in accordance with the NIST
    +Software Acknowledgment and Redistribution document at
    +http://www.itl.nist.gov/div897/ctg/sql_form.htm
    +
    +
    +
    +=========================================================================
    +
    +
    +Derby uses the  SerialBlob and SerialClob implementations from the Apache
    +Harmony project. The following notice covers the Harmony sources:
    +
    +Portions of Harmony were originally developed by
    +Intel Corporation and are licensed to the Apache Software
    +Foundation under the "Software Grant and Corporate Contribution
    +License Agreement", informally known as the "Intel Harmony CLA".
    +
    +
    +=========================================================================
    +
    +
    +The Derby build relies on source files supplied by the Apache Felix
    +project. The following notice covers the Felix files:
    +
    +  Apache Felix Main
    +  Copyright 2008 The Apache Software Foundation
    +
    +
    +  I. Included Software
    +
    +  This product includes software developed at
    +  The Apache Software Foundation (http://www.apache.org/).
    +  Licensed under the Apache License 2.0.
    +
    +  This product includes software developed at
    +  The OSGi Alliance (http://www.osgi.org/).
    +  Copyright (c) OSGi Alliance (2000, 2007).
    +  Licensed under the Apache License 2.0.
    +
    +  This product includes software from http://kxml.sourceforge.net.
    +  Copyright (c) 2002,2003, Stefan Haustein, Oberhausen, Rhld., Germany.
    +  Licensed under BSD License.
    +
    +  II. Used Software
    +
    +  This product uses software developed at
    +  The OSGi Alliance (http://www.osgi.org/).
    +  Copyright (c) OSGi Alliance (2000, 2007).
    +  Licensed under the Apache License 2.0.
    +
    +
    +  III. License Summary
    +  - Apache License 2.0
    +  - BSD License
    +
    +
    +=========================================================================
    +
    +
    +The Derby build relies on jar files supplied by the Apache Lucene
    +project. The following notice covers the Lucene files:
    +
    +Apache Lucene
    +Copyright 2013 The Apache Software Foundation
    --- End diff --
    
    OK, can you please dramatically reduce the size of this NOTICE following the above advice:
    
    1) Only include references to things actually bundled within a given nar.  No more.
    2) Only include that which is necessary and consistent with other nars.
    3) Do not include references to other NiFi things.  They are part of the NiFi project/source, so we're good.
    



---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185741393
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    --- End diff --
    
    This should use StandardValidators.NON_BLANK_VALIDATOR if you need it to be present. If not, use Validator.VALID.
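    For illustration, a minimal sketch of the two options mentioned here, mirroring the shape the revised diff further down in this thread ends up using (field declarations from the processor class):
    
    ```java
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.components.Validator;
    import org.apache.nifi.processor.util.StandardValidators;
    
    // Optional, free-form property: no validation, optionality declared explicitly
    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
        .name("Temporal collection")
        .displayName("Temporal collection")
        .description("The temporal collection to use for a temporal document insert")
        .required(false)
        .addValidator(Validator.VALID)
        .build();
    
    // Required property: reject blank values
    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
        .name("URI attribute name")
        .displayName("URI attribute name")
        .defaultValue("uuid")
        .required(true)
        .description("The name of the FlowFile attribute whose value will be used as the URI")
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();
    ```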


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185742052
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("(Optional) The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("(Optional) The prefix to prepend to each URI")
    --- End diff --
    
    Remove `(Optional)` and replace with an explicit `required(boolean)` declaration.
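    Concretely, the revised diff later in this thread takes roughly this shape for the optional URI properties:
    
    ```java
    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
        .name("URI prefix")
        .displayName("URI prefix")
        .description("The prefix to prepend to each URI") // "(Optional)" dropped from the description text
        .required(false)                                  // optionality declared explicitly instead
        .addValidator(Validator.VALID)                    // org.apache.nifi.components.Validator
        .build();
    ```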


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191616334
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/AbstractMarkLogicProcessor.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.DatabaseClient;
    +import com.marklogic.nifi.controller.MarkLogicDatabaseClientService;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Defines common properties for MarkLogic processors.
    + */
    +public abstract class AbstractMarkLogicProcessor extends AbstractSessionFactoryProcessor {
    +
    +    protected List<PropertyDescriptor> properties;
    +    protected Set<Relationship> relationships;
    +
    +    public static final PropertyDescriptor DATABASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("DatabaseClient Service")
    +        .displayName("DatabaseClient Service")
    +        .required(true)
    +        .description("The DatabaseClient Controller Service that provides the MarkLogic connection")
    +        .identifiesControllerService(MarkLogicDatabaseClientService.class)
    +        .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +        .name("Batch size")
    +        .displayName("Batch size")
    +        .required(true)
    +        .defaultValue("100")
    +        .description("The number of documents per batch - sets the batch size on the Batcher")
    +        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor THREAD_COUNT = new PropertyDescriptor.Builder()
    +        .name("Thread count")
    +        .displayName("Thread count")
    +        .required(false)
    +        .description("The number of threads - sets the thread count on the Batcher")
    +        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    It probably makes sense to include a default value here so people understand how many threads this will have by default.
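    A sketch of what that could look like; the default of "3" below is only illustrative and should be replaced with whatever default the Batcher would otherwise use:
    
    ```java
    public static final PropertyDescriptor THREAD_COUNT = new PropertyDescriptor.Builder()
        .name("Thread count")
        .displayName("Thread count")
        .required(false)
        .defaultValue("3") // illustrative default; pick a value that matches the Batcher's own behavior
        .description("The number of threads - sets the thread count on the Batcher")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build();
    ```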


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185745410
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("(Optional) The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("(Optional) The prefix to prepend to each URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
    +        .name("URI suffix")
    +        .displayName("URI suffix")
    +        .description("(Optional) The suffix to append to each URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are successfully written to MarkLogic are routed to the " +
    +            "success relationship for future processing")
    +        .build();
    +
    +    protected static final Relationship FAILURE = new Relationship.Builder()
    +        .name("FAILURE")
    +        .description("All FlowFiles that failed to be written to MarkLogic are routed to the " +
    +            "failure relationship for future processing")
    +        .build();
    +
    +    private DataMovementManager dataMovementManager;
    +    private WriteBatcher writeBatcher;
    +    // If no FlowFile exists when this processor is triggered, this variable determines whether or not a call is made to
    +    // flush the WriteBatcher
    +    private boolean shouldFlushIfEmpty = true;
    +
    +    @Override
    +    public void init(ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        List<PropertyDescriptor> list = new ArrayList<>();
    +        list.addAll(properties);
    +        list.add(COLLECTIONS);
    +        list.add(FORMAT);
    +        list.add(JOB_ID);
    +        list.add(JOB_NAME);
    +        list.add(MIMETYPE);
    +        list.add(PERMISSIONS);
    +        list.add(TRANSFORM);
    +        list.add(TEMPORAL_COLLECTION);
    +        list.add(URI_ATTRIBUTE_NAME);
    +        list.add(URI_PREFIX);
    +        list.add(URI_SUFFIX);
    +        properties = Collections.unmodifiableList(list);
    +        Set<Relationship> set = new HashSet<>();
    +        set.add(SUCCESS);
    +        set.add(FAILURE);
    +        relationships = Collections.unmodifiableSet(set);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        dataMovementManager = getDatabaseClient(context).newDataMovementManager();
    +        writeBatcher = dataMovementManager.newWriteBatcher()
    +            .withJobId(context.getProperty(JOB_ID).getValue())
    +            .withJobName(context.getProperty(JOB_NAME).getValue())
    +            .withBatchSize(context.getProperty(BATCH_SIZE).asInteger())
    +            .withThreadCount(context.getProperty(THREAD_COUNT).asInteger())
    +            .withTemporalCollection(context.getProperty(TEMPORAL_COLLECTION).getValue());
    +
    +        final String transform = context.getProperty(TRANSFORM).getValue();
    +        if (transform != null) {
    +            writeBatcher.withTransform(new ServerTransform(transform));
    +        }
    +        this.writeBatcher.onBatchSuccess(writeBatch -> {
    +            for(WriteEvent writeEvent : writeBatch.getItems()) {
    +                routeDocumentToRelationship(writeEvent, SUCCESS);
    +            }
    +        }).onBatchFailure((writeBatch, throwable) -> {
    +            for(WriteEvent writeEvent : writeBatch.getItems()) {
    +                routeDocumentToRelationship(writeEvent, FAILURE);
    +            }
    +        });
    +        dataMovementManager.startJob(writeBatcher);
    +    }
    +
    +    private void routeDocumentToRelationship(WriteEvent writeEvent, Relationship relationship) {
    +        DocumentMetadataHandle metadata = (DocumentMetadataHandle) writeEvent.getMetadata();
    +        String flowFileUUID = metadata.getMetadataValues().get("flowFileUUID");
    +        FlowFileInfo flowFile = URIFlowFileMap.get(flowFileUUID);
    +        if(flowFile != null) {
    --- End diff --
    
    Have you tried this with a very large data set to see how it performs? Like millions of sample XML records?


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r186576405
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,396 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic", "Put", "Write", "Insert"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> uriFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("The prefix to prepend to each URI")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
    +        .name("URI suffix")
    +        .displayName("URI suffix")
    +        .description("The suffix to append to each URI")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are successfully written to MarkLogic are routed to the " +
    +            "success relationship for future processing.")
    +        .build();
    +
    +    protected static final Relationship FAILURE = new Relationship.Builder()
    +        .name("FAILURE")
    +        .description("All FlowFiles that failed to be written to MarkLogic are routed to the " +
    +            "failure relationship for future processing.")
    +        .build();
    +
    +    private DataMovementManager dataMovementManager;
    +    private WriteBatcher writeBatcher;
    +    // If no FlowFile exists when this processor is triggered, this variable determines whether or not a call is made to
    +    // flush the WriteBatcher
    +    private boolean shouldFlushIfEmpty = true;
    --- End diff --
    
    It appears this member variable could benefit from being 'volatile'.  This is true for the writeBatcher and possibly dataMovementManager as well.
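    A sketch of what that suggestion amounts to, assuming these fields can be touched by more than one NiFi thread (field declarations only):
    
    ```java
    private volatile DataMovementManager dataMovementManager;
    private volatile WriteBatcher writeBatcher;
    // If no FlowFile exists when this processor is triggered, this variable determines whether or
    // not a call is made to flush the WriteBatcher; volatile ensures the latest value is visible
    // across the threads that trigger the processor.
    private volatile boolean shouldFlushIfEmpty = true;
    ```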


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @MikeThomsen We don't have a public MarkLogic Docker image, but we do have [this](https://hub.docker.com/r/patrickmcelwee/marklogic-dependencies/) on Docker Hub, which would give you a head start on getting a MarkLogic instance up and running.
    
    I will drop them an email and work on getting secure access to the cluster. Thanks for all the help.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r186803533
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/pom.xml ---
    @@ -0,0 +1,88 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-marklogic-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-marklogic-processors</artifactId>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +            <version>${nifi.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +            <version>${nifi.version}</version>
    --- End diff --
    
    i'm editing this and other findings on a local branch.  I'll share them in a patch


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191612648
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    --- End diff --
    
    This is not appropriate for a NOTICE within an Apache project.  The same applies to the statement above.  If these are important to MarkLogic, it is worth considering that you don't have to contribute your code/nar to the Apache project.  You could simply cancel the PR and JIRA and make this code available on your own company website, etc.
    
    Finally, consider that all contributions to Apache projects are made as individuals.  This must not be a contribution from MarkLogic the corporation, or else we cannot accept it.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @vivekmuniyandi I'll try to build a MarkLogic Docker image and share it on Docker Hub so others can use that if they want.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    BTW, I would strongly recommend your team discuss adding a `PutMarkLogicRecord` processor so you can do a bulk ingestion event from a single flowfile. We have quite a few good implementations, such as the ones for HBase, ElasticSearch and MongoDB, that you can use/steal from to make it happen. Would **strongly** recommend you do that, because it'll make bulk ingestion of very large data sets go much faster for MarkLogic. If you want to do that, feel free to just start work on it, push the changes into this PR, and we'll keep going.
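    To make the idea concrete, a record-aware processor would typically expose a Record Reader controller service property along these lines; the `PutMarkLogicRecord` name and this descriptor are hypothetical and not part of the PR as it stands:
    
    ```java
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.serialization.RecordReaderFactory;
    
    // Hypothetical property for a PutMarkLogicRecord processor: the configured Record Reader parses
    // each incoming FlowFile into individual records, each of which could then be serialized and
    // handed to the WriteBatcher as its own document.
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("The Record Reader to use for parsing incoming FlowFiles into records")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();
    ```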


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191615146
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec™ 1.7 
    --- End diff --
    
    Please model all of these dependency references off examples you can find in the existing source tree.  I don't see any new dependencies here that you would have trouble finding examples of how to reference.  If you have questions on how to find any of these, please let me know.  Formatting is important to help this be readable/consistent/maintainable.
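    For reference, dependency entries in the existing nar NOTICE files in the tree follow roughly this shape (wording approximate; check a nar such as the azure one for the exact form):
    
    ```
    (ASLv2) Apache Commons Codec
      The following NOTICE information applies:
        Apache Commons Codec
        Copyright 2001-2016 The Apache Software Foundation
    ```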


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by ryanjdew <gi...@git.apache.org>.
Github user ryanjdew commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Following up, are there any other concerns with this PR? If needed, I can provide credentials to a MarkLogic instance for testing.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191612470
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    --- End diff --
    
    Please see other NOTICE files for an example, e.g. https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
    
    None of this ALv2 reference information belongs here.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185740763
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    --- End diff --
    
    Should add a few more descriptive tags to help people find it.
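    The revised diff quoted elsewhere in this thread addresses this along these lines:
    
    ```java
    @Tags({"MarkLogic", "Put", "Write", "Insert"})
    @CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
        "MarkLogic Data Movement SDK (DMSDK)")
    @TriggerWhenEmpty
    public class PutMarkLogic extends AbstractMarkLogicProcessor {
        // ...
    }
    ```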


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r186527597
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/pom.xml ---
    @@ -0,0 +1,88 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-marklogic-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-marklogic-processors</artifactId>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +            <version>${nifi.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-utils</artifactId>
    +            <version>${nifi.version}</version>
    --- End diff --
    
    I strongly recommend avoiding the ${} notation for the versions of components being built within this reactor/multi-module build of NiFi. It can create problems during the release process, and our release plugins take care of changing the version strings for us. Please use '1.7.0-SNAPSHOT' instead.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Ok, I've attached a patch which helps with some aspects of POM construction, flags things like resource utilization (since the processor appears to load full content into memory), and renames the service to indicate it is a MarkLogic service rather than just a generic database service. There is an outstanding need to sort out the security configuration. For the SSLContext, these processors should utilize the standard mechanism of obtaining it, as you can follow from a number of other processors. Also, there is a Kerberos option in the security settings, but there do not appear to be any associated settings for the user. The security configurations should either be reduced to simple/digest for now OR completed with some consistency to other components. For security-relevant things CVEs become a concern, so we take these more seriously. The performance and logic of the processor's interaction with MarkLogic we can improve over time if needed, but security we want to get right up front. The other thing that needs to happen is that the NAR bundles need their LICENSE/NOTICE file(s) added if necessary. I looked at one of the NARs and there would definitely need to be entries. Please try adding these in like other NARs and I'm happy to help tweak it to get it to the finish line.
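    
    As a rough sketch of that standard SSLContextService pattern (illustrative only; the property name, description, and the wiring into the MarkLogic DatabaseClient are placeholders, not the PR's actual code):
    
        // import org.apache.nifi.components.PropertyDescriptor;
        // import org.apache.nifi.ssl.SSLContextService;
        // import javax.net.ssl.SSLContext;
    
        // Hypothetical property declared on the processor, exposing a controller service for TLS.
        public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("ssl-context-service")
            .displayName("SSL Context Service")
            .description("The SSL Context Service used to provide client certificate information for TLS connections.")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();
    
        // Then, for example in onScheduled(ProcessContext context):
        // final SSLContextService sslService =
        //     context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        // if (sslService != null) {
        //     final SSLContext sslContext = sslService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
        //     // hand sslContext to the MarkLogic DatabaseClient/SecurityContext configuration
        // }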
    
    If you have questions on how to achieve any of the above, please ask. Point to an example NAR you looked at which is similar, so that we can best help close the remaining gaps starting from good examples you've already studied.
    
    Thanks


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191612034
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/LICENSE.txt ---
    @@ -0,0 +1,72 @@
    +Apache License 
    --- End diff --
    
    Also, unless you're altering the license by appending items to it (as would be necessary if there were any copied source from somewhere and that source was from the ASF category A), you don't need to include it, as it will be pulled in automatically.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185742208
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("(Optional) The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("(Optional) The prefix to prepend to each URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
    +        .name("URI suffix")
    +        .displayName("URI suffix")
    +        .description("(Optional) The suffix to append to each URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    --- End diff --
    
    Nit: both of these descriptions should have a `.` at the end of them.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185743318
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    --- End diff --
    
    This variable name violates standard Java conventions.
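    
    In standard Java style that would be a lower camelCase field name, which is what the revised code later in this thread uses:
    
        private Map<String, FlowFileInfo> uriFlowFileMap = new HashMap<>();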


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191611907
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/LICENSE.txt ---
    @@ -0,0 +1,72 @@
    +Apache License 
    --- End diff --
    
    This license file should be located in the NAR under META-INF, just like the following example. Further, it should contain the same text and format for the ALv2 license.
    
    https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE
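    
    Concretely, for this bundle that would presumably mean files at locations like these (paths are illustrative, mirroring the Azure example above):
    
        nifi-marklogic-bundle/nifi-marklogic-nar/src/main/resources/META-INF/LICENSE
        nifi-marklogic-bundle/nifi-marklogic-nar/src/main/resources/META-INF/NOTICE
        nifi-marklogic-bundle/nifi-marklogic-services-api-nar/src/main/resources/META-INF/LICENSE
        nifi-marklogic-bundle/nifi-marklogic-services-api-nar/src/main/resources/META-INF/NOTICE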
    



---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    Thanks @MikeThomsen for the comment. `PutMarkLogicRecord` is definitely on our roadmap, but I am not sure when we will be able to get to it. We have an internal sprint for NiFi, so we will add this to our backlog, check with the PM, and prioritize it accordingly.
    
    We don't want to keep this PR waiting on that processor; I will raise a separate PR for it in the future. Thanks!


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185979858
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/QueryMarkLogic.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.ExportListener;
    +import com.marklogic.client.ext.datamovement.job.SimpleQueryBatcherJob;
    +import com.marklogic.client.io.BytesHandle;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Creates FlowFiles from batches of documents, matching the given criteria," +
    +    " retrieved from a MarkLogic server using the MarkLogic Data Movement SDK (DMSDK)")
    +public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    --- End diff --
    
    We have added this as a task in our sprint and will accommodate it in the coming weeks.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r186527567
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-nar/pom.xml ---
    @@ -0,0 +1,56 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-marklogic-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-marklogic-nar</artifactId>
    +
    +    <packaging>nar</packaging>
    +
    +    <properties>
    +        <maven.javadoc.skip>true</maven.javadoc.skip>
    +        <source.skip>true</source.skip>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-marklogic-services-api-nar</artifactId>
    +            <version>${project.parent.version}</version>
    --- End diff --
    
    I strongly recommend avoiding the ${} notation for the versions of components being built within this reactor/multi-module build of NiFi. It can create problems during the release process, and our release plugins take care of changing the version strings for us. Please use '1.7.0-SNAPSHOT' instead.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191615567
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec 1.7
    +Attribution Statements
    +http://commons.apache.org/codec/
    +
    +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
    +contains test data from http://aspell.net/test/orig/batch0.tab.
    +Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
    +
    +The content of package org.apache.commons.codec.language.bm has been translated
    +from the original php source code available at http://stevemorse.org/phoneticinfo.htm
    +with permission from the original authors.
    +Original source copyright:
    +Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
    +
    +Copyright Statements
    +Copyright 2002-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: http://commons.apache.org/proper/commons-codec/source-repository.html 
    +
    +
    +Apache Commons Lang 3.4
    +Attribution Statements
    +http://commons.apache.org/proper/commons-lang/
    +
    +This product includes software from the Spring Framework,
    +under the Apache License 2.0 (see: StringUtils.containsWhitespace())
    +
    +Copyright Statements
    +Copyright 2001-2017 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-lang 
    +
    +
    +Apache Commons Logging 1.1.1
    +Attribution Statements
    +http://commons.apache.org/logging
    +
    +Copyright Statements
    +Copyright 2003-2016 The Apache Software Foundation
    +
    +License Text (http://spdx.org/licenses/Apache-2.0)
    +Made available under the Apache License 2.0. See Appendix for full text.
    +
    +Source materials are available for download at: https://github.com/apache/commons-logging 
    +
    +
    +Apache Derby 10.13.1.1 
    +Attribution Statements
    +https://db.apache.org/derby/releases/release-10.13.1.1.cgi
    +
    +Portions of Derby were originally developed by
    +International Business Machines Corporation and are
    +licensed to the Apache Software Foundation under the
    +"Software Grant and Corporate Contribution License Agreement",
    +informally known as the "Derby CLA".
    +The following copyright notice(s) were affixed to portions of the code
    +with which this file is now or was at one time distributed
    +and are placed here unaltered.
    +
    +(C) Copyright 1997,2004 International Business Machines Corporation.  All rights reserved.
    +
    +(C) Copyright IBM Corp. 2003. 
    +
    +
    +=========================================================================
    +
    +
    +The portion of the functionTests under 'nist' was originally 
    +developed by the National Institute of Standards and Technology (NIST), 
    +an agency of the United States Department of Commerce, and adapted by
    +International Business Machines Corporation in accordance with the NIST
    +Software Acknowledgment and Redistribution document at
    +http://www.itl.nist.gov/div897/ctg/sql_form.htm
    +
    +
    +
    +=========================================================================
    +
    +
    +Derby uses the  SerialBlob and SerialClob implementations from the Apache
    +Harmony project. The following notice covers the Harmony sources:
    +
    +Portions of Harmony were originally developed by
    +Intel Corporation and are licensed to the Apache Software
    +Foundation under the "Software Grant and Corporate Contribution
    +License Agreement", informally known as the "Intel Harmony CLA".
    +
    +
    +=========================================================================
    +
    +
    +The Derby build relies on source files supplied by the Apache Felix
    +project. The following notice covers the Felix files:
    +
    +  Apache Felix Main
    +  Copyright 2008 The Apache Software Foundation
    +
    +
    +  I. Included Software
    +
    +  This product includes software developed at
    +  The Apache Software Foundation (http://www.apache.org/).
    +  Licensed under the Apache License 2.0.
    +
    +  This product includes software developed at
    +  The OSGi Alliance (http://www.osgi.org/).
    +  Copyright (c) OSGi Alliance (2000, 2007).
    +  Licensed under the Apache License 2.0.
    +
    +  This product includes software from http://kxml.sourceforge.net.
    +  Copyright (c) 2002,2003, Stefan Haustein, Oberhausen, Rhld., Germany.
    +  Licensed under BSD License.
    +
    +  II. Used Software
    +
    +  This product uses software developed at
    +  The OSGi Alliance (http://www.osgi.org/).
    +  Copyright (c) OSGi Alliance (2000, 2007).
    +  Licensed under the Apache License 2.0.
    +
    +
    +  III. License Summary
    +  - Apache License 2.0
    +  - BSD License
    +
    +
    +=========================================================================
    +
    +
    +The Derby build relies on jar files supplied by the Apache Lucene
    +project. The following notice covers the Lucene files:
    +
    +Apache Lucene
    --- End diff --
    
    I don't think Lucene is bundled.


---

[GitHub] nifi issue #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by vivekmuniyandi <gi...@git.apache.org>.
Github user vivekmuniyandi commented on the issue:

    https://github.com/apache/nifi/pull/2671
  
    @joewitt I have addressed all your comments except for the LICENSE and NOTICE comments. Can you please let us know what more we should add apart from the LICENSE and NOTICE files prepared by our legal team, which we have included in the root directory of the NAR? Those cover all of the dependencies added. Please help. Thanks.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191614275
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/NOTICE.txt ---
    @@ -0,0 +1,2106 @@
    +MarkLogic NiFi Processors
    +
    +
    +Copyright © 2018 MarkLogic Corporation.
    +
    +This project is licensed under the Apache License, Version 2.0 (the "License"); you may not use this project except in compliance with the License. You may obtain a copy of the License at
    +
    +http://www.apache.org/licenses/LICENSE-2.0 
    +
    +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
    +
    +
    +Please direct questions, comments and requests to fossreview@marklogic.com. 
    +
    +Open source software required to be made available under license is included herein. In the event you are unable to obtain a copy of such open source software, please contact fossreview@marklogic.com and a copy will be made available to you.
    +
    +
    +The following software may be included in this project (last updated May 1, 2018):
    +
    +Apache Commons Codec 1.7
    --- End diff --
    
    Please have this follow formatting similar to what is shown in this example
    
    https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE#L30
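    
    That is, per-dependency entries roughly in this shape (sketch only; the linked file is the template to follow, and the copyright line below is taken from the NOTICE text in this PR):
    
        (ASLv2) Apache Commons Codec
          The following NOTICE information applies:
            Apache Commons Codec
            Copyright 2002-2016 The Apache Software Foundation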



---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r191618134
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,399 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.SystemResource;
    +import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic", "Put", "Write", "Insert"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@SystemResourceConsideration(resource = SystemResource.MEMORY)
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> uriFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(Validator.VALID)
    +        .required(false)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("The prefix to prepend to each URI")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
    +        .name("URI suffix")
    +        .displayName("URI suffix")
    +        .description("The suffix to append to each URI")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    protected static final Relationship SUCCESS = new Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are successfully written to MarkLogic are routed to the " +
    +            "success relationship for future processing.")
    +        .build();
    +
    +    protected static final Relationship FAILURE = new Relationship.Builder()
    +        .name("FAILURE")
    +        .description("All FlowFiles that failed to be written to MarkLogic are routed to the " +
    +            "failure relationship for future processing.")
    +        .build();
    +
    +    private volatile DataMovementManager dataMovementManager;
    +    private volatile WriteBatcher writeBatcher;
    +    // If no FlowFile exists when this processor is triggered, this variable determines whether or not a call is made to
    +    // flush the WriteBatcher
    +    private volatile boolean shouldFlushIfEmpty = true;
    +
    +    @Override
    +    public void init(ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        List<PropertyDescriptor> list = new ArrayList<>();
    +        list.addAll(properties);
    +        list.add(COLLECTIONS);
    +        list.add(FORMAT);
    +        list.add(JOB_ID);
    +        list.add(JOB_NAME);
    +        list.add(MIMETYPE);
    +        list.add(PERMISSIONS);
    +        list.add(TRANSFORM);
    +        list.add(TEMPORAL_COLLECTION);
    +        list.add(URI_ATTRIBUTE_NAME);
    +        list.add(URI_PREFIX);
    +        list.add(URI_SUFFIX);
    +        properties = Collections.unmodifiableList(list);
    +        Set<Relationship> set = new HashSet<>();
    +        set.add(SUCCESS);
    +        set.add(FAILURE);
    +        relationships = Collections.unmodifiableSet(set);
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        dataMovementManager = getDatabaseClient(context).newDataMovementManager();
    +        writeBatcher = dataMovementManager.newWriteBatcher()
    +            .withJobId(context.getProperty(JOB_ID).getValue())
    +            .withJobName(context.getProperty(JOB_NAME).getValue())
    +            .withBatchSize(context.getProperty(BATCH_SIZE).asInteger())
    +            .withTemporalCollection(context.getProperty(TEMPORAL_COLLECTION).getValue());
    +
    +        final String transform = context.getProperty(TRANSFORM).getValue();
    +        if (transform != null) {
    +            writeBatcher.withTransform(new ServerTransform(transform));
    +        }
    +        Integer threadCount = context.getProperty(THREAD_COUNT).asInteger();
    +        if(threadCount != null) {
    +            writeBatcher.withThreadCount(threadCount);
    +        }
    +        this.writeBatcher.onBatchSuccess(writeBatch -> {
    +            for(WriteEvent writeEvent : writeBatch.getItems()) {
    +                routeDocumentToRelationship(writeEvent, SUCCESS);
    +            }
    +        }).onBatchFailure((writeBatch, throwable) -> {
    +            for(WriteEvent writeEvent : writeBatch.getItems()) {
    +                routeDocumentToRelationship(writeEvent, FAILURE);
    +            }
    +        });
    +        dataMovementManager.startJob(writeBatcher);
    +    }
    +
    +    private void routeDocumentToRelationship(WriteEvent writeEvent, Relationship relationship) {
    +        DocumentMetadataHandle metadata = (DocumentMetadataHandle) writeEvent.getMetadata();
    +        String flowFileUUID = metadata.getMetadataValues().get("flowFileUUID");
    +        FlowFileInfo flowFile = uriFlowFileMap.get(flowFileUUID);
    +        if(flowFile != null) {
    +            flowFile.session.getProvenanceReporter().send(flowFile.flowFile, writeEvent.getTargetUri());
    +            flowFile.session.transfer(flowFile.flowFile, relationship);
    +            flowFile.session.commit();
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Routing " + writeEvent.getTargetUri() + " to " + relationship.getName());
    +            }
    +        }
    +        uriFlowFileMap.remove(flowFileUUID);
    +    }
    +
    +    @Override
    +    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        final ProcessSession session = sessionFactory.createSession();
    +        try {
    +            onTrigger(context, session);
    +        } catch (final Throwable t) {
    +            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
    +            session.rollback(true);
    +            throw new ProcessException(t);
    +        }
    +    }
    +    /**
    +     * When a FlowFile is received, hand it off to the WriteBatcher so it can be written to MarkLogic.
    +     * <p>
    +     * If a FlowFile is not set (possible because of the TriggerWhenEmpty annotation), then yield is called on the
    +     * ProcessContext so that Nifi doesn't invoke this method repeatedly when nothing is available. Then, a check is
    +     * made to determine if flushAsync should be called on the WriteBatcher. This ensures that any batch of documents
    +     * that is smaller than the WriteBatcher's batch size will be flushed immediately and not have to wait for more
    +     * FlowFiles to arrive to fill out the batch.
    +     *
    +     */
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            if (shouldFlushIfEmpty) {
    +                flushWriteBatcherAsync(this.writeBatcher);
    +            }
    +            shouldFlushIfEmpty = false;
    +        } else {
    +            shouldFlushIfEmpty = true;
    +
    +            WriteEvent writeEvent = buildWriteEvent(context, session, flowFile);
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Writing URI: " + writeEvent.getTargetUri());
    +            }
    +            addWriteEvent(this.writeBatcher, writeEvent);
    +        }
    +    }
    +
    +    /*
    +     * Protected so that it can be overridden for unit testing purposes.
    +     */
    +    protected void flushWriteBatcherAsync(WriteBatcher writeBatcher) {
    +        writeBatcher.flushAsync();
    +    }
    +
    +    /*
    +     * Protected so that it can be overridden for unit testing purposes.
    +     */
    +    protected void addWriteEvent(WriteBatcher writeBatcher, WriteEvent writeEvent) {
    +        writeBatcher.add(writeEvent);
    +    }
    +
    +    protected WriteEvent buildWriteEvent(ProcessContext context, ProcessSession session, FlowFile flowFile) {
    +        String uri = flowFile.getAttribute(context.getProperty(URI_ATTRIBUTE_NAME).getValue());
    +        final String prefix = context.getProperty(URI_PREFIX).getValue();
    +        if (prefix != null) {
    +            uri = prefix + uri;
    +        }
    +        final String suffix = context.getProperty(URI_SUFFIX).getValue();
    +        if (suffix != null) {
    +            uri += suffix;
    +        }
    +
    +        DocumentMetadataHandle metadata = new DocumentMetadataHandle();
    +        final String collections = context.getProperty(COLLECTIONS).getValue();
    +        if (collections != null) {
    +            metadata.withCollections(collections.split(","));
    +        }
    +        final String permissions = context.getProperty(PERMISSIONS).getValue();
    +        if (permissions != null) {
    +            String[] tokens = permissions.split(",");
    +            for (int i = 0; i < tokens.length; i += 2) {
    +                String role = tokens[i];
    +                String capability = tokens[i + 1];
    +                metadata.withPermission(role, DocumentMetadataHandle.Capability.getValueOf(capability));
    +            }
    +        }
    +        // Add the flow file UUID for Provenance purposes and for sending them
    +        // to the appropriate relationship
    +        String flowFileUUID = flowFile.getAttribute(CoreAttributes.UUID.key());
    +        metadata.withMetadataValue("flowFileUUID", flowFileUUID);
    +        // TODO Haven't had luck with wrapping the FlowFile's inputStream with an ML InputStreamHandle, so copying everything into a byte array
    --- End diff --
    
    Recommend removal of all commented-out code/sections. You could file a follow-on JIRA for such actions if desired.


---

[GitHub] nifi pull request #2671: NiFi-5102 - Adding Processors for MarkLogic DB

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185742105
  
    --- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.DataMovementManager;
    +import com.marklogic.client.datamovement.WriteBatcher;
    +import com.marklogic.client.datamovement.WriteEvent;
    +import com.marklogic.client.datamovement.impl.WriteEventImpl;
    +import com.marklogic.client.document.ServerTransform;
    +import com.marklogic.client.io.BytesHandle;
    +import com.marklogic.client.io.DocumentMetadataHandle;
    +import com.marklogic.client.io.Format;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +
    +/**
    + * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
    + * flowfiles are ready to be received.
    + */
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
    +    "MarkLogic Data Movement SDK (DMSDK)")
    +@TriggerWhenEmpty
    +public class PutMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    class FlowFileInfo {
    +        FlowFile flowFile;
    +        ProcessSession session;
    +        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
    +            this.flowFile = flowFile;
    +            this.session = session;
    +        }
    +    }
    +    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
    +    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-delimited sequence of collections to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
    +        .name("Format")
    +        .displayName("Format")
    +        .description("Format for each document; if not specified, MarkLogic will determine the format" +
    +            " based on the URI")
    +        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
    +        .name("Job ID")
    +        .displayName("Job ID")
    +        .description("ID for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
    +        .name("Job Name")
    +        .displayName("Job Name")
    +        .description("Name for the WriteBatcher job")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
    +        .name("MIME type")
    +        .displayName("MIME type")
    +        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
    +            "MIME type based on the URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
    +        .name("Permissions")
    +        .displayName("Permissions")
    +        .defaultValue("rest-reader,read,rest-writer,update")
    +        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
    +            "capability2 - to add to each document")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
    +        .name("Temporal collection")
    +        .displayName("Temporal collection")
    +        .description("The temporal collection to use for a temporal document insert")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
    +        .name("Server transform")
    +        .displayName("Server transform")
    +        .description("(Optional) The name of REST server transform to apply to every document as it's" +
    +            " written")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
    +        .name("URI attribute name")
    +        .displayName("URI attribute name")
    +        .defaultValue("uuid")
    +        .required(true)
    +        .description("The name of the FlowFile attribute whose value will be used as the URI")
    +        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
    +        .name("URI prefix")
    +        .displayName("URI prefix")
    +        .description("(Optional) The prefix to prepend to each URI")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
    +        .name("URI suffix")
    +        .displayName("URI suffix")
    +        .description("(Optional) The suffix to append to each URI")
    --- End diff --
    
    Same here.


---