You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/21 22:09:15 UTC

[01/10] incubator-usergrid git commit: Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/master 897fd5083 -> 9b9f55ac2


Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2bdbb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2bdbb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2bdbb54

Branch: refs/heads/master
Commit: b2bdbb5456301dbcf7131e780d968514cdd7e55d
Parents: 75ad454
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 13 10:21:09 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 13 10:21:09 2015 -0400

----------------------------------------------------------------------
 stack/core/pom.xml                              |   9 +-
 stack/pom.xml                                   |   2 +-
 .../org/apache/usergrid/tools/ExportApp.java    | 687 +++++++++++++++++++
 .../apache/usergrid/tools/ExportAppTest.java    |  97 +++
 4 files changed, 790 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 47fa840..f60dbc2 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -573,14 +573,15 @@
     </dependency>
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
+      <groupId>io.reactivex</groupId>
+      <artifactId>rxjava</artifactId>
       <version>${rx.version}</version>
     </dependency>
+    
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
+      <groupId>io.reactivex</groupId>
       <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
+      <version>1.0.0</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index bdc3549..da1b62c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -108,7 +108,7 @@
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
     <metrics.version>3.0.0</metrics.version>
-    <rx.version>0.19.6</rx.version>
+    <rx.version>1.0.12</rx.version>
   </properties>
 
   <licenses>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
new file mode 100644
index 0000000..ceb3ecd
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Export application's collections.
+ */
+public class ExportApp extends ExportingToolBase {
+    static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+    
+    // we will write two types of files: entities and connections
+    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
+    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
+
+    static final String APPLICATION_NAME = "application";
+    
+    int pollTimeoutSeconds = 10;
+
+    // limiting output threads will limit output files 
+    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
+    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+
+    // written by concurrent writer threads, so these must be thread-safe collections
+    Map<Thread, JsonGenerator> entityGeneratorsByThread = new ConcurrentHashMap<Thread, JsonGenerator>();
+    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new ConcurrentHashMap<Thread, JsonGenerator>();
+    List<String> emptyFiles = new CopyOnWriteArrayList<String>();
+    
+    AtomicInteger activePollers = new AtomicInteger(0);
+    AtomicInteger entitiesQueued = new AtomicInteger(0);
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+    AtomicInteger connectionsQueued = new AtomicInteger(0);
+
+    ObjectMapper mapper = new ObjectMapper();
+    
+    /**
+     * Export an application's entities and connections using multiple threads.
+     * <p/>
+     * How it works:
+     * Reader observables emit collection names, then the entities of each collection and the
+     * connections of each entity, and add them to the entity and connection write queues.
+     * Write-queue pollers take items off those queues and write them to per-thread JSON files.
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+        
+        // name of the application to export, taken from the -application option
+        String applicationName = line.getOptionValue( APPLICATION_NAME );
+
+        
+        startSpring();
+
+        setVerbose( line );
+
+        applyOrgId( line );
+        prepareBaseOutputFileName( line );
+        outputDir = createOutputParentDir();
+        logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+       
+        // resolve the application name to its id and get an EntityManager for that application
+        UUID applicationId = emf.lookupApplication( applicationName );
+        final EntityManager em = emf.getEntityManager( applicationId );
+
+        // start write queue workers
+        //
+        // Each worker polls its write queue and passes every item to a write action that is
+        // subscribed on the fixed-size thread-pool scheduler; flatMap's second argument (10)
+        // is the maximum number of those inner observables subscribed at the same time.
+        // NOTE(review): the write actions name their files under "target/", not outputDir.
+
+        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
+        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
+        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
+            public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                return Observable.just(exportEntity).doOnNext( 
+                        new EntityWriteAction() ).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
+        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
+        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
+            public Observable<ExportConnection> call(ExportConnection connection ) {
+                return Observable.just(connection).doOnNext( 
+                        new ConnectionWriteAction()).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        // start processing data and filling up write queues
+        // (collection names are emitted on Schedulers.io(), at most 40 handled concurrently)
+
+        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
+        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
+        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
+            public Observable<String> call(String collection) {
+                return Observable.just(collection).doOnNext( 
+                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+            }
+        },40).subscribeOn( Schedulers.io() ).subscribe();
+        
+        // wait for write thread pollers to get started
+
+        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
+
+        // wait for write-thread pollers to stop
+        // (logs progress every 5 seconds until no poller reports itself active)
+
+        while ( activePollers.get() > 0 ) {
+            logger.info(
+                     "Active write threads: {}\n"
+                    +"Entities written:     {}\n"
+                    +"Entities queued:      {}\n"
+                    +"Connections written:  {}\n"
+                    +"Connections queued:   {}\n",
+                    new Object[] { 
+                        activePollers.get(),
+                        entitiesWritten.get(), 
+                        entitiesQueued.get(),
+                        connectionsWritten.get(), 
+                        connectionsQueued.get()} );
+            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
+        }
+
+        // wrap up files
+
+        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        // deleteOnExit() only registers the file for deletion when the JVM terminates
+        for ( String fileName : emptyFiles ) {
+            File emptyFile = new File(fileName);
+            emptyFile.deleteOnExit();
+        }
+
+    }
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        // start from the options the base tool defines
+        Options toolOptions = super.createOptions();
+
+        // -application <name>: the application whose collections are exported
+        Option applicationOption = OptionBuilder.hasArg()
+                .withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME )
+                .create( APPLICATION_NAME );
+        toolOptions.addOption( applicationOption );
+
+        return toolOptions;
+    }
+
+    // ----------------------------------------------------------------------------------------
+    // reading data
+
+    /**
+     * Emits collection names found in application.
+     */
+    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+        EntityManager em;
+
+        public CollectionsOnSubscribe( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(Subscriber<? super String> subscriber) {
+            logger.info("Starting to read collections");
+
+            int count = 0;
+            try {
+                Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+                for ( String collection : collectionMetadata.keySet() ) {
+                    subscriber.onNext( collection );
+                    count++;
+                }
+            } catch (Exception e) {
+                // onError is terminal: never follow it with onCompleted() or unsubscribe()
+                subscriber.onError( e );
+                return;
+            }
+            logger.info("Done. Read {} collection names", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits entities of collection, each together with its non-empty dictionaries.
+     */
+    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        EntityManager em;
+        String collection;
+
+        public EntityOnSubscribe(EntityManager em, String collection) {
+            this.em = em;
+            this.collection = collection;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+            logger.info("Starting to read entities of collection {}", collection);
+
+            try {
+                int count = 0;
+
+                Query query = new Query();
+                query.setLimit( MAX_ENTITY_FETCH );
+
+                Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+                while (results.size() > 0) {
+                    for (Entity entity : results.getEntities()) {
+                        try {
+                            Set<String> dictionaries = em.getDictionaries( entity );
+                            Map<String, Object> dictionariesByName = new HashMap<String, Object>();
+                            for (String dictionary : dictionaries) {
+                                Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                                if (dict.isEmpty()) {
+                                    continue;
+                                }
+                                dictionariesByName.put( dictionary, dict );
+                            }
+                            ExportEntity exportEntity = new ExportEntity(
+                                    em.getApplication().getApplicationName(),
+                                    entity, dictionariesByName );
+                            subscriber.onNext( exportEntity );
+                            count++;
+                        } catch (Exception e) {
+                            // skip this entity but keep reading; pass e so the cause is logged
+                            logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection, e);
+                        }
+                    }
+                    // a null cursor means there are no further pages to fetch
+                    if (results.getCursor() == null) {
+                        break;
+                    }
+                    query.setCursor( results.getCursor() );
+                    results = em.searchCollection( em.getApplicationRef(), collection, query );
+                }
+
+                logger.info("Done. Read {} entities", count);
+                if ( count > 0 ) {
+                    subscriber.onCompleted();
+                } else {
+                    subscriber.unsubscribe();
+                }
+
+            } catch ( Exception e ) {
+                subscriber.onError(e);
+            }
+        }
+    }
+
+    /**
+     * Emits connections of an entity.
+     */
+    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        EntityManager em;
+        ExportEntity exportEntity;
+
+        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+            this.em = em;
+            this.exportEntity = exportEntity;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+
+            int count = 0;
+
+            try {
+                Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+                for (String connectionType : connectionTypes) {
+
+                    Results results = em.getConnectedEntities(
+                            exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
+
+                    for (Entity connectedEntity : results.getEntities()) {
+                        try {
+                            ExportConnection connection = new ExportConnection(
+                                    em.getApplication().getApplicationName(),
+                                    connectionType,
+                                    exportEntity.getEntity().getUuid(),
+                                    connectedEntity.getUuid());
+                            subscriber.onNext( connection );
+                            count++;
+                        } catch (Exception e) {
+                            // skip this connection but keep going; pass e so the cause is logged
+                            logger.error( "Error reading connection entity "
+                                + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType(), e);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // onError is terminal: never follow it with onCompleted() or unsubscribe()
+                subscriber.onError( e );
+                return;
+            }
+
+            logger.info("Done. Read {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Process collection by starting processing of its entities.
+     */
+    class CollectionAction implements Action1<String> {
+        EntityManager em;
+
+        public CollectionAction( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(String collection) {
+
+            // process entities of collection in parallel            
+            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
+            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
+                public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                    return Observable.just(exportEntity).doOnNext( 
+                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                }
+            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+        }
+    }
+
+    /**
+     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
+     */
+    class EntityAction implements Action1<ExportEntity> {
+        EntityManager em;
+
+        public EntityAction( EntityManager em ) {
+            this.em = em;
+        }
+        
+        public void call(ExportEntity exportEntity) {
+            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
+
+            entityWriteQueue.add( exportEntity );
+            entitiesQueued.getAndIncrement();
+
+            // if entity has connections, process them in parallel
+            try {
+                Results connectedEntities = em.getConnectedEntities(
+                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
+
+                if ( !connectedEntities.isEmpty() ) {
+                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
+                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+                    
+                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
+                        public Observable<ExportConnection> call(ExportConnection connection) {
+                            return Observable.just(connection).doOnNext( 
+                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                        }
+                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                }
+                
+            } catch (Exception e) {
+                throw new RuntimeException( "Error getting connections", e );
+            }
+        }
+    }
+
+    /**
+     * Process connection by adding it to connectionWriteQueue. 
+     */
+    class ConnectionsAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
+            connectionWriteQueue.add(conn);
+            connectionsQueued.getAndIncrement();
+        }
+    }
+
+
+    // ----------------------------------------------------------------------------------------
+    // writing data
+
+    /**
+     * Emits entities to be written, until the queue has stayed empty for pollTimeoutSeconds.
+     */
+    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        BlockingQueue<ExportEntity> queue;
+
+        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+            int count = 0;
+            // stay counted as active for the whole drain so runTool() cannot see 0 between poll() and onNext()
+            activePollers.getAndIncrement();
+            try {
+                while ( true ) {
+                    ExportEntity entity;
+                    try {
+                        entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                    } catch (InterruptedException e) {
+                        logger.error("Entity poll interrupted", e);
+                        continue;
+                    }
+                    if ( entity == null ) {
+                        break;
+                    }
+                    subscriber.onNext( entity );
+                    count++;
+                }
+            } finally {
+                activePollers.getAndDecrement();
+            }
+            logger.info("Done. Wrote {} entities", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits connections to be written, until the queue has stayed empty for pollTimeoutSeconds.
+     */
+    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        BlockingQueue<ExportConnection> queue;
+
+        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+            int count = 0;
+            // stay counted as active for the whole drain so runTool() cannot see 0 between poll() and onNext()
+            activePollers.getAndIncrement();
+            try {
+                while ( true ) {
+                    ExportConnection connection;
+                    try {
+                        connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                    } catch (InterruptedException e) {
+                        logger.error("Connection poll interrupted", e);
+                        continue;
+                    }
+                    if ( connection == null ) {
+                        break;
+                    }
+                    subscriber.onNext( connection );
+                    count++;
+                }
+            } finally {
+                activePollers.getAndDecrement();
+            }
+            logger.info("Done. Wrote {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Writes each entity as one line of JSON to the calling thread's output file.
+     */
+    class EntityWriteAction implements Action1<ExportEntity> {
+
+        public void call(ExportEntity entity) {
+
+            boolean wroteData = false;
+            // one output file per writer thread, named after the thread
+            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+
+            JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // first write on this thread: open its file and remember the generator for later calls
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                entityGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+            // write the entity followed by a newline, so the file holds one JSON document per line
+            try {
+                gen.writeObject( entity );
+                gen.writeRaw('\n');
+                entitiesWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+            // NOTE(review): wroteData is always true here (a failed write throws above), so this never runs
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+    /**
+     * Writes each connection as one line of JSON to the calling thread's output file.
+     */
+    class ConnectionWriteAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+
+            boolean wroteData = false;
+            // one output file per writer thread, named after the thread
+            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+
+            JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // first write on this thread: open its file and remember the generator for later calls
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+            // write the connection followed by a newline, so the file holds one JSON document per line
+            try {
+                gen.writeObject( conn );
+                gen.writeRaw('\n');
+                connectionsWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+            // NOTE(review): wroteData is always true here (a failed write throws above), so this never runs
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+}
+
+class ExportEntity {
+    private String application;
+    private Entity entity;
+    private Map<String, Object> dictionaries;
+    public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+        this.application = application;
+        this.entity = entity;
+        this.dictionaries = dictionaries;
+    }
+
+    public String getApplication() {
+        return application;
+    }
+
+    public void setApplication(String application) {
+        this.application = application;
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+
+    public void setEntity(Entity entity) {
+        this.entity = entity;
+    }
+
+    public Map<String, Object> getDictionaries() {
+        return dictionaries;
+    }
+
+    public void setDictionaries(Map<String, Object> dictionaries) {
+        this.dictionaries = dictionaries;
+    }
+}
+
+class ExportConnection {
+    private String application;
+    private String connectionType;
+    private UUID sourceUuid;
+    private UUID targetUuid;
+    public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+        this.application = application;
+        this.connectionType = connectionType;
+        this.sourceUuid = sourceUuid;
+        this.targetUuid = targetUuid;
+    }
+
+    public String getApplication() {
+        return application;
+    }
+
+    public void setApplication(String application) {
+        this.application = application;
+    }
+
+    public String getConnectionType() {
+        return connectionType;
+    }
+
+    public void setConnectionType(String connectionType) {
+        this.connectionType = connectionType;
+    }
+
+    public UUID getSourceUuid() {
+        return sourceUuid;
+    }
+
+    public void setSourceUuid(UUID sourceUuid) {
+        this.sourceUuid = sourceUuid;
+    }
+
+    public UUID getTargetUuid() {
+        return targetUuid;
+    }
+
+    public void setTargetUuid(UUID targetUuid) {
+        this.targetUuid = targetUuid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
new file mode 100644
index 0000000..14b9311
--- /dev/null
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.ServiceITSetup;
+import org.apache.usergrid.ServiceITSetupImpl;
+import org.apache.usergrid.ServiceITSuite;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationOwnerInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ExportAppTest { // builds an application with sample entities and connections, then runs the ExportApp tool against it
+    static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+    @ClassRule
+    public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource ); // supplies getMgmtSvc() and getEmf() used below
+
+    @org.junit.Test
+    public void testBasicOperation() throws Exception { // NOTE(review): contains no assertions; it passes whenever nothing below throws
+       
+        String rand = RandomStringUtils.randomAlphanumeric( 10 ); // random suffix reused in the org, user, app and output directory names
+        
+        // create app with some data
+
+        OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
+                "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
+
+        ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
+                orgInfo.getOrganization().getUuid(), "app_" + rand );
+
+        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        
+        // create 10 connected things
+
+        List<Entity> connectedThings = new ArrayList<Entity>(); // targets for the "has" connections created further down
+        String connectedType = "connected_thing";
+        em.createApplicationCollection(connectedType);
+        for ( int j=0; j<10; j++) {
+            final String name = "connected_thing_" + j; // final so the anonymous HashMap initializer can capture it
+            connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
+                put( "name", name );
+            }} ) );
+        }
+       
+        // create 10 collections of 10 things, every other thing is connected to the connected things
+        
+        for ( int i=0; i<10; i++) {
+            String type = "thing_"+i; // one collection per i: thing_0 .. thing_9
+            em.createApplicationCollection(type);
+            for ( int j=0; j<10; j++) {
+                final String name = "thing_" + j; // names repeat across collections (thing_0 .. thing_9 in each)
+                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
+                if ( j % 2 == 0 ) { // only even-numbered things get connections
+                    for ( Entity target : connectedThings ) {
+                        em.createConnection( source, "has", target ); // one "has" connection to each of the 10 connected things
+                    }
+                }
+            }
+        }
+        
+        // export to file
+
+        String directoryName = "./target/export" + rand;
+
+        ExportApp exportApp = new ExportApp();
+        exportApp.startTool( new String[]{
+                "-application", appInfo.getName(),
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-outputDir", directoryName
+        }, false ); // NOTE(review): the meaning of the boolean argument is defined by startTool, which is not visible here - confirm
+        
+    }
+}
\ No newline at end of file


[09/10] incubator-usergrid git commit: Added data creator to generate data for ExportApp testing.

Posted by sf...@apache.org.
Added data creator to generate data for ExportApp testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a6e68a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a6e68a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a6e68a0a

Branch: refs/heads/master
Commit: a6e68a0adc571b0a30138d4cb71b5e0e49a42c0a
Parents: 2748347
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 21 14:59:59 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 21 14:59:59 2015 -0400

----------------------------------------------------------------------
 stack/pom.xml                                   |   2 +-
 stack/tools/pom.xml                             |   6 +
 .../org/apache/usergrid/tools/ExportApp.java    |  17 +-
 .../usergrid/tools/ExportDataCreator.java       | 244 +++++++++++++++----
 .../org/apache/usergrid/tools/ToolBase.java     |   2 +-
 .../apache/usergrid/tools/ExportAppTest.java    |  65 +----
 6 files changed, 223 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index da1b62c..0e1b32d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -103,7 +103,7 @@
     <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
     <shiro-version>1.2.0</shiro-version>
     <slf4j-version>1.6.1</slf4j-version>
-    <snakeyaml-version>1.8</snakeyaml-version>
+    <snakeyaml-version>1.9</snakeyaml-version>
     <tomcat-version>7.0.42</tomcat-version>
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index 4be3232..3402612 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -249,5 +249,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.codearte.jfairy</groupId>
+      <artifactId>jfairy</artifactId>
+      <version>0.4.3</version>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index b2da0ea..db975e6 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -102,7 +102,7 @@ public class ExportApp extends ExportingToolBase {
      */
     @Override
     public void runTool(CommandLine line) throws Exception {
-        
+
         applicationName = line.getOptionValue( APPLICATION_NAME );
 
         if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
@@ -122,8 +122,11 @@ public class ExportApp extends ExportingToolBase {
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
 
         startSpring();
-        
+
         UUID applicationId = emf.lookupApplication( applicationName );
+        if (applicationId == null) {
+            throw new RuntimeException( "Cannot find application " + applicationName );
+        }
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
@@ -133,8 +136,9 @@ public class ExportApp extends ExportingToolBase {
         Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
         
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
-            
+
             public Observable<ExportEntity> call(String collection) {
+
                 return Observable.create( new EntityObservable( em, collection ) )
                         .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
@@ -142,16 +146,17 @@ public class ExportApp extends ExportingToolBase {
         }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
 
             public Observable<ExportConnection> call(ExportEntity exportEntity) {
+
                 return Observable.create( new ConnectionsObservable( em, exportEntity ) )
                         .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
 
-        }, writeThreadCount)
+        }, writeThreadCount )
             .doOnCompleted( new FileWrapUpAction() )
             .toBlocking().last();
     }
-
-
+   
+    
     // ----------------------------------------------------------------------------------------
     // reading data
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
index a3d6517..6fa4896 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
@@ -17,90 +17,232 @@
 package org.apache.usergrid.tools;
 
 
-import java.util.UUID;
-
-import org.apache.usergrid.management.ManagementService;
+import io.codearte.jfairy.Fairy;
+import io.codearte.jfairy.producer.company.Company;
+import io.codearte.jfairy.producer.person.Person;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.entities.Activity;
-import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 
-import static org.junit.Assert.assertNotNull;
+import java.util.*;
 
 
 /**
- * Simple class to create test for for exporting
- *
- * @author tnine
+ * Create an app full of users and data.
  */
-public class ExportDataCreator {
+public class ExportDataCreator extends ToolBase {
+
+    public static final String APP_NAME = "application";
+    public static final String ORG_NAME = "organization";
+    public static final String NUM_USERS = "users";
+    public static final String NUM_COLLECTIONS = "collections";
+    public static final String NUM_ENTITIES = "entities";
+    public static final String ADMIN_USERNAME = "username";
+    public static final String ADMIN_PASSWORD = "password";
 
-    private EntityManagerFactory emf;
+    public String appName = "test-app";
+    public String orgName = "test-organization";
+    public int numUsers = 100;
+    public int numCollections = 20;
+    public int numEntities = 100;
+    public String adminUsername = "adminuser";
+    public String adminPassword = "test";
 
-    private ManagementService managementService;
 
+    @Override
+    public void runTool(CommandLine line) throws Exception {
 
-    /**
-     * @param emf
-     * @param managementService
-     */
-    public ExportDataCreator( EntityManagerFactory emf, ManagementService managementService ) {
-        super();
-        this.emf = emf;
-        this.managementService = managementService;
+        startSpring();
+
+        setVerbose( line );
+
+        if (line.hasOption( APP_NAME )) {
+            appName = line.getOptionValue( APP_NAME );
+        }
+        if (line.hasOption( ORG_NAME )) {
+            orgName = line.getOptionValue( ORG_NAME );
+        }
+        if (line.hasOption( NUM_USERS )) {
+            numUsers = Integer.parseInt( line.getOptionValue( NUM_USERS ) );
+        }
+        if (line.hasOption( NUM_COLLECTIONS )) {
+            numCollections = Integer.parseInt( line.getOptionValue( NUM_COLLECTIONS ) );
+        }
+        if (line.hasOption( NUM_ENTITIES )) {
+            numEntities = Integer.parseInt( line.getOptionValue( NUM_ENTITIES ) );
+        }
+        if (line.hasOption( ADMIN_USERNAME )) {
+            adminUsername = line.getOptionValue( ADMIN_USERNAME );
+        }
+        if (line.hasOption( ADMIN_PASSWORD )) {
+            adminPassword = line.getOptionValue( ADMIN_PASSWORD );
+        }
+
+        createTestData();
     }
 
 
-    public void createTestData() throws Exception {
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
 
-        String orgName = "testexportorg";
+        Options options = super.createOptions();
 
-        //nothing to do 
-        if ( managementService.getOrganizationByName( orgName ) != null ) {
-            return;
-        }
+        Option appName = OptionBuilder.hasArg()
+                .withDescription( "Application name to use" ).create( APP_NAME );
 
-        OrganizationOwnerInfo orgInfo = managementService
-                .createOwnerAndOrganization( orgName, "textExportUser@apigee.com", "Test User",
-                        "textExportUser@apigee.com", "password", true, false );
+        Option orgName = OptionBuilder.hasArg()
+                .withDescription( "Organization to use (will create if not present)" ).create( ORG_NAME );
 
-        UUID appId = managementService.createApplication( orgInfo.getOrganization().getUuid(), "application" ).getId();
+        Option numUsers = OptionBuilder.hasArg()
+                .withDescription( "Number of users create (in addition to users)" ).create( NUM_USERS );
 
-        EntityManager em = emf.getEntityManager( appId );
+        Option numCollection = OptionBuilder.hasArg()
+                .withDescription( "Number of collections to create (in addition to users)" ).create( NUM_COLLECTIONS );
 
-        User first = new User();
-        first.setUsername( "first" );
-        first.setEmail( "first@usergrid.com" );
+        Option numEntities = OptionBuilder.hasArg()
+                .withDescription( "Number of entities to create per collection" ).create( NUM_ENTITIES );
 
-        Entity firstUserEntity = em.create( first );
+        Option adminUsername = OptionBuilder.hasArg()
+                .withDescription( "Admin Username" ).create( ADMIN_USERNAME );
 
-        assertNotNull( firstUserEntity );
+        Option adminPassword = OptionBuilder.hasArg()
+                .withDescription( "Admin Password" ).create( ADMIN_PASSWORD );
 
-        User second = new User();
-        second.setUsername( "second" );
-        second.setEmail( "second@usergrid.com" );
+        options.addOption( appName );
+        options.addOption( orgName );
+        options.addOption( numUsers );
+        options.addOption( numCollection );
+        options.addOption( numEntities );
+        options.addOption( adminUsername );
+        options.addOption( adminPassword );
 
-        Entity secondUserEntity = em.create( second );
+        return options;
+    }
 
-        assertNotNull( secondUserEntity );
 
-        em.createConnection( firstUserEntity, "likes", secondUserEntity );
+    public void createTestData() throws Exception {
 
-        em.createConnection( secondUserEntity, "dislikes", firstUserEntity );
+        OrganizationInfo orgInfo = managementService.getOrganizationByName( orgName );
 
-        // now create some activities and put them into the user stream
+        if (orgInfo == null) {
+            OrganizationOwnerInfo ownerInfo = managementService.createOwnerAndOrganization(
+                    orgName, adminUsername + "@example.com", adminUsername,
+                    adminUsername + "@example.com", adminPassword, true, false );
+            orgInfo = ownerInfo.getOrganization();
+        }
 
-        Activity activity = new Activity();
+        ApplicationInfo appInfo = managementService.getApplicationInfo( orgName + "/" + appName );
 
-        Activity.ActivityObject actor = new Activity.ActivityObject();
-        actor.setEntityType( "user" );
-        actor.setId( firstUserEntity.getUuid().toString() );
+        if (appInfo == null) {
+            UUID appId = managementService.createApplication( orgInfo.getUuid(), appName ).getId();
+            appInfo = managementService.getApplicationInfo( appId );
+        }
 
-        activity.setActor( actor );
-        activity.setVerb( "POST" );
+        EntityManager em = emf.getEntityManager( appInfo.getId() );
+
+        Fairy fairy = Fairy.create();
+
+        List<Entity> users = new ArrayList<Entity>( numUsers );
+
+        for (int i = 0; i < numUsers; i++) {
+
+            final Person person = fairy.person();
+            Entity userEntity = null;
+            try {
+                final Map<String, Object> userMap = new HashMap<String, Object>() {{
+                    put( "username", person.username() );
+                    put( "password", person.password() );
+                    put( "email", person.email() );
+                    put( "companyEmail", person.companyEmail() );
+                    put( "dateOfBirth", person.dateOfBirth() );
+                    put( "firstName", person.firstName() );
+                    put( "lastName", person.lastName() );
+                    put( "nationalIdentificationNumber", person.nationalIdentificationNumber() );
+                    put( "telephoneNumber", person.telephoneNumber() );
+                    put( "passportNumber", person.passportNumber() );
+                    put( "address", person.getAddress() );
+                }};
+
+                userEntity = em.create( "user", userMap );
+                users.add( userEntity );
+
+            } catch (DuplicateUniquePropertyExistsException e) {
+                logger.error( "Dup user generated: " + person.username() );
+                continue;
+            } catch (Exception e) {
+                logger.error("Error creating user", e);
+                continue;
+            }
+
+            final Company company = person.getCompany();
+            try {
+                EntityRef ref = em.getAlias( "company", company.name() );
+                Entity companyEntity = (ref == null) ? null : em.get( ref );
+              
+                // create company if it does not exist yet
+                if ( companyEntity == null ) {
+                    final Map<String, Object> companyMap = new HashMap<String, Object>() {{
+                        put( "name", company.name() );
+                        put( "domain", company.domain() );
+                        put( "email", company.email() );
+                        put( "url", company.url() );
+                        put( "vatIdentificationNumber", company.vatIdentificationNumber() );
+                    }};
+                    companyEntity = em.create( "company", companyMap );
+                } else {
+                    logger.info("Company {} already exists", company.name());
+                }
+
+                em.createConnection( userEntity, "employer", companyEntity );
+
+            } catch (DuplicateUniquePropertyExistsException e) {
+                logger.error( "Dup company generated {} property={}", company.name(), e.getPropertyName() );
+                continue;
+            } catch (Exception e) {
+                logger.error("Error creating or connecting company", e);
+                continue;
+            }
+            
+            try {
+                for (int j = 0; j < 5; j++) {
+                    Activity activity = new Activity();
+                    Activity.ActivityObject actor = new Activity.ActivityObject();
+                    actor.setEntityType( "user" );
+                    actor.setId( userEntity.getUuid().toString() );
+                    activity.setActor( actor );
+                    activity.setVerb( "POST" );
+                    activity.setContent( "User " + person.username() + " generated a random string "
+                            + RandomStringUtils.randomAlphanumeric( 5 ) );
+                    em.createItemInCollection( userEntity, "activities", "activity", activity.getProperties() );
+                }
+
+                if (users.size() > 10) {
+                    for (int j = 0; j < 5; j++) {
+                        try {
+                            em.createConnection( userEntity, "associate", users.get( (int) (Math.random() * users.size()) ) );
+                        } catch (Exception e) {
+                            logger.error( "Error connecting user to user: " + e.getMessage() );
+                        }
+                    }
+                }
+                
+            } catch (Exception e) {
+                logger.error("Error creating activities", e);
+                continue;
+            }
 
-        em.createItemInCollection( firstUserEntity, "activities", "activity", activity.getProperties() );
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 63187a4..3c427e1 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -116,7 +116,7 @@ public abstract class ToolBase {
     public void printCliHelp( String message ) {
         System.out.println( message );
         HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp( "java -jar usergrid-tools-0.0.1-SNAPSHOT.jar " + getToolName(), createOptions() );
+        formatter.printHelp( "java -jar usergrid-tools-1.0.2.jar " + getToolName(), createOptions() );
         System.exit( -1 );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index a1e3f6b..446aa91 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -63,65 +63,23 @@ public class ExportAppTest {
         
         // create app with some data
 
-        OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
-                "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
-
-        ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
-                orgInfo.getOrganization().getUuid(), "app_" + rand );
-
-        final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        String orgName = "org_" + rand;
+        String appName = "app_" + rand;
+        
+        ExportDataCreator creator = new ExportDataCreator();
+        creator.startTool( new String[] {
+                "-organization", orgName,
+                "-application", appName,
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort()
+        }, false);
         
-        // create connected things
-
-        final List<Entity> connectedThings = new ArrayList<Entity>();
-        String connectedType = "connected_thing";
-        em.createApplicationCollection(connectedType);
-        for ( int j=0; j<NUM_CONNECTIONS; j++) {
-            final String name = "connected_thing_" + j;
-            connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
-                put( "name", name );
-            }} ) );
-        }
-       
-        // create collections of things, every other thing is connected to the connected things
-
-        final AtomicInteger entitiesCount = new AtomicInteger(0);
-        final AtomicInteger connectionCount = new AtomicInteger(0);
-
-        ExecutorService execService = Executors.newFixedThreadPool( 50);
-        final Scheduler scheduler = Schedulers.from( execService );
-
-        for (int i = 0; i < NUM_COLLECTIONS; i++) {
-
-            final String type = "thing_" + i;
-            em.createApplicationCollection( type );
-            connectionCount.getAndIncrement();
-
-            for (int j = 0; j < NUM_ENTITIES; j++) {
-                final String name = "thing_" + j;
-                final Entity source = em.create(
-                        type, new HashMap<String, Object>() {{
-                    put( "name", name );
-                }} );
-                entitiesCount.getAndIncrement();
-
-                for (Entity target : connectedThings) {
-                    em.createConnection( source, "has", target );
-                    connectionCount.getAndIncrement();
-                }
-            }
-        }
-
-        logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
-
         long start = System.currentTimeMillis();
         
         String directoryName = "target/export" + rand;
 
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
-                "-application", appInfo.getName(),
-                "-readThreads", "100",
+                "-application", orgName + "/" + appName,
                 "-writeThreads", "100",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
@@ -137,8 +95,7 @@ public class ExportAppTest {
 
         File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
-                "-application", appInfo.getName(),
-                "-readThreads", "1",
+                "-application", orgName + "/" + appName,
                 "-writeThreads", "1",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName + "1"


[04/10] incubator-usergrid git commit: Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().

Posted by sf...@apache.org.
Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a25f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a25f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a25f8ebc

Branch: refs/heads/master
Commit: a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Parents: 7a870d6
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 15:31:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 15:31:07 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 113 ++++++++++---------
 stack/tools/src/main/resources/log4j.properties |   5 +-
 .../apache/usergrid/tools/ExportAppTest.java    |  89 ++++-----------
 3 files changed, 81 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 59509c0..c302a74 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -16,7 +16,6 @@
  */
 package org.apache.usergrid.tools;
 
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -82,8 +81,30 @@ public class ExportApp extends ExportingToolBase {
     // set via CLI
     int readThreadCount = 80;
     int writeThreadCount = 10; // limiting write will limit output files 
-    
 
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option appNameOption = OptionBuilder.hasArg().withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
+        return options;
+    }
+
+    
     /**
      * Tool entry point. 
      */
@@ -110,25 +131,25 @@ public class ExportApp extends ExportingToolBase {
             }
         }
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-        
-        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-       
-        startSpring();
-
         setVerbose( line );
 
         applyOrgId( line );
         prepareBaseOutputFileName( line );
         outputDir = createOutputParentDir();
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-       
+
+        startSpring();
+        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
         Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
         
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
@@ -151,30 +172,11 @@ public class ExportApp extends ExportingToolBase {
             .toBlocking().last();
     }
 
-    @Override
-    @SuppressWarnings("static-access")
-    public Options createOptions() {
-
-        Options options = super.createOptions();
-
-        Option appNameOption = OptionBuilder.hasArg().withType("")
-                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( appNameOption );
-
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
-        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
-        options.addOption( writeThreadsOption );
-
-        return options;
-    }
 
     // ----------------------------------------------------------------------------------------
     // reading data
 
+    
     /**
      * Emits collection names found in application.
      */
@@ -198,16 +200,13 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info( "Completed. Read {} collection names", count );
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "No collections found" );
-            }
+            
+            subscriber.onCompleted();
+            logger.info( "Completed. Read {} collection names", count );
         }
     }
 
+    
     /**
      * Emits entities of collection.
      */
@@ -267,13 +266,8 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                if ( count > 0 ) {
-                    subscriber.onCompleted();
-                    logger.info("Completed collection {}. Read {} entities", collection, count);
-                } else {
-                    logger.info("Completed collection {} empty", collection );
-                    subscriber.unsubscribe();
-                }
+                subscriber.onCompleted();
+                logger.info("Completed collection {}. Read {} entities", collection, count);
                 
             } catch ( Exception e ) {
                 subscriber.onError(e);
@@ -281,6 +275,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Emits connections of an entity.
      */
@@ -331,19 +326,17 @@ public class ExportApp extends ExportingToolBase {
                 subscriber.onError( e );
             }
             
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info("Completed entity {} type {} connections count {}",
-                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
-                
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "Entity {} type {} has no connections",
-                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
-            }
+            subscriber.onCompleted();
+            logger.info("Completed entity {} type {} connections count {}",
+                new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
         }
     }
 
+    
+    // ----------------------------------------------------------------------------------------
+    // writing data
+    
+    
     /**
      * Writes entities to JSON file.
      */
@@ -381,6 +374,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Writes connection to JSON file.
      */
@@ -418,6 +412,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     private class FileWrapUpAction implements Action0 {
         @Override
         public void call() {
@@ -448,6 +443,10 @@ public class ExportApp extends ExportingToolBase {
     }
 }
 
+
+/**
+ * Represents entity data to be serialized to JSON.
+ */
 class ExportEntity {
     private String organization;
     private String application;
@@ -493,6 +492,10 @@ class ExportEntity {
     }
 }
 
+
+/**
+ * Represents connection data to be serialized to JSON.
+ */
 class ExportConnection {
     private String organization;
     private String application;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..def47b4 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 # and the pattern to %c instead of %l.  (%l is slower.)
 
 # output messages into a rolling log file as well as stdout
-log4j.rootLogger=WARN,stdout
+log4j.rootLogger=ERROR,stdout
 
 # stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
 
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
 
 log4j.logger.org.apache.usergrid.management.cassandra=WARN
 log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index af8306f..d1c5c1f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,10 +27,7 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
@@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
-    int NUM_COLLECTIONS = 5;
-    int NUM_ENTITIES = 10; 
-    int NUM_CONNECTIONS = 1;
+    int NUM_COLLECTIONS = 20;
+    int NUM_ENTITIES = 200; 
+    int NUM_CONNECTIONS = 5;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -86,68 +83,25 @@ public class ExportAppTest {
         ExecutorService execService = Executors.newFixedThreadPool( 50);
         final Scheduler scheduler = Schedulers.from( execService );
 
-        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
-            @Override
-            public Observable<?> call(Integer i) {
-                
-                return Observable.just( i ).doOnNext( new Action1<Integer>() {
-                    @Override
-                    public void call(Integer i) {
-                        
-                        final String type = "thing_"+i;
-                        try {
-                            em.createApplicationCollection( type );
-                            connectionCount.getAndIncrement();
-                            
-                        } catch (Exception e) {
-                            throw new RuntimeException( "Error creating collection", e );
-                        }
-                       
-                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
-                            @Override
-                            public Observable<?> call(Integer j) {
-                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
-                                    @Override
-                                    public void call(Integer j) {
-                                        
-                                        final String name = "thing_" + j;
-                                        try {
-                                            final Entity source = em.create( 
-                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
-                                            entitiesCount.getAndIncrement();
-                                            logger.info( "Created entity {} type {}", name, type );
-                                            
-                                            for ( Entity target : connectedThings ) {
-                                                em.createConnection( source, "has", target );
-                                                connectionCount.getAndIncrement();
-                                                logger.info( "Created connection from entity {} type {} to {}",
-                                                        new Object[]{name, type, target.getName()} );
-                                            }
-
-
-                                        } catch (Exception e) {
-                                            throw new RuntimeException( "Error creating collection", e );
-                                        }
-                                        
-                                        
-                                    }
-                                    
-                                } );
-
-                            }
-                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
-                        
-                    }
-                } );
-                
+        for (int i = 0; i < NUM_COLLECTIONS; i++) {
 
-            }
-        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+            final String type = "thing_" + i;
+            em.createApplicationCollection( type );
+            connectionCount.getAndIncrement();
+
+            for (int j = 0; j < NUM_ENTITIES; j++) {
+                final String name = "thing_" + j;
+                final Entity source = em.create(
+                        type, new HashMap<String, Object>() {{
+                    put( "name", name );
+                }} );
+                entitiesCount.getAndIncrement();
 
-        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
-            Thread.sleep( 5000 );
-            logger.info( "Still working. Created {} entities and {} connections", 
-                    entitiesCount.get(), connectionCount.get() );
+                for (Entity target : connectedThings) {
+                    em.createConnection( source, "has", target );
+                    connectionCount.getAndIncrement();
+                }
+            }
         }
 
         logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
@@ -166,8 +120,5 @@ public class ExportAppTest {
         }, false );
         
         logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
-
-
-        
     }
 }
\ No newline at end of file


[07/10] incubator-usergrid git commit: Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.

Posted by sf...@apache.org.
Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9

Branch: refs/heads/master
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 39 +++++---------------
 .../apache/usergrid/tools/ExportAppTest.java    | 12 +++---
 2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * 
  * Will create as many output files as there are writeThreads (by default: 10).
  * 
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usergrid entities and *.connections for entity to entity connections.
  * 
  * Every line of the data files is a complete JSON object.
  */
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
 
     static final String APPLICATION_NAME = "application";
-    private static final String READ_THREAD_COUNT = "readThreads";
     private static final String WRITE_THREAD_COUNT = "writeThreads";
    
     String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
     AtomicInteger entitiesWritten = new AtomicInteger(0);
     AtomicInteger connectionsWritten = new AtomicInteger(0);
 
-    Scheduler readScheduler;
     Scheduler writeScheduler;
 
     ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    // set via CLI
-    int readThreadCount = 80;
-    int writeThreadCount = 10; // limiting write will limit output files 
+    int writeThreadCount = 10; // set via CLI option; limiting write will limit output files 
 
 
     @Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
         Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
                 .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
         
         applicationName = line.getOptionValue( APPLICATION_NAME );
 
-        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-            try {
-                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-            } catch (NumberFormatException nfe) {
-                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-                return;
-            }
-        }
-        
         if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
             try {
                 writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
         writeScheduler = Schedulers.from( writeThreadPoolExecutor );
 
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
             
             public Observable<ExportEntity> call(String collection) {
-                return Observable.create( new EntityObservable( em, collection ))
+                return Observable.create( new EntityObservable( em, collection ) )
                         .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-            
+
+        }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
             public Observable<ExportConnection> call(ExportEntity exportEntity) {
-                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                return Observable.create( new ConnectionsObservable( em, exportEntity ) )
                         .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10)
-            .subscribeOn( readScheduler )
+
+        }, writeThreadCount)
             .doOnCompleted( new FileWrapUpAction() )
             .toBlocking().last();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
         
         File exportDir = new File(directoryName);
         assertTrue( getFileCount( exportDir, "entities"    ) > 0 );
-        assertTrue( getFileCount( exportDir, "collections" ) > 0 );
-        assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
-        assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+        assertTrue( getFileCount( exportDir, "entities"    ) <= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
 
         File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
 
         logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
 
-        exportDir = new File(directoryName);
-        assertEquals( 1, getFileCount( exportDir, "entities" ));
-        assertEquals( 1, getFileCount( exportDir, "collections" ));
+        assertEquals( 1, getFileCount( exportDir1, "entities" ));
+        assertEquals( 1, getFileCount( exportDir1, "connections" ));
     }
 
     private static int getFileCount(File exportDir, final String ext ) {


[03/10] incubator-usergrid git commit: Remove queues and make the whole thing one "stream"

Posted by sf...@apache.org.
Remove queues and make the whole thing one "stream"


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69

Branch: refs/heads/master
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 375 +++++--------------
 .../usergrid/tools/ExportingToolBase.java       |   2 +-
 .../apache/usergrid/tools/ExportAppTest.java    | 114 +++++-
 3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Scheduler;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-    
-    private static final String READ_THREAD_COUNT = "readThreads";
-    private static final String WRITE_THREAD_COUNT = "writeThreads";
-
-    // we will write two types of files: entities and connections
-    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
-    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
 
     static final String APPLICATION_NAME = "application";
-    
-    int pollTimeoutSeconds = 10;
-    
-    int readThreadCount = 80; 
-    int writeThreadCount = 10;
-    
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+   
     String applicationName;
     String organizationName;
 
-    // limiting output threads will limit output files 
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+
     Scheduler readScheduler;
+    Scheduler writeScheduler;
 
+    ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    List<String> emptyFiles = new ArrayList<String>();
+    // set via CLI
+    int readThreadCount = 80;
+    int writeThreadCount = 10; // limiting write will limit output files 
     
-    AtomicInteger activePollers = new AtomicInteger(0);
-    AtomicInteger entitiesQueued = new AtomicInteger(0);
-    AtomicInteger entitiesWritten = new AtomicInteger(0);
-    AtomicInteger connectionsWritten = new AtomicInteger(0);
-    AtomicInteger connectionsQueued = new AtomicInteger(0);
 
-    ObjectMapper mapper = new ObjectMapper();
-    
     /**
      * Tool entry point. 
      */
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
         readScheduler = Schedulers.from( readThreadPoolExecutor );
         
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
        
         startSpring();
 
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        // start write queue workers
-
-        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
-        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
-        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
-            public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( writeScheduler );
-            }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
-        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
-        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
-            public Observable<ExportConnection> call(ExportConnection connection ) {
-                return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
+        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+        
+        collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+            
+            public Observable<ExportEntity> call(String collection) {
+                return Observable.create( new EntityObservable( em, collection ))
+                        .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        // start processing data and filling up write queues
-
-        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
-        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
-        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
-            public Observable<String> call(String collection) {
-                return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( readScheduler );
+            
+        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+            
+            public Observable<ExportConnection> call(ExportEntity exportEntity) {
+                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                        .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-        },40).subscribeOn( readScheduler ).subscribe();
-        
-        // wait for write thread pollers to get started
-
-        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
-        // wait for write-thread pollers to stop
-
-        while ( activePollers.get() > 0 ) {
-            logger.info(
-                     "Active write threads: {}\n"
-                    +"Entities written:     {}\n"
-                    +"Entities queued:      {}\n"
-                    +"Connections written:  {}\n"
-                    +"Connections queued:   {}\n",
-                    new Object[] { 
-                        activePollers.get(),
-                        entitiesWritten.get(), 
-                        entitiesQueued.get(),
-                        connectionsWritten.get(), 
-                        connectionsQueued.get()} );
-            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
-        }
-
-        // wrap up files
-
-        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( String fileName : emptyFiles ) {
-            File emptyFile = new File(fileName);
-            emptyFile.deleteOnExit();
-        }
-
+            
+        }, 10)
+            .subscribeOn( readScheduler )
+            .doOnCompleted( new FileWrapUpAction() )
+            .toBlocking().last();
     }
 
     @Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
         options.addOption( readThreadsOption );
 
-        Option writeThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
 
         return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits collection names found in application.
      */
-    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+    class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
         EntityManager em;
                 
-        public CollectionsOnSubscribe( EntityManager em ) {
+        public CollectionsObservable(EntityManager em) {
             this.em = em;
         }
 
         public void call(Subscriber<? super String> subscriber) {
             
-            logger.info("Starting to read collections");
-            
             int count = 0;
             try {
                 Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            logger.info("Done. Read {} collection names", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info( "Completed. Read {} collection names", count );
             } else {
                 subscriber.unsubscribe();
+                logger.info( "No collections found" );
             }
         }
     }
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits entities of collection.
      */
-    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+    class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
         EntityManager em;
         String collection;
 
-        public EntityOnSubscribe(EntityManager em, String collection) {
+        public EntityObservable(EntityManager em, String collection) {
             this.em = em;
             this.collection = collection;
         }
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
 
             logger.info("Starting to read entities of collection {}", collection);
             
+            subscriber.onStart();
+            
             try {
                 int count = 0;
 
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                logger.info("Done. Read {} entities", count);
                 if ( count > 0 ) {
                     subscriber.onCompleted();
+                    logger.info("Completed collection {}. Read {} entities", collection, count);
                 } else {
+                    logger.info("Completed collection {} empty", collection );
                     subscriber.unsubscribe();
                 }
                 
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits connections of an entity.
      */
-    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+    class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
         EntityManager em;
         ExportEntity exportEntity;
 
-        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+        public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
             this.em = em;
             this.exportEntity = exportEntity;
         }
 
         public void call(Subscriber<? super ExportConnection> subscriber) {
 
-            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+            logger.info( "Starting to read connections for entity {} type {}",
+                    exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             
             int count = 0;
             
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-
-            logger.info("Done. Read {} connections", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Process collection by starting processing of its entities.
-     */
-    class CollectionAction implements Action1<String> {
-        EntityManager em;
-
-        public CollectionAction( EntityManager em ) {
-            this.em = em;
-        }
-
-        public void call(String collection) {
-
-            // process entities of collection in parallel            
-            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
-            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
-                public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                    return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( readScheduler );
-                }
-            }, 8).subscribeOn(readScheduler).toBlocking().last();
-        }
-    }
-
-    /**
-     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
-     */
-    class EntityAction implements Action1<ExportEntity> {
-        EntityManager em;
-
-        public EntityAction( EntityManager em ) {
-            this.em = em;
-        }
-        
-        public void call(ExportEntity exportEntity) {
-            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
-            entityWriteQueue.add( exportEntity );
-            entitiesQueued.getAndIncrement();
-
-            // if entity has connections, process them in parallel
-            try {
-                Results connectedEntities = em.getConnectedEntities(
-                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
-                if ( !connectedEntities.isEmpty() ) {
-                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
-                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-                    
-                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
-                        public Observable<ExportConnection> call(ExportConnection connection) {
-                            return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn(readScheduler);
-                        }
-                    }, 8).subscribeOn(readScheduler).toBlocking().last();
-                }
-                
-            } catch (Exception e) {
-                throw new RuntimeException( "Error getting connections", e );
-            }
-        }
-    }
-
-    /**
-     * Process connection by adding it to connectionWriteQueue. 
-     */
-    class ConnectionsAction implements Action1<ExportConnection> {
-
-        public void call(ExportConnection conn) {
-            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
-            connectionWriteQueue.add(conn);
-            connectionsQueued.getAndIncrement();
-        }
-    }
-
-
-    // ----------------------------------------------------------------------------------------
-    // writing data
-
-    /**
-     * Emits entities to be written.
-     */
-    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
-        BlockingQueue<ExportEntity> queue;
-
-        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportEntity> subscriber) {
-            int count = 0;
-            
-            while ( true ) {
-                ExportEntity entity = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for entity to write...", count );
-                    activePollers.getAndIncrement();
-                    entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Entity poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( entity == null ) {
-                    break;
-                }
-                subscriber.onNext( entity );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} entities", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Emits connections to be written.
-     */
-    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
-        BlockingQueue<ExportConnection> queue;
-
-        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportConnection> subscriber) {
-            int count = 0;
             
-            while ( true ) {
-                ExportConnection connection = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for connection to write", count );
-                    activePollers.getAndIncrement();
-                    connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Connection poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( connection == null ) {
-                    break;
-                }
-                subscriber.onNext( connection );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info("Completed entity {} type {} connections count {}",
+                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+                
             } else {
                 subscriber.unsubscribe();
+                logger.info( "Entity {} type {} has no connections",
+                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             }
         }
     }
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportEntity entity) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( entity );
                 gen.writeRaw('\n');
                 entitiesWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
-
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
-            }
         }
     }
 
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportConnection conn) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( conn );
                 gen.writeRaw('\n');
                 connectionsWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
+        }
+    }
+
+    private class FileWrapUpAction implements Action0 {
+        @Override
+        public void call() {
+
+            logger.info("-------------------------------------------------------------------");
+            logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+            logger.info("-------------------------------------------------------------------");
 
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
+            for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
+            }
+            for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
             }
         }
     }
-
 }
 
 class ExportEntity {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
 
         if ( !file.mkdirs() ) {
 
-            throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+            throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
         }
 
         return file;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+    
+    int NUM_COLLECTIONS = 5;
+    int NUM_ENTITIES = 10; 
+    int NUM_CONNECTIONS = 1;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
         ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
                 orgInfo.getOrganization().getUuid(), "app_" + rand );
 
-        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
         
-        // create 10 connected things
+        // create connected things
 
-        List<Entity> connectedThings = new ArrayList<Entity>();
+        final List<Entity> connectedThings = new ArrayList<Entity>();
         String connectedType = "connected_thing";
         em.createApplicationCollection(connectedType);
-        for ( int j=0; j<10; j++) {
+        for ( int j=0; j<NUM_CONNECTIONS; j++) {
             final String name = "connected_thing_" + j;
             connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
                 put( "name", name );
             }} ) );
         }
        
-        // create 10 collections of 10 things, every other thing is connected to the connected things
-        
-        for ( int i=0; i<10; i++) {
-            String type = "thing_"+i;
-            em.createApplicationCollection(type);
-            for ( int j=0; j<10; j++) {
-                final String name = "thing_" + j;
-                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
-                if ( j % 2 == 0 ) {
-                    for ( Entity target : connectedThings ) {
-                        em.createConnection( source, "has", target );
+        // create collections of things, every other thing is connected to the connected things
+
+        final AtomicInteger entitiesCount = new AtomicInteger(0);
+        final AtomicInteger connectionCount = new AtomicInteger(0);
+
+        ExecutorService execService = Executors.newFixedThreadPool( 50);
+        final Scheduler scheduler = Schedulers.from( execService );
+
+        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+            @Override
+            public Observable<?> call(Integer i) {
+                
+                return Observable.just( i ).doOnNext( new Action1<Integer>() {
+                    @Override
+                    public void call(Integer i) {
+                        
+                        final String type = "thing_"+i;
+                        try {
+                            em.createApplicationCollection( type );
+                            connectionCount.getAndIncrement();
+                            
+                        } catch (Exception e) {
+                            throw new RuntimeException( "Error creating collection", e );
+                        }
+                       
+                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+                            @Override
+                            public Observable<?> call(Integer j) {
+                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
+                                    @Override
+                                    public void call(Integer j) {
+                                        
+                                        final String name = "thing_" + j;
+                                        try {
+                                            final Entity source = em.create( 
+                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
+                                            entitiesCount.getAndIncrement();
+                                            logger.info( "Created entity {} type {}", name, type );
+                                            
+                                            for ( Entity target : connectedThings ) {
+                                                em.createConnection( source, "has", target );
+                                                connectionCount.getAndIncrement();
+                                                logger.info( "Created connection from entity {} type {} to {}",
+                                                        new Object[]{name, type, target.getName()} );
+                                            }
+
+
+                                        } catch (Exception e) {
+                                            throw new RuntimeException( "Error creating collection", e );
+                                        }
+                                        
+                                        
+                                    }
+                                    
+                                } );
+
+                            }
+                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+                        
                     }
-                }
+                } );
+                
+
             }
+        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+            Thread.sleep( 5000 );
+            logger.info( "Still working. Created {} entities and {} connections", 
+                    entitiesCount.get(), connectionCount.get() );
         }
-        
-        // export to file
 
-        String directoryName = "./target/export" + rand;
+        logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+        long start = System.currentTimeMillis();
+        
+        String directoryName = "target/export" + rand;
 
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
+                "-readThreads", "50",
+                "-writeThreads", "10",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
         }, false );
         
+        logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+        
     }
 }
\ No newline at end of file


[08/10] incubator-usergrid git commit: Merge branch 'master' into rxportapp

Posted by sf...@apache.org.
Merge branch 'master' into rxportapp


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2748347a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2748347a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2748347a

Branch: refs/heads/master
Commit: 2748347a3e3e19b4db3467e429ab9e27f5742892
Parents: b58390d e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 16:00:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 16:00:55 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 37 ++++++++------------
 .../org/apache/usergrid/tools/ImportAdmins.java |  5 +--
 2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[05/10] incubator-usergrid git commit: Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.

Posted by sf...@apache.org.
Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b168b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b168b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b168b91

Branch: refs/heads/master
Commit: 7b168b91d99ba51da452a9c7980b0078f733df03
Parents: a25f8eb
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 16:41:06 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 16:41:06 2015 -0400

----------------------------------------------------------------------
 .../apache/usergrid/tools/ExportAppTest.java    | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b168b91/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index d1c5c1f..eeaae13 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -41,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
-    int NUM_COLLECTIONS = 20;
-    int NUM_ENTITIES = 200; 
-    int NUM_CONNECTIONS = 5;
+    int NUM_COLLECTIONS = 10;
+    int NUM_ENTITIES = 50; 
+    int NUM_CONNECTIONS = 3;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -113,12 +113,22 @@ public class ExportAppTest {
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
-                "-readThreads", "50",
-                "-writeThreads", "10",
+                "-readThreads", "100",
+                "-writeThreads", "100",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
         }, false );
-        
-        logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+        logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+        exportApp.startTool( new String[]{
+                "-application", appInfo.getName(),
+                "-readThreads", "1",
+                "-writeThreads", "1",
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-outputDir", directoryName + "1"
+        }, false );
+
+        logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
     }
 }
\ No newline at end of file


[02/10] incubator-usergrid git commit: Add readThread and writeThread CLI parameters.

Posted by sf...@apache.org.
Add readThread and writeThread CLI parameters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619

Branch: refs/heads/master
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 153 +++++++++++++------
 1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ * 
+ * Exports data files to specified directory.
+ * 
+ * Will create as many output files as there are writeThreads (by default: 10).
+ * 
+ * Will create two types of files: *.uge for Usergrid entities and *.ugc for entity to entity connections.
+ * 
+ * Every line of the data files is a complete JSON object.
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
     
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+
     // we will write two types of files: entities and connections
     BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
     BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
     static final String APPLICATION_NAME = "application";
     
     int pollTimeoutSeconds = 10;
+    
+    int readThreadCount = 80; 
+    int writeThreadCount = 10;
+    
+    String applicationName;
+    String organizationName;
 
     // limiting output threads will limit output files 
-    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
-    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+    Scheduler readScheduler;
 
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
     ObjectMapper mapper = new ObjectMapper();
     
     /**
-     * Export admin users using multiple threads.
-     * <p/>
-     * How it works:
-     * In main thread we query for IDs of all admin users, add each ID to read queue.
-     * Read-queue workers read admin user data, add data to write queue.
-     * One write-queue worker reads data writes to file.
+     * Tool entry point. 
      */
     @Override
     public void runTool(CommandLine line) throws Exception {
         
-        String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-//        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-//            try {
-//                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-//            } catch (NumberFormatException nfe) {
-//                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-//                return;
-//            }
-//        } else {
-//            readThreadCount = 20;
-//        }
+        applicationName = line.getOptionValue( APPLICATION_NAME );
+
+        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+            try {
+                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
         
+        if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+            try {
+                writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
+
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+        
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+       
         startSpring();
 
         setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
+        organizationName = em.getApplication().getOrganizationName();
 
         // start write queue workers
 
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
         entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
             public Observable<ExportEntity> call(ExportEntity exportEntity) {
                 return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( scheduler );
+                        new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
         rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
         connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
             public Observable<ExportConnection> call(ExportConnection connection ) {
                 return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( scheduler );
+                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         // start processing data and filling up write queues
 
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
             public Observable<String> call(String collection) {
                 return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+                        new CollectionAction( em ) ).subscribeOn( readScheduler );
             }
-        },40).subscribeOn( Schedulers.io() ).subscribe();
+        },40).subscribeOn( readScheduler ).subscribe();
         
         // wait for write thread pollers to get started
 
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
 
         Options options = super.createOptions();
 
-        Option readThreads = OptionBuilder.hasArg().withType("")
+        Option appNameOption = OptionBuilder.hasArg().withType("")
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( readThreads );
-        
-//        Option readThreads = OptionBuilder
-//                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-//        options.addOption( readThreads );
-        
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
         return options;
     }
 
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
                                 }
                                 dictionariesByName.put( dictionary, dict );
                             }
+                            
                             ExportEntity exportEntity = new ExportEntity( 
-                                    em.getApplication().getApplicationName(), 
-                                    entity, dictionariesByName );
+                                    organizationName, 
+                                    applicationName, 
+                                    entity, 
+                                    dictionariesByName );
+                            
                             subscriber.onNext( exportEntity );
                             count++;
                             
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
 
                     for (Entity connectedEntity : results.getEntities()) {
                         try {
+                            
                             ExportConnection connection = new ExportConnection( 
-                                    em.getApplication().getApplicationName(), 
+                                    applicationName,
+                                    organizationName,
                                     connectionType, 
                                     exportEntity.getEntity().getUuid(), 
                                     connectedEntity.getUuid());
+                            
                             subscriber.onNext( connection );
                             count++;
 
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
             entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
                 public Observable<ExportEntity> call(ExportEntity exportEntity) {
                     return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                            new EntityAction( em ) ).subscribeOn( readScheduler );
                 }
-            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+            }, 8).subscribeOn(readScheduler).toBlocking().last();
         }
     }
 
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
                     entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
                         public Observable<ExportConnection> call(ExportConnection connection) {
                             return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                                    new ConnectionsAction() ).subscribeOn(readScheduler);
                         }
-                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                    }, 8).subscribeOn(readScheduler).toBlocking().last();
                 }
                 
             } catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} entities", count);
+            logger.info("Done. De-queued {} entities", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} connections", count);
+            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
 }
 
 class ExportEntity {
+    private String organization;
     private String application;
     private Entity entity;
     private Map<String, Object> dictionaries;
-    public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+    public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+        this.organization = organization;
         this.application = application;
         this.entity = entity;
         this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
     public void setDictionaries(Map<String, Object> dictionaries) {
         this.dictionaries = dictionaries;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }
 
 class ExportConnection {
+    private String organization;
     private String application;
     private String connectionType;
     private UUID sourceUuid;
     private UUID targetUuid;
-    public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+    public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+        this.organization= organization;
         this.application = application;
         this.connectionType = connectionType;
         this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
     public void setTargetUuid(UUID targetUuid) {
         this.targetUuid = targetUuid;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }


[10/10] incubator-usergrid git commit: merge

Posted by sf...@apache.org.
merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9f55ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9f55ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9f55ac

Branch: refs/heads/master
Commit: 9b9f55ac20f669451dadc7cdeeb4643ac0614ad4
Parents: a6e68a0 897fd50
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Jul 21 14:09:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Jul 21 14:09:05 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 118 ++++++----
 .../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++++++++++-----
 .../usergrid/tools/ExportImportAdminsTest.java  |  71 ++++--
 ...adata.usergrid-management.1433331614293.json |  52 +++++
 ...users.usergrid-management.1433331614293.json |  12 +
 5 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------



[06/10] incubator-usergrid git commit: Minor test fixes.

Posted by sf...@apache.org.
Minor test fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dab84e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dab84e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dab84e95

Branch: refs/heads/master
Commit: dab84e95b54ff9ac573cfa291708938b5e181b61
Parents: 7b168b9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 12:34:26 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 12:34:26 2015 -0400

----------------------------------------------------------------------
 .../apache/usergrid/tools/ExportAppTest.java    | 33 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dab84e95/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index eeaae13..c5411fd 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,12 +24,15 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Scheduler;
 import rx.schedulers.Schedulers;
 
+import java.io.File;
+import java.io.FileFilter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +40,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+
+/**
+ * TODO: better test, this is really just a smoke test.
+ */
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
@@ -119,8 +128,15 @@ public class ExportAppTest {
                 "-outputDir", directoryName
         }, false );
 
-        logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+        logger.info( "100 read and 100 write threads = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+        
+        File exportDir = new File(directoryName);
+        assertTrue( getFileCount( exportDir, "entities"    ) > 0 );
+        assertTrue( getFileCount( exportDir, "collections" ) > 0 );
+        assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
+        assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
 
+        File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
                 "-readThreads", "1",
@@ -129,6 +145,19 @@ public class ExportAppTest {
                 "-outputDir", directoryName + "1"
         }, false );
 
-        logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
+        logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+        exportDir = new File(directoryName);
+        assertEquals( 1, getFileCount( exportDir, "entities" ));
+        assertEquals( 1, getFileCount( exportDir, "collections" ));
+    }
+
+    private static int getFileCount(File exportDir, final String ext ) {
+        return exportDir.listFiles( new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                return pathname.getAbsolutePath().endsWith("." + ext);
+            }
+        } ).length;
     }
 }
\ No newline at end of file