Posted to dev@usergrid.apache.org by snoopdave <gi...@git.apache.org> on 2015/07/15 13:56:11 UTC

[GitHub] incubator-usergrid pull request: New ExportApp tool, using RxJava

GitHub user snoopdave opened a pull request:

    https://github.com/apache/incubator-usergrid/pull/307

    New ExportApp tool, using RxJava

    https://issues.apache.org/jira/browse/USERGRID-788


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-usergrid rxportapp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-usergrid/pull/307.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #307
    
----
commit 75ad454cf23bff8dd5ad8dc84563cc1a5520db7f
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-09T19:19:34Z

    Log total counts

commit b2bdbb5456301dbcf7131e780d968514cdd7e55d
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-13T14:21:09Z

    Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.

commit 2b65e619316b206720e574a21fe1b33462e0f2dd
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-14T12:03:18Z

    Add readThread and writeThread CLI parameters.

commit 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-14T17:31:18Z

    Remove queues and make the whole thing one "stream"

commit a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-14T19:31:07Z

    Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().

commit 7b168b91d99ba51da452a9c7980b0078f733df03
Author: Dave Johnson <sn...@apache.org>
Date:   2015-07-14T20:41:06Z

    Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-usergrid pull request: New ExportApp tool, using RxJava

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-usergrid/pull/307



[GitHub] incubator-usergrid pull request: New ExportApp tool, using RxJava

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/307#discussion_r34699260
  
    --- Diff: stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java ---
    @@ -0,0 +1,552 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.tools;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.OptionBuilder;
    +import org.apache.commons.cli.Options;
    +import org.apache.usergrid.persistence.Entity;
    +import org.apache.usergrid.persistence.EntityManager;
    +import org.apache.usergrid.persistence.Query;
    +import org.apache.usergrid.persistence.Results;
    +import org.apache.usergrid.utils.StringUtils;
    +import org.codehaus.jackson.JsonGenerator;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jackson.util.MinimalPrettyPrinter;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import rx.Observable;
    +import rx.Scheduler;
    +import rx.Subscriber;
    +import rx.functions.Action0;
    +import rx.functions.Action1;
    +import rx.functions.Func1;
    +import rx.schedulers.Schedulers;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.*;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +
    +/**
    + * Export all entities and connections of a Usergrid app.
    + * 
    + * Exports data files to specified directory.
    + * 
    + * Will create as many output files as there are writeThreads (by default: 10).
    + * 
    + * Will create two types of files: *.uge for Usergrid entities and *.ugc for entity to entity connections.
    + * 
    + * Every line of the data files is a complete JSON object.
    + */
    +public class ExportApp extends ExportingToolBase {
    +    static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
    +
    +    static final String APPLICATION_NAME = "application";
    +    private static final String READ_THREAD_COUNT = "readThreads";
    +    private static final String WRITE_THREAD_COUNT = "writeThreads";
    +   
    +    String applicationName;
    +    String organizationName;
    +
    +    AtomicInteger entitiesWritten = new AtomicInteger(0);
    +    AtomicInteger connectionsWritten = new AtomicInteger(0);
    +
    +    Scheduler readScheduler;
    +    Scheduler writeScheduler;
    +
    +    ObjectMapper mapper = new ObjectMapper();
    +    Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
    +    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
    +
    +    // set via CLI
    +    int readThreadCount = 80;
    +    int writeThreadCount = 10; // limiting write will limit output files 
    +
    +
    +    @Override
    +    @SuppressWarnings("static-access")
    +    public Options createOptions() {
    +
    +        Options options = super.createOptions();
    +
    +        Option appNameOption = OptionBuilder.hasArg().withType("")
    +                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
    +        options.addOption( appNameOption );
    +
    +        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
    +                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
    +        options.addOption( readThreadsOption );
    +
    +        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
    +                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
    +        options.addOption( writeThreadsOption );
    +
    +        return options;
    +    }
    +
    +    
    +    /**
    +     * Tool entry point. 
    +     */
    +    @Override
    +    public void runTool(CommandLine line) throws Exception {
    +        
    +        applicationName = line.getOptionValue( APPLICATION_NAME );
    +
    +        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
    +            try {
    +                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
    +            } catch (NumberFormatException nfe) {
    +                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
    +                return;
    +            }
    +        }
    +        
    +        if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
    +            try {
    +                writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
    +            } catch (NumberFormatException nfe) {
    +                logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
    +                return;
    +            }
    +        }
    +
    +        setVerbose( line );
    +
    +        applyOrgId( line );
    +        prepareBaseOutputFileName( line );
    +        outputDir = createOutputParentDir();
    +        logger.info( "Export directory: " + outputDir.getAbsolutePath() );
    +
    +        startSpring();
    +        
    +        UUID applicationId = emf.lookupApplication( applicationName );
    +        final EntityManager em = emf.getEntityManager( applicationId );
    +        organizationName = em.getApplication().getOrganizationName();
    +
    +        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
    +        readScheduler = Schedulers.from( readThreadPoolExecutor );
    +
    +        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
    +        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
    +
    +        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
    +        
    +        collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
    +            
    +            public Observable<ExportEntity> call(String collection) {
    +                return Observable.create( new EntityObservable( em, collection ))
    +                        .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
    +            }
    +            
    +        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
    +            
    +            public Observable<ExportConnection> call(ExportEntity exportEntity) {
    +                return Observable.create( new ConnectionsObservable( em, exportEntity ))
    +                        .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
    +            }
    +            
    +        }, 10)
    --- End diff --
    
    Same as above



[GitHub] incubator-usergrid pull request: New ExportApp tool, using RxJava

Posted by tnine <gi...@git.apache.org>.
Github user tnine commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/307#discussion_r34699249
  
    --- Diff: stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java ---
    @@ -0,0 +1,552 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.usergrid.tools;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.OptionBuilder;
    +import org.apache.commons.cli.Options;
    +import org.apache.usergrid.persistence.Entity;
    +import org.apache.usergrid.persistence.EntityManager;
    +import org.apache.usergrid.persistence.Query;
    +import org.apache.usergrid.persistence.Results;
    +import org.apache.usergrid.utils.StringUtils;
    +import org.codehaus.jackson.JsonGenerator;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jackson.util.MinimalPrettyPrinter;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import rx.Observable;
    +import rx.Scheduler;
    +import rx.Subscriber;
    +import rx.functions.Action0;
    +import rx.functions.Action1;
    +import rx.functions.Func1;
    +import rx.schedulers.Schedulers;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.*;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +
    +/**
    + * Export all entities and connections of a Usergrid app.
    + * 
    + * Exports data files to specified directory.
    + * 
    + * Will create as many output files as there are writeThreads (by default: 10).
    + * 
    + * Will create two types of files: *.uge for Usergrid entities and *.ugc for entity to entity connections.
    + * 
    + * Every line of the data files is a complete JSON object.
    + */
    +public class ExportApp extends ExportingToolBase {
    +    static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
    +
    +    static final String APPLICATION_NAME = "application";
    +    private static final String READ_THREAD_COUNT = "readThreads";
    +    private static final String WRITE_THREAD_COUNT = "writeThreads";
    +   
    +    String applicationName;
    +    String organizationName;
    +
    +    AtomicInteger entitiesWritten = new AtomicInteger(0);
    +    AtomicInteger connectionsWritten = new AtomicInteger(0);
    +
    +    Scheduler readScheduler;
    +    Scheduler writeScheduler;
    +
    +    ObjectMapper mapper = new ObjectMapper();
    +    Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
    +    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
    +
    +    // set via CLI
    +    int readThreadCount = 80;
    +    int writeThreadCount = 10; // limiting write will limit output files 
    +
    +
    +    @Override
    +    @SuppressWarnings("static-access")
    +    public Options createOptions() {
    +
    +        Options options = super.createOptions();
    +
    +        Option appNameOption = OptionBuilder.hasArg().withType("")
    +                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
    +        options.addOption( appNameOption );
    +
    +        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
    +                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
    +        options.addOption( readThreadsOption );
    +
    +        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
    +                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
    +        options.addOption( writeThreadsOption );
    +
    +        return options;
    +    }
    +
    +    
    +    /**
    +     * Tool entry point. 
    +     */
    +    @Override
    +    public void runTool(CommandLine line) throws Exception {
    +        
    +        applicationName = line.getOptionValue( APPLICATION_NAME );
    +
    +        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
    +            try {
    +                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
    +            } catch (NumberFormatException nfe) {
    +                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
    +                return;
    +            }
    +        }
    +        
    +        if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
    +            try {
    +                writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
    +            } catch (NumberFormatException nfe) {
    +                logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
    +                return;
    +            }
    +        }
    +
    +        setVerbose( line );
    +
    +        applyOrgId( line );
    +        prepareBaseOutputFileName( line );
    +        outputDir = createOutputParentDir();
    +        logger.info( "Export directory: " + outputDir.getAbsolutePath() );
    +
    +        startSpring();
    +        
    +        UUID applicationId = emf.lookupApplication( applicationName );
    +        final EntityManager em = emf.getEntityManager( applicationId );
    +        organizationName = em.getApplication().getOrganizationName();
    +
    +        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
    +        readScheduler = Schedulers.from( readThreadPoolExecutor );
    +
    +        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
    +        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
    +
    +        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
    +        
    +        collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
    +            
    +            public Observable<ExportEntity> call(String collection) {
    +                return Observable.create( new EntityObservable( em, collection ))
    +                        .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
    +            }
    +            
    +        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
    --- End diff --
    
    Rather than have a static 10 here, we should probably use the Schedulers.io() scheduler, then use the write thread count as this value. Otherwise you'll just introduce contention on a thread pool. If we use the unbounded Schedulers.io() and use the size as the flatMap argument, we'll never exceed the set thread size.
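
    The idea in this comment (an unbounded pool plus a concurrency cap supplied by the caller) can be sketched without RxJava. The following is only an analogy, not the ExportApp code and not the RxJava API: a cached thread pool stands in for Schedulers.io(), and a Semaphore stands in for the max-concurrent argument of flatMap. The class name BoundedConcurrencySketch and the method peakConcurrency are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only: plain java.util.concurrent, not RxJava and not ExportApp.
class BoundedConcurrencySketch {

    /**
     * Runs taskCount short tasks on an unbounded pool while admitting at most
     * maxConcurrent of them at a time, and returns the highest number of tasks
     * that were observed running simultaneously.
     */
    static int peakConcurrency(int maxConcurrent, int taskCount) throws Exception {
        // Unbounded pool: stands in for Schedulers.io() (assumption of this sketch).
        ExecutorService unboundedPool = Executors.newCachedThreadPool();
        // Concurrency cap: stands in for flatMap's max-concurrent argument.
        final Semaphore permits = new Semaphore(maxConcurrent);
        final AtomicInteger active = new AtomicInteger(0);
        final AtomicInteger peak = new AtomicInteger(0);

        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (int i = 0; i < taskCount; i++) {
            permits.acquire(); // the submitter waits here until a slot is free
            futures.add(unboundedPool.submit(new Runnable() {
                public void run() {
                    try {
                        int now = active.incrementAndGet();
                        // Record the new peak with a compare-and-set loop.
                        int prev;
                        do {
                            prev = peak.get();
                        } while (now > prev && !peak.compareAndSet(prev, now));
                        Thread.sleep(5); // simulated write work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        permits.release(); // free the slot only after the task is done
                    }
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // wait for completion and surface any task failure
        }
        unboundedPool.shutdown();
        unboundedPool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrent tasks: " + peakConcurrency(10, 100));
    }
}
```

    Because the cap is enforced before work is handed to the pool, the number of tasks in flight cannot exceed maxConcurrent, and no task sits queued behind a fixed-size pool. Whether this carries over to the real pipeline depends on how the RxJava operators are wired, so treat it as a sketch of the reasoning rather than a drop-in replacement.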



[GitHub] incubator-usergrid pull request: New ExportApp tool, using RxJava

Posted by snoopdave <gi...@git.apache.org>.
Github user snoopdave commented on the pull request:

    https://github.com/apache/incubator-usergrid/pull/307#issuecomment-121694547
  
    @tnine Please take another look. I made flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.  But I had to keep the writeScheduler in place, otherwise my test fails because (I think) threads explode and Cassandra falls over.

