Posted to dev@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2016/04/25 12:00:35 UTC

[GitHub] nifi pull request: NIFI-1022 Added Tachyon/Alluxio processors

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/379

    NIFI-1022 Added Tachyon/Alluxio processors

    - ListAlluxio processor
    - PutAlluxio processor
    - GetAlluxio processor

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi NIFI-1022

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #379
    
----
commit de67993bd16ee2b7504978cf3ebf16540c0f18b0
Author: Pierre Villard <pi...@gmail.com>
Date:   2016-04-25T09:58:52Z

    NIFI-1022 Added Tachyon/Alluxio processors
    
    - ListAlluxio processor
    - PutAlluxio processor
    - GetAlluxio processor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Hey @apiri, yes, the Alluxio folks informed me about this. I'll try to update the PR ASAP and will let you know when it's done.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 closed the pull request at:

    https://github.com/apache/nifi/pull/379


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67958464
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    --- End diff --
    
    Oh, good to know and that is super helpful.  Apologies for overlooking it as I sometimes neglect JIRA while in GitHub land.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67956841
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    --- End diff --
    
    I assumed the user would use expression language to define the file name with Get/Put: something like /${filename} for Put and /${alluxio_name} for Get. If this is unclear, I certainly need to update the descriptions. FYI, in the JIRA, I added one of the templates I used when testing the processors.
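
As a rough illustration of the intent described above, the sketch below shows how a URI template such as /${filename} could resolve against a FlowFile's attributes. It is a simplified stand-in written for this note, not NiFi's Expression Language engine: the class `UriTemplateSketch`, its `resolve` method, and the sample attribute values are all hypothetical, and it only handles plain `${attribute}` references (missing attributes become empty strings here).

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for attribute substitution; NOT NiFi's Expression Language engine.
final class UriTemplateSketch {

    // Matches simple ${attributeName} references only (no functions, no nesting).
    private static final Pattern ATTRIBUTE_REF = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with the matching attribute value, or "" if absent.
    static String resolve(String template, Map<String, String> attributes) {
        Matcher matcher = ATTRIBUTE_REF.matcher(template);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Put: the target path comes from the incoming FlowFile's "filename" attribute.
        System.out.println(resolve("/${filename}", Map.of("filename", "report.csv")));
        // Get: the source path comes from an "alluxio_name" attribute (sample value is made up).
        System.out.println(resolve("/${alluxio_name}", Map.of("alluxio_name", "data/part-0001")));
    }
}
```

In the processors themselves, the equivalent step is the `context.getProperty(URI).evaluateAttributeExpressions(request).getValue()` call visible in the GetAlluxio diff.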


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Hey guys,
    
    I've reworked this PR. After some discussions with the Alluxio folks, it seems that there are some limitations at the moment (v1.3.0), and I've made some modifications to take them into account.
    
    ### System properties
    At the moment, it seems it is not possible to initialize the Alluxio client inside NiFi without using system properties. In any case, handling multiple Alluxio clusters is not supported at the moment.
    
    ### Initialization
    Once the client is initialized, it cannot be changed unless NiFi is restarted. Even if the provided IP/port is incorrect, the client initialization does not perform any check. To avoid mistakes, I've added a simple check to ensure that something is listening at IP/port before initializing the client.
    *Improvement on Alluxio's side is tracked here: https://alluxio.atlassian.net/browse/ALLUXIO-2120*
    
    I've updated the processor descriptions to mark them as **experimental** and to let users know what the current limitations are. Hopefully, this won't be a showstopper for getting this first version into NiFi and getting feedback from the community.
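
The comment above mentions a check that something is listening at the configured IP/port before the client is initialized. The updated PR code is not shown in this thread, so the following is only a minimal sketch of one way such a check could look, using a plain TCP connect with a timeout. The class name `MasterReachabilitySketch`, the method `isListening`, and the one-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper; the actual check added to the PR is not shown in this thread.
final class MasterReachabilitySketch {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // 19998 is the default value of the MASTER_PORT property in the quoted diff.
        System.out.println(isListening("localhost", 19998, 1000));
    }
}
```

A successful connect only shows that some process accepts connections on that port, not that it is an Alluxio master, so a check of this kind guards against typos rather than validating the cluster.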


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610606
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +    public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
    +            .name("original")
    +            .description("In case of success, the original FlowFile will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(READ_TYPE);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_SUCCESS_REQ);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        createFileSystem(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile request = null;
    +        if (context.hasIncomingConnection()) {
    +            request = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (request == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        final StopWatch stopWatch = new StopWatch(true);
    +        final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
    +        final AlluxioURI path = new AlluxioURI(uri);
    +        final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
    +
    +        FileInStream in = null;
    +        FlowFile flowFile = null;
    +
    +        if (request == null) {
    +            flowFile = session.create();
    +        } else {
    +            flowFile = session.create(request);
    +        }
    +
    +        try {
    +            final URIStatus status = fileSystem.get().getStatus(path);
    +            flowFile = updateFlowFile(status, flowFile, session);
    +
    +            in = fileSystem.get().openFile(path, options);
    +            final FileInStream toCopy = in;
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    IOUtils.copy(toCopy, out);
    --- End diff --
    
    It seems like the Alluxio client logger may be interfering with NiFi's logger and is causing bulletins to not work appropriately.


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @jdye64 any chance you could have some cycles to review this as the originator of the JIRA?


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    I don't know anything about the target systems, so I probably won't be a great reviewer.  If nobody else is in a good position to help, though, I'll give it a whirl.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67956263
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Alluxio processors and contains logic and variables common to most
    --- End diff --
    
    good catch, done


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Hey @pvillard31.
    
    I gave this code an initial scan and played around a bit with Alluxio.  I didn't have much success interacting with the processors, but I have left some notes on the items that confused me while using them.
    
    While I am thinking of it, could you get this rebased/updated to master?
    
    I'll try to give it a fresh look when I have some spare moments to scope out the Alluxio docs a bit more and try to figure out what is going on.  


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @pvillard31 that connection strategy is agnostic to the underlying filesystem supporting Alluxio. It allows a Hadoop-compatible application to access Alluxio without further coding and lets Alluxio deal with the underlying storage strategies.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610380
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    static {
    +        descriptors.add(MASTER_HOSTNAME);
    +        descriptors.add(MASTER_PORT);
    +        descriptors.add(URI);
    +    }
    +
    +    protected final AtomicReference<FileSystem> fileSystem = new AtomicReference<>(null);
    +
    +    protected void createFileSystem(ProcessContext context) {
    +        System.setProperty("alluxio.master.hostname", context.getProperty(MASTER_HOSTNAME).getValue());
    +        System.setProperty("alluxio.master.port", context.getProperty(MASTER_PORT).getValue());
    +        fileSystem.set(FileSystem.Factory.get());
    --- End diff --
    
    This doesn't seem to update regardless of changing a property.  I accidentally used the wrong port when testing and despite changing in the properties, stopping/starting or disabling/enabling, the backing Alluxio client seems stuck on the initial port.
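
One possible explanation for the behavior reported above, offered purely as an assumption and not confirmed by the quoted code, is that the client library reads the system properties once and caches the result for the life of the JVM. The sketch below uses a hypothetical class and a made-up property name (`CachedClientSketch`, `sketch.master.port`); it is not Alluxio code and only illustrates how a cached singleton would ignore later calls to `System.setProperty`.

```java
// Hypothetical client, NOT Alluxio code: configuration is read once and then cached.
final class CachedClientSketch {

    private static CachedClientSketch instance;

    private final String port;

    private CachedClientSketch(String port) {
        this.port = port;
    }

    // Reads the system property only on the first call; later calls reuse the instance.
    static synchronized CachedClientSketch get() {
        if (instance == null) {
            instance = new CachedClientSketch(System.getProperty("sketch.master.port"));
        }
        return instance;
    }

    String port() {
        return port;
    }

    public static void main(String[] args) {
        System.setProperty("sketch.master.port", "19999"); // wrong port on first start
        System.out.println(get().port()); // prints 19999

        System.setProperty("sketch.master.port", "19998"); // corrected afterwards
        System.out.println(get().port()); // still prints 19999: the cached instance wins
    }
}
```

Under that assumption, stopping and starting the processor would call `System.setProperty` again but would not rebuild the cached client, which would match the symptom of being stuck on the initial port.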


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610514
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractAlluxioProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    --- End diff --
    
    This is a little unclear to me in the context of a file.  I had specified a static path, but then received errors about the specified path already existing.  Accordingly, my assumption was then that this is path + file like an S3 bucket, however I didn't seem to have significantly better luck in that scenario either.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67609786
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractAlluxioProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    static {
    +        descriptors.add(MASTER_HOSTNAME);
    +        descriptors.add(MASTER_PORT);
    +        descriptors.add(URI);
    +    }
    +
    +    protected final AtomicReference<FileSystem> fileSystem = new AtomicReference<>(null);
    +
    +    protected void createFileSystem(ProcessContext context) {
    +        System.setProperty("alluxio.master.hostname", context.getProperty(MASTER_HOSTNAME).getValue());
    --- End diff --
    
    Is there no way around these system properties?


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Closing this one as I don't have time to get back into it at the moment. For anyone interested, it would at least be necessary to upgrade Alluxio version to the latest. (@HorizonNet - I see you're in Alluxio's PMC, feel free to chime in :))


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @apiri @joewitt 
    I updated the PR with the following changes:
    - Alluxio version changed to 1.4.0 to include the expected improvement (https://github.com/Alluxio/alluxio/pull/4422) and removed the test I was doing before
    - changed the GetAlluxio to a FetchAlluxio
    - changed the ListAlluxio to include a state to store the timestamp of the most recent file listed by the processor
    - updated the unit tests to match the changes
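
    For illustration, the timestamp-based listing state mentioned in the list above could be sketched as follows. This is a sketch under assumptions, not the code from the PR: TimestampListing, Entry and listNew are hypothetical names, and the timestamp is kept in a plain field here rather than in NiFi's state management.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of timestamp-based listing; names are illustrative only.
final class TimestampListing {

    // Minimal description of a remote file: its path and last modification time.
    static final class Entry {
        final String path;
        final long lastModifiedMs;

        Entry(String path, long lastModifiedMs) {
            this.path = path;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // Timestamp of the most recent file already listed; -1 means nothing listed yet.
    private long lastListedMs = -1L;

    // Returns the paths modified after the stored timestamp, then advances the timestamp.
    List<String> listNew(List<Entry> current) {
        List<String> fresh = new ArrayList<>();
        long newest = lastListedMs;
        for (Entry e : current) {
            if (e.lastModifiedMs > lastListedMs) {
                fresh.add(e.path);
                newest = Math.max(newest, e.lastModifiedMs);
            }
        }
        lastListedMs = newest;
        return fresh;
    }

    public static void main(String[] args) {
        TimestampListing listing = new TimestampListing();
        List<Entry> files = new ArrayList<>();
        files.add(new Entry("/a", 100L));
        files.add(new Entry("/b", 200L));
        System.out.println(listing.listNew(files));
        files.add(new Entry("/c", 300L));
        System.out.println(listing.listNew(files));
    }
}
```

    One caveat of this simple scheme: a file that appears later with a modification time equal to the stored timestamp is skipped.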


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Hey @joewitt, I will update the PR asap this week.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610439
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +    public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
    +            .name("original")
    +            .description("In case of success, the original FlowFile will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(READ_TYPE);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_SUCCESS_REQ);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        createFileSystem(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile request = null;
    +        if (context.hasIncomingConnection()) {
    +            request = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (request == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        final StopWatch stopWatch = new StopWatch(true);
    +        final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
    +        final AlluxioURI path = new AlluxioURI(uri);
    +        final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
    +
    +        FileInStream in = null;
    +        FlowFile flowFile = null;
    +
    +        if (request == null) {
    +            flowFile = session.create();
    +        } else {
    +            flowFile = session.create(request);
    +        }
    +
    +        try {
    +            final URIStatus status = fileSystem.get().getStatus(path);
    +            flowFile = updateFlowFile(status, flowFile, session);
    +
    +            in = fileSystem.get().openFile(path, options);
    +            final FileInStream toCopy = in;
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    IOUtils.copy(toCopy, out);
    --- End diff --
    
    This is throwing errors in some cases and not bubbling out to the wrapping catch so there is no indication of issues.  One particular instance was:
    
    > 2016-06-19 01:12:52,512 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.alluxio.PutAlluxio PutAlluxio[id=37744f8c-57fe-48c9-ac0b-e3fd255efc4b] An error occurred while writing file /nifi-630c823e-7492-4539-bf14-632dfa4b85e2; transferring to 'failure': java.lang.RuntimeException: No available Alluxio worker found
    2016-06-19 01:12:52,513 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.alluxio.PutAlluxio
    java.lang.RuntimeException: No available Alluxio worker found
    	at alluxio.client.block.AlluxioBlockStore.getOutStream(AlluxioBlockStore.java:174) ~[alluxio-core-client-1.0.1.jar:na]
    	at alluxio.client.file.FileOutStream.getNextBlock(FileOutStream.java:266) ~[alluxio-core-client-1.0.1.jar:na]
    	at alluxio.client.file.FileOutStream.write(FileOutStream.java:231) ~[alluxio-core-client-1.0.1.jar:na]
    	at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1793) ~[commons-io-2.4.jar:2.4]
    	at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1769) ~[commons-io-2.4.jar:2.4]
    	at org.apache.commons.io.IOUtils.copy(IOUtils.java:1744) ~[commons-io-2.4.jar:2.4]
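
    For illustration, a minimal sketch of how an unchecked exception thrown inside a stream callback can slip past a catch block that only handles checked I/O errors. It assumes the wrapping catch is that narrow, which the quoted diff does not show. FailureRoutingDemo and StreamCallback are hypothetical and do not use the NiFi or Alluxio APIs.

```java
import java.io.IOException;

// Hypothetical sketch; StreamCallback stands in for a stream-writing callback.
final class FailureRoutingDemo {

    interface StreamCallback {
        void process() throws IOException;
    }

    // Handles only checked I/O failures, so a RuntimeException escapes to the caller.
    static String narrowCatch(StreamCallback callback) {
        try {
            callback.process();
            return "success";
        } catch (IOException e) {
            return "failure";
        }
    }

    // Also handles unchecked failures, so they can be routed to 'failure'.
    static String broadCatch(StreamCallback callback) {
        try {
            callback.process();
            return "success";
        } catch (IOException | RuntimeException e) {
            return "failure";
        }
    }

    static String[] run() {
        StreamCallback failing = () -> {
            throw new RuntimeException("No available worker found");
        };
        String narrow;
        try {
            narrow = narrowCatch(failing);
        } catch (RuntimeException e) {
            narrow = "escaped"; // the narrow catch did not see the exception
        }
        return new String[] {narrow, broadCatch(failing)};
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", run()));
    }
}
```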


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67956202
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractAlluxioProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    static {
    +        descriptors.add(MASTER_HOSTNAME);
    +        descriptors.add(MASTER_PORT);
    +        descriptors.add(URI);
    +    }
    +
    +    protected final AtomicReference<FileSystem> fileSystem = new AtomicReference<>(null);
    +
    +    protected void createFileSystem(ProcessContext context) {
    +        System.setProperty("alluxio.master.hostname", context.getProperty(MASTER_HOSTNAME).getValue());
    --- End diff --
    
    I didn't find another way to instantiate it... but I agree this is not ideal.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67959551
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractAlluxioProcessor is a base class for Alluxio processors and contains logic and variables common to most
    + * processors integrating with Alluxio.
    + */
    +public abstract class AbstractAlluxioProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor MASTER_HOSTNAME = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-ip")
    +            .displayName("Master hostname")
    +            .description("Hostname of the Alluxio File System Master node.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MASTER_PORT = new PropertyDescriptor.Builder()
    +            .name("alluxio-master-port")
    +            .displayName("Master port")
    +            .description("Port to use when connecting to the Alluxio File System Master node.")
    +            .required(true)
    +            .defaultValue("19998")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
    +            .name("alluxio-uri")
    +            .displayName("URI")
    +            .description("Alluxio URI to use. Example: /path")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static List<PropertyDescriptor> descriptors = new ArrayList<>();
    +
    +    static {
    +        descriptors.add(MASTER_HOSTNAME);
    +        descriptors.add(MASTER_PORT);
    +        descriptors.add(URI);
    +    }
    +
    +    protected final AtomicReference<FileSystem> fileSystem = new AtomicReference<>(null);
    +
    +    protected void createFileSystem(ProcessContext context) {
    +        System.setProperty("alluxio.master.hostname", context.getProperty(MASTER_HOSTNAME).getValue());
    --- End diff --
    
    Hmm, okay. Not sure how we've handled this in other cases, but since NiFi runs as a single process and system properties are shared across that whole process, this limits us to talking to only one Alluxio instance.
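
    For illustration, a minimal sketch of the limitation described above: two processors that configure their target through System.setProperty share one value, whereas configuration held per instance stays separate. GlobalVsLocalConfig, the demo.master.hostname key and the host names are hypothetical, and whether the Alluxio client accepts per-instance configuration is not established in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the Alluxio or NiFi API.
final class GlobalVsLocalConfig {

    // Writes to the one JVM-wide property table, as System.setProperty does.
    static void configureGlobally(String hostname) {
        System.setProperty("demo.master.hostname", hostname);
    }

    // Builds a configuration object owned by a single processor instance.
    static Map<String, String> localConfig(String hostname) {
        Map<String, String> config = new HashMap<>();
        config.put("hostname", hostname);
        return config;
    }

    static String[] run() {
        // Processor A and processor B each set "their" master hostname.
        configureGlobally("cluster-a");
        configureGlobally("cluster-b");
        // Processor A now reads B's value: the last writer wins.
        String seenByA = System.getProperty("demo.master.hostname");

        // With per-instance configuration, both targets coexist.
        Map<String, String> a = localConfig("cluster-a");
        Map<String, String> b = localConfig("cluster-b");
        return new String[] {seenByA, a.get("hostname"), b.get("hostname")};
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", run()));
    }
}
```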


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67958810
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +    public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
    +            .name("original")
    +            .description("In case of success, the original FlowFile will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(READ_TYPE);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_SUCCESS_REQ);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        createFileSystem(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile request = null;
    +        if (context.hasIncomingConnection()) {
    +            request = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (request == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        final StopWatch stopWatch = new StopWatch(true);
    +        final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
    +        final AlluxioURI path = new AlluxioURI(uri);
    +        final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
    +
    +        FileInStream in = null;
    +        FlowFile flowFile = null;
    +
    +        if (request == null) {
    +            flowFile = session.create();
    +        } else {
    +            flowFile = session.create(request);
    +        }
    +
    +        try {
    +            final URIStatus status = fileSystem.get().getStatus(path);
    +            flowFile = updateFlowFile(status, flowFile, session);
    +
    +            in = fileSystem.get().openFile(path, options);
    +            final FileInStream toCopy = in;
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    IOUtils.copy(toCopy, out);
    --- End diff --
    
    I'll have to scope this out again.  After the fact, I realized that it may have just been the bulletin bug on master that was keeping bulletins from working.  When I next get the opportunity, I'll see if that's still the case.
    
    For some additional context, in the logs, which I now realize I failed to capture, there was mention of the Alluxio logger.  I was curious if that was somehow interfering with NiFi's logger and preventing bulletins.  I'll let you know if I can recreate.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610311
  
    --- Diff: nifi-nar-bundles/pom.xml ---
    @@ -59,6 +59,7 @@
             <module>nifi-jms-bundle</module>
             <module>nifi-cassandra-bundle</module>
             <module>nifi-spring-bundle</module>
    +        <module>nifi-alluxio-bundle</module>
    --- End diff --
    
    Will also need to provide the alluxio-nar in the nifi-assembly pom and as a managed dependency in the root pom.


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Hey @trixpan, I didn't try it. Maybe that would work; I honestly don't know. However, Alluxio can run on top of many storage solutions, so a dedicated processor makes sense. Anyway, I see there is a conflict, so I'll need to rebase the PR. I'll do that tomorrow.


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @pvillard31 this poor PR has been hanging out for quite a while.  Shall we try again?  Can you update?


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Thanks for the review @apiri!
    I still need to take some of your comments into account, but I have rebased the PR against master. While doing some additional testing, for a reason I can't explain yet, I was unable to use the ListAlluxio processor:
    
    ````
    2016-06-21 23:28:56,127 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ListAlluxio[id=42835c14-3896-47e0-95ae-ce8266e7c030] due to uncaught Exception: java.lang.NullPointerException: Name is null
    2016-06-21 23:28:56,130 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask 
    java.lang.NullPointerException: Name is null
    	at java.lang.Enum.valueOf(Enum.java:236) ~[na:1.8.0_77]
    	at alluxio.exception.AlluxioExceptionType.valueOf(AlluxioExceptionType.java:19) ~[na:na]
    	at alluxio.exception.AlluxioException.from(AlluxioException.java:69) ~[na:na]
    	at alluxio.AbstractClient.retryRPC(AbstractClient.java:324) ~[na:na]
    	at alluxio.client.file.FileSystemMasterClient.listStatus(FileSystemMasterClient.java:271) ~[na:na]
    	at alluxio.client.file.BaseFileSystem.listStatus(BaseFileSystem.java:188) ~[na:na]
    	at alluxio.client.file.BaseFileSystem.listStatus(BaseFileSystem.java:179) ~[na:na]
    	at org.apache.nifi.processors.alluxio.ListAlluxio.onTrigger(ListAlluxio.java:91) ~[na:na]
    	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
    	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
    ````
    
    I'll try to dig more into this tomorrow. Separately, someone from Alluxio reached out to offer help on this PR if needed, so I can ask him to have a look and make some comments.
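
The top frames of the trace show `Enum.valueOf` being reached through `AlluxioExceptionType.valueOf`, which suggests that the exception-type name handed to the client was null. The sketch below reproduces that failure mode using only the JDK. `ExceptionType` and `parseOrDefault` are hypothetical names introduced for illustration, not Alluxio classes, and the null guard is only one possible mitigation, not necessarily what Alluxio does.

```java
public class EnumNullNameDemo {

    // Hypothetical stand-in for an enum of remote exception types.
    enum ExceptionType { FILE_DOES_NOT_EXIST, INVALID_PATH, UNKNOWN }

    // Null-safe lookup: returns the fallback instead of letting
    // valueOf throw on a null or unrecognised name.
    static ExceptionType parseOrDefault(String name, ExceptionType fallback) {
        if (name == null) {
            return fallback;
        }
        try {
            return ExceptionType.valueOf(name);
        } catch (IllegalArgumentException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        String missingName = null; // e.g. a field the remote side never populated
        try {
            // The generated valueOf(String) delegates to Enum.valueOf,
            // which rejects a null name with a NullPointerException.
            ExceptionType.valueOf(missingName);
        } catch (NullPointerException e) {
            System.out.println("valueOf(null) threw: " + e);
        }
        System.out.println(parseOrDefault(missingName, ExceptionType.UNKNOWN));
        System.out.println(parseOrDefault("INVALID_PATH", ExceptionType.UNKNOWN));
    }
}
```

If the cause is indeed a null type name, the stack trace above would be a symptom of a client/server mismatch rather than a bug in the ListAlluxio processor itself, but that remains an assumption until it is confirmed against the Alluxio version in use.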


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67956421
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +    public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
    +            .name("original")
    +            .description("In case of success, the original FlowFile will be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(READ_TYPE);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_SUCCESS_REQ);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        createFileSystem(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile request = null;
    +        if (context.hasIncomingConnection()) {
    +            request = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (request == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        final StopWatch stopWatch = new StopWatch(true);
    +        final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
    +        final AlluxioURI path = new AlluxioURI(uri);
    +        final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
    +
    +        FileInStream in = null;
    +        FlowFile flowFile = null;
    +
    +        if (request == null) {
    +            flowFile = session.create();
    +        } else {
    +            flowFile = session.create(request);
    +        }
    +
    +        try {
    +            final URIStatus status = fileSystem.get().getStatus(path);
    +            flowFile = updateFlowFile(status, flowFile, session);
    +
    +            in = fileSystem.get().openFile(path, options);
    +            final FileInStream toCopy = in;
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    IOUtils.copy(toCopy, out);
    --- End diff --
    
    What do you mean/suggest?


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    no rush!  was just trying to revisit items in the queue


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @pvillard31 not sure if this applies, but have you by any chance tried using the Alluxio Hadoop interfaces with PutHDFS and co. instead of a new processor?
    
    e.g.
    
    
    https://www.alluxio.com/docs/community/1.3/en/Running-Hadoop-MapReduce-on-Alluxio.html


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @pvillard31 I saw that the referenced ticket now seems to be resolved.  Not sure if this is still of interest, but if so I would be happy to revisit once the PR is updated.


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67609747
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/AbstractAlluxioProcessor.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.client.file.FileSystem;
    +import alluxio.client.file.URIStatus;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Alluxio processors and contains logic and variables common to most
    --- End diff --
    
    Cassandra -> Alluxio


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    and... almost forgot to link a template to play with the processors:
    https://gist.github.com/pvillard31/2bf996adefaf1709339451d71c343ac1


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by trixpan <gi...@git.apache.org>.
Github user trixpan commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    And by my last comment I of course mean using a customised core-site.xml that includes:
    
    ```
    <property>
      <name>fs.alluxio.impl</name>
      <value>alluxio.hadoop.FileSystem</value>
      <description>The Alluxio FileSystem (Hadoop 1.x and 2.x)</description>
    </property>
    <property>
      <name>fs.alluxio-ft.impl</name>
      <value>alluxio.hadoop.FaultTolerantFileSystem</value>
      <description>The Alluxio FileSystem (Hadoop 1.x and 2.x) with fault tolerant support</description>
    </property>
    <property>
      <name>fs.AbstractFileSystem.alluxio.impl</name>
      <value>alluxio.hadoop.AlluxioFileSystem</value>
      <description>The Alluxio AbstractFileSystem (Hadoop 2.x)</description>
    </property>
    ```
    
    And the classpath technique described here: https://community.hortonworks.com/articles/71916/connecting-to-azure-data-lake-from-a-nifi-dataflow.html



---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    @pvillard31 do you still think we can get this across the finish line?


---

[GitHub] nifi issue #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi/pull/379
  
    Starting review.
    
    For anyone who was interested in checking it out, I was able to locate a Docker image that seems quite nice.
    
    You can run it locally via:
    `docker run -it -p 19999:19999 -p 30000:30000 --name alluxio harisekhon/alluxio`


---

[GitHub] nifi pull request #379: NIFI-1022 Added Tachyon/Alluxio processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67609796
  
    --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to this relationship")
    --- End diff --
    
    minor:  Could we standardize the use of flow file/FlowFile throughout these properties?


---