Posted to dev@nifi.apache.org by ajansing <Ja...@bah.com> on 2016/03/01 15:08:48 UTC

GetMongo GC Overflow

Running Mac OS X 10.10.5
             Apache Maven 3.3.9
             java version "1.8.0_72"
             Java(TM) SE Runtime Environment (build 1.8.0_72-b15)

I've been trying to figure out how to use the GetMongo processor to output
to a PutHDFS processor.

Some things I think I've figured out:

*Limit* acts exactly like .limit() in Mongo: all it does is give you the
first *n* documents in a collection.
*Batch* isn't a command in Mongo (that I know of), and I can't see what this
property actually does for the processor (see the sketch just below).
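
For what it's worth, here's a minimal sketch of how I think those two are
meant to map onto the Mongo Java driver, assuming the same
MongoCollection<Document> collection as in the code further down. This is
just my reading of the driver API, not the processor code:

    // limit() caps how many documents come back in total;
    // batchSize() only controls how many the driver fetches per round trip.
    FindIterable<Document> it = collection.find()
            .limit(1000)      // at most 1000 documents total
            .batchSize(100);  // pulled from the server 100 at a time
    for (Document doc : it) {
        // documents stream through here without all being held in memory at once
    }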

I'm working with a collection in the millions and I can't simply leave the
limit blank because the JVM runs out of memory. I tried to write my own
processor and got it to compile with *mvn clean install*, but when I copy
the .nar file from the '...nar/target' directory to the 'nifi-0.6.0/lib'
folder and then run 'sh nifi.sh run' or 'start', NiFi refuses to finish
booting up and terminates itself.

Starting from GetMongo.java
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java>
and its related files, I modified them and changed the following method:


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ProcessorLog logger = getLogger();
        final MongoCollection<Document> collection = getCollection(context);
        int count = (int) collection.count();
        int next = context.getProperty(BATCH_SIZE).asInteger();
        int current = next;
        // Page through the collection with skip()/limit(), committing each page
        // so FlowFiles aren't all held in one giant session.
        while (count >= current) {
            try {
                final FindIterable<Document> it =
                        collection.find().skip(current).limit(context.getProperty(LIMIT).asInteger());

                final MongoCursor<Document> cursor = it.iterator();
                try {
                    FlowFile flowFile = null;
                    while (cursor.hasNext()) {
                        flowFile = session.create();
                        flowFile = session.write(flowFile, new OutputStreamCallback() {
                            @Override
                            public void process(OutputStream out) throws IOException {
                                IOUtils.write(cursor.next().toJson(), out);
                            }
                        });

                        session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
                        session.transfer(flowFile, REL_SUCCESS);
                    }

                    session.commit();

                } finally {
                    cursor.close();
                }
            } catch (final RuntimeException e) {
                context.yield();
                session.rollback();
            }
            current = current + next;
        }
    }


I also modified the tests and the abstract classes so Maven would compile.

Any thoughts?

I'm trying to make a processor that can traverse an entire collection with
millions of documents, and eventually a collection of /any/ size.

If anyone has already made one and can share, that'd be great too! Thanks!




Re: GetMongo GC Overflow

Posted by ajansing <Ja...@bah.com>.
OH! I think I just fixed it. It might be a sloppy fix, but you were on the
right track with the session.commit(). I reverted to the original GetMongo
file in the nifi-nar-bundles folder, moved the commit up into the while
loop, and it worked (rough sketch below).
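
In case it helps anyone searching later, here's a minimal sketch of what
that change looks like against the 0.6.0-era GetMongo onTrigger. The
property descriptors (URI, LIMIT, BATCH_SIZE) and getCollection() come from
the stock processor; treat this as an illustration of where the commit
moved, not the exact committed code:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final MongoCollection<Document> collection = getCollection(context);
        try {
            final FindIterable<Document> it = collection.find();
            if (context.getProperty(LIMIT).isSet()) {
                it.limit(context.getProperty(LIMIT).asInteger());
            }
            if (context.getProperty(BATCH_SIZE).isSet()) {
                it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
            }
            final MongoCursor<Document> cursor = it.iterator();
            try {
                while (cursor.hasNext()) {
                    FlowFile flowFile = session.create();
                    flowFile = session.write(flowFile, new OutputStreamCallback() {
                        @Override
                        public void process(OutputStream out) throws IOException {
                            IOUtils.write(cursor.next().toJson(), out);
                        }
                    });
                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
                    session.transfer(flowFile, REL_SUCCESS);
                    // The change: commit per document instead of once after the cursor
                    // is exhausted, so finished FlowFiles are released instead of piling up.
                    session.commit();
                }
            } finally {
                cursor.close();
            }
        } catch (final RuntimeException e) {
            context.yield();
            session.rollback();
        }
    }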

Thanks!




Re: GetMongo GC Overflow

Posted by ajansing <Ja...@bah.com>.
The log file seemed pretty easy to read once I induced the error again.

2016-03-02 11:05:14,732 INFO [main] o.a.n.c.repository.FileSystemRepository Maximum Threshold for Container default set to 259517359063 bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size
2016-03-02 11:05:14,735 INFO [main] o.a.n.c.repository.FileSystemRepository Initializing FileSystemRepository with 'Always Sync' set to false
2016-03-02 11:05:14,958 ERROR [main] org.apache.nifi.NiFi Failure to launch NiFi due to java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider scc.processors.myGetMongo.MyProcessor not found
java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider scc.processors.myGetMongo.MyProcessor not found
    at java.util.ServiceLoader.fail(ServiceLoader.java:239) ~[na:1.8.0_72]
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185) ~[na:1.8.0_72]
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372) ~[na:1.8.0_72]
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[na:1.8.0_72]
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[na:1.8.0_72]
    at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:107) ~[nifi-nar-utils-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:88) ~[nifi-nar-utils-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    at org.apache.nifi.NiFi.<init>(NiFi.java:120) ~[nifi-runtime-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
    at org.apache.nifi.NiFi.main(NiFi.java:227) ~[nifi-runtime-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
2016-03-02 11:05:14,958 INFO [Thread-1] org.apache.nifi.NiFi Initiating shutdown of Jetty web server...
2016-03-02 11:05:14,959 INFO [Thread-1] org.apache.nifi.NiFi Jetty web server shutdown completed (nicely or otherwise).

It looks as though NiFi sees the .nar file but can't find a /MyProcessor/
class. This might come from how I set the project up: I initialized the
processor bundle with *mvn archetype:generate* and modified the pom files
for dependencies, but the processor sources themselves I copied from
/Users/USER/Documents/nifi/nifi-nar-bundles/nifi-mongodb-bundle and modified
from there.
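
For reference, the file the ServiceLoader is complaining about is the
provider descriptor inside the processors module. In my copied project it
should be something like the listing below, and every fully qualified name
in it has to match a class that actually ends up in the jar (the class name
here is made up purely for illustration):

    src/main/resources/META-INF/services/org.apache.nifi.processor.Processor

        # one fully qualified processor class per line; each entry must match a
        # class that is really packaged in the processors jar inside the .nar
        # (the name below is only an illustration of the expected format)
        scc.processors.myGetMongo.MyGetMongo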




Re: GetMongo GC Overflow

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

I'm not that familiar with MongoDB, but from looking at the existing
GetMongo processor, it seems to create a FlowFile per Document and only call
session.commit() once at the very end, which could be a problem when
producing a very large number of flow files.

When you mentioned writing your own processor, did you do it as a standalone
project, or were you modifying the one in Apache NiFi and rebuilding the
whole project?

There should be some information in nifi_home/logs/nifi-app.log that
indicates why it didn't start up. If you could provide the error messages
and stack traces it would help us figure out what went wrong.

Thanks,

Bryan


On Tue, Mar 1, 2016 at 9:08 AM, ajansing <Ja...@bah.com> wrote:

> [original message quoted in full above]