You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/13 16:22:19 UTC

[2/2] incubator-usergrid git commit: Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.

Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2bdbb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2bdbb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2bdbb54

Branch: refs/heads/rxportapp
Commit: b2bdbb5456301dbcf7131e780d968514cdd7e55d
Parents: 75ad454
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 13 10:21:09 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 13 10:21:09 2015 -0400

----------------------------------------------------------------------
 stack/core/pom.xml                              |   9 +-
 stack/pom.xml                                   |   2 +-
 .../org/apache/usergrid/tools/ExportApp.java    | 687 +++++++++++++++++++
 .../apache/usergrid/tools/ExportAppTest.java    |  97 +++
 4 files changed, 790 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 47fa840..f60dbc2 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -573,14 +573,15 @@
     </dependency>
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
+      <groupId>io.reactivex</groupId>
+      <artifactId>rxjava</artifactId>
       <version>${rx.version}</version>
     </dependency>
+    
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
+      <groupId>io.reactivex</groupId>
       <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
+      <version>1.0.0</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index bdc3549..da1b62c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -108,7 +108,7 @@
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
     <metrics.version>3.0.0</metrics.version>
-    <rx.version>0.19.6</rx.version>
+    <rx.version>1.0.12</rx.version>
   </properties>
 
   <licenses>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
new file mode 100644
index 0000000..ceb3ecd
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Export application's collections.
+ */
+public class ExportApp extends ExportingToolBase {
+    static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+    
+    // we will write two types of files: entities and connections
+    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
+    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
+
+    static final String APPLICATION_NAME = "application";
+    
+    int pollTimeoutSeconds = 10;
+
+    // limiting output threads will limit output files 
+    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
+    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+
+    Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
+    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+    List<String> emptyFiles = new ArrayList<String>();
+    
+    AtomicInteger activePollers = new AtomicInteger(0);
+    AtomicInteger entitiesQueued = new AtomicInteger(0);
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+    AtomicInteger connectionsQueued = new AtomicInteger(0);
+
+    ObjectMapper mapper = new ObjectMapper();
+    
+    /**
+     * Export admin users using multiple threads.
+     * <p/>
+     * How it works:
+     * In main thread we query for IDs of all admin users, add each ID to read queue.
+     * Read-queue workers read admin user data, add data to write queue.
+     * One write-queue worker reads data writes to file.
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+        
+        String applicationName = line.getOptionValue( APPLICATION_NAME );
+
+//        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+//            try {
+//                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+//            } catch (NumberFormatException nfe) {
+//                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+//                return;
+//            }
+//        } else {
+//            readThreadCount = 20;
+//        }
+        
+        startSpring();
+
+        setVerbose( line );
+
+        applyOrgId( line );
+        prepareBaseOutputFileName( line );
+        outputDir = createOutputParentDir();
+        logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+       
+        UUID applicationId = emf.lookupApplication( applicationName );
+        final EntityManager em = emf.getEntityManager( applicationId );
+
+        // start write queue workers
+
+        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
+        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
+        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
+            public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                return Observable.just(exportEntity).doOnNext( 
+                        new EntityWriteAction() ).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
+        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
+        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
+            public Observable<ExportConnection> call(ExportConnection connection ) {
+                return Observable.just(connection).doOnNext( 
+                        new ConnectionWriteAction()).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        // start processing data and filling up write queues
+
+        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
+        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
+        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
+            public Observable<String> call(String collection) {
+                return Observable.just(collection).doOnNext( 
+                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+            }
+        },40).subscribeOn( Schedulers.io() ).subscribe();
+        
+        // wait for write thread pollers to get started
+
+        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
+
+        // wait for write-thread pollers to stop
+
+        while ( activePollers.get() > 0 ) {
+            logger.info(
+                     "Active write threads: {}\n"
+                    +"Entities written:     {}\n"
+                    +"Entities queued:      {}\n"
+                    +"Connections written:  {}\n"
+                    +"Connections queued:   {}\n",
+                    new Object[] { 
+                        activePollers.get(),
+                        entitiesWritten.get(), 
+                        entitiesQueued.get(),
+                        connectionsWritten.get(), 
+                        connectionsQueued.get()} );
+            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
+        }
+
+        // wrap up files
+
+        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        for ( String fileName : emptyFiles ) {
+            File emptyFile = new File(fileName);
+            emptyFile.deleteOnExit();
+        }
+
+    }
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option readThreads = OptionBuilder.hasArg().withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( readThreads );
+        
+//        Option readThreads = OptionBuilder
+//                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+//        options.addOption( readThreads );
+        
+        return options;
+    }
+
+    // ----------------------------------------------------------------------------------------
+    // reading data
+
+    /**
+     * Emits collection names found in application.
+     */
+    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+        EntityManager em;
+                
+        public CollectionsOnSubscribe( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(Subscriber<? super String> subscriber) {
+            
+            logger.info("Starting to read collections");
+            
+            int count = 0;
+            try {
+                Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+                for ( String collection : collectionMetadata.keySet() ) {
+                    subscriber.onNext( collection );
+                    count++;
+                }
+                
+            } catch (Exception e) {
+                subscriber.onError( e );
+            }
+            logger.info("Done. Read {} collection names", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits entities of collection.
+     */
+    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        EntityManager em;
+        String collection;
+
+        public EntityOnSubscribe(EntityManager em, String collection) {
+            this.em = em;
+            this.collection = collection;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+
+            logger.info("Starting to read entities of collection {}", collection);
+            
+            try {
+                int count = 0;
+
+                Query query = new Query();
+                query.setLimit( MAX_ENTITY_FETCH );
+
+                Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+                while (results.size() > 0) {
+                    for (Entity entity : results.getEntities()) {
+                        try {
+                            Set<String> dictionaries = em.getDictionaries( entity );
+                            Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+                            for (String dictionary : dictionaries) {
+                                Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                                if (dict.isEmpty()) {
+                                    continue;
+                                }
+                                dictionariesByName.put( dictionary, dict );
+                            }
+                            ExportEntity exportEntity = new ExportEntity( 
+                                    em.getApplication().getApplicationName(), 
+                                    entity, dictionariesByName );
+                            subscriber.onNext( exportEntity );
+                            count++;
+                            
+                        } catch (Exception e) {
+                            logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+                        }
+                    }
+                    if (results.getCursor() == null) {
+                        break;
+                    }
+                    query.setCursor( results.getCursor() );
+                    results = em.searchCollection( em.getApplicationRef(), collection, query );
+                }
+
+                logger.info("Done. Read {} entities", count);
+                if ( count > 0 ) {
+                    subscriber.onCompleted();
+                } else {
+                    subscriber.unsubscribe();
+                }
+                
+            } catch ( Exception e ) {
+                subscriber.onError(e);
+            }
+        }
+    }
+
+    /**
+     * Emits connections of an entity.
+     */
+    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        EntityManager em;
+        ExportEntity exportEntity;
+
+        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+            this.em = em;
+            this.exportEntity = exportEntity;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+
+            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+            
+            int count = 0;
+            
+            try {
+                Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+                for (String connectionType : connectionTypes) {
+
+                    Results results = em.getConnectedEntities( 
+                            exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
+
+                    for (Entity connectedEntity : results.getEntities()) {
+                        try {
+                            ExportConnection connection = new ExportConnection( 
+                                    em.getApplication().getApplicationName(), 
+                                    connectionType, 
+                                    exportEntity.getEntity().getUuid(), 
+                                    connectedEntity.getUuid());
+                            subscriber.onNext( connection );
+                            count++;
+
+                        } catch (Exception e) {
+                            logger.error( "Error reading connection entity " 
+                                + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
+                        }
+                    }
+                }
+                
+            } catch (Exception e) {
+                subscriber.onError( e );
+            }
+
+            logger.info("Done. Read {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Process collection by starting processing of its entities.
+     */
+    class CollectionAction implements Action1<String> {
+        EntityManager em;
+
+        public CollectionAction( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(String collection) {
+
+            // process entities of collection in parallel            
+            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
+            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
+                public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                    return Observable.just(exportEntity).doOnNext( 
+                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                }
+            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+        }
+    }
+
+    /**
+     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
+     */
+    class EntityAction implements Action1<ExportEntity> {
+        EntityManager em;
+
+        public EntityAction( EntityManager em ) {
+            this.em = em;
+        }
+        
+        public void call(ExportEntity exportEntity) {
+            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
+
+            entityWriteQueue.add( exportEntity );
+            entitiesQueued.getAndIncrement();
+
+            // if entity has connections, process them in parallel
+            try {
+                Results connectedEntities = em.getConnectedEntities(
+                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
+
+                if ( !connectedEntities.isEmpty() ) {
+                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
+                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+                    
+                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
+                        public Observable<ExportConnection> call(ExportConnection connection) {
+                            return Observable.just(connection).doOnNext( 
+                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                        }
+                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                }
+                
+            } catch (Exception e) {
+                throw new RuntimeException( "Error getting connections", e );
+            }
+        }
+    }
+
+    /**
+     * Process connection by adding it to connectionWriteQueue. 
+     */
+    class ConnectionsAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
+            connectionWriteQueue.add(conn);
+            connectionsQueued.getAndIncrement();
+        }
+    }
+
+
+    // ----------------------------------------------------------------------------------------
+    // writing data
+
+    /**
+     * Emits entities to be written.
+     */
+    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        BlockingQueue<ExportEntity> queue;
+
+        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+            int count = 0;
+            
+            while ( true ) {
+                ExportEntity entity = null;
+                try {
+                    //logger.debug( "Wrote {}. Polling for entity to write...", count );
+                    activePollers.getAndIncrement();
+                    entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                } catch (InterruptedException e) {
+                    logger.error("Entity poll interrupted", e);
+                    continue;
+                } finally {
+                    activePollers.getAndDecrement();
+                }
+                if ( entity == null ) {
+                    break;
+                }
+                subscriber.onNext( entity );
+                count++;
+            }
+
+            logger.info("Done. Wrote {} entities", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits connections to be written.
+     */
+    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        BlockingQueue<ExportConnection> queue;
+
+        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+            int count = 0;
+            
+            while ( true ) {
+                ExportConnection connection = null;
+                try {
+                    //logger.debug( "Wrote {}. Polling for connection to write", count );
+                    activePollers.getAndIncrement();
+                    connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                } catch (InterruptedException e) {
+                    logger.error("Connection poll interrupted", e);
+                    continue;
+                } finally {
+                    activePollers.getAndDecrement();
+                }
+                if ( connection == null ) {
+                    break;
+                }
+                subscriber.onNext( connection );
+                count++;
+            }
+
+            logger.info("Done. Wrote {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Writes entities to JSON file.
+     */
+    class EntityWriteAction implements Action1<ExportEntity> {
+
+        public void call(ExportEntity entity) {
+
+            boolean wroteData = false;
+
+            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+
+            JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // no generator so we are opening new file and writing the start of an array
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                entityGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+
+            try {
+                gen.writeObject( entity );
+                gen.writeRaw('\n');
+                entitiesWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+    /**
+     * Writes connection to JSON file.
+     */
+    class ConnectionWriteAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+
+            boolean wroteData = false;
+
+            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+
+            JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // no generator so we are opening new file and writing the start of an array
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+
+            try {
+                gen.writeObject( conn );
+                gen.writeRaw('\n');
+                connectionsWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+}
+
+/**
+ * One record of the entity (.ude) export files: the owning application's name, the entity
+ * itself, and the entity's dictionaries keyed by dictionary name.
+ */
+class ExportEntity {
+    private String application;
+    private Entity entity;
+    private Map<String, Object> dictionaries;
+
+    public ExportEntity( String applicationName, Entity exportedEntity, Map<String, Object> dictionariesByName ) {
+        application = applicationName;
+        entity = exportedEntity;
+        dictionaries = dictionariesByName;
+    }
+
+    /** @return name of the application the entity was read from */
+    public String getApplication() { return application; }
+
+    public void setApplication( String applicationName ) { application = applicationName; }
+
+    /** @return the exported entity */
+    public Entity getEntity() { return entity; }
+
+    public void setEntity( Entity exportedEntity ) { entity = exportedEntity; }
+
+    /** @return the entity's dictionaries, keyed by dictionary name */
+    public Map<String, Object> getDictionaries() { return dictionaries; }
+
+    public void setDictionaries( Map<String, Object> dictionariesByName ) { dictionaries = dictionariesByName; }
+}
+
+/**
+ * One record of the connection (.ugc) export files: a connection of type
+ * {@code connectionType} from the source entity to the target entity, within an application.
+ */
+class ExportConnection {
+    private String application;
+    private String connectionType;
+    private UUID sourceUuid;
+    private UUID targetUuid;
+
+    public ExportConnection( String applicationName, String type, UUID source, UUID target ) {
+        application = applicationName;
+        connectionType = type;
+        sourceUuid = source;
+        targetUuid = target;
+    }
+
+    /** @return name of the application the connection belongs to */
+    public String getApplication() { return application; }
+
+    public void setApplication( String applicationName ) { application = applicationName; }
+
+    /** @return the connection (verb) type */
+    public String getConnectionType() { return connectionType; }
+
+    public void setConnectionType( String type ) { connectionType = type; }
+
+    /** @return UUID of the entity the connection starts from */
+    public UUID getSourceUuid() { return sourceUuid; }
+
+    public void setSourceUuid( UUID source ) { sourceUuid = source; }
+
+    /** @return UUID of the connected entity */
+    public UUID getTargetUuid() { return targetUuid; }
+
+    public void setTargetUuid( UUID target ) { targetUuid = target; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
new file mode 100644
index 0000000..14b9311
--- /dev/null
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.ServiceITSetup;
+import org.apache.usergrid.ServiceITSetupImpl;
+import org.apache.usergrid.ServiceITSuite;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationOwnerInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ExportAppTest {
+    static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+    @ClassRule
+    public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
+
+    @org.junit.Test
+    public void testBasicOperation() throws Exception {
+       
+        String rand = RandomStringUtils.randomAlphanumeric( 10 );
+        
+        // create app with some data
+
+        OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
+                "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
+
+        ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
+                orgInfo.getOrganization().getUuid(), "app_" + rand );
+
+        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        
+        // create 10 connected things
+
+        List<Entity> connectedThings = new ArrayList<Entity>();
+        String connectedType = "connected_thing";
+        em.createApplicationCollection(connectedType);
+        for ( int j=0; j<10; j++) {
+            final String name = "connected_thing_" + j;
+            connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
+                put( "name", name );
+            }} ) );
+        }
+       
+        // create 10 collections of 10 things, every other thing is connected to the connected things
+        
+        for ( int i=0; i<10; i++) {
+            String type = "thing_"+i;
+            em.createApplicationCollection(type);
+            for ( int j=0; j<10; j++) {
+                final String name = "thing_" + j;
+                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
+                if ( j % 2 == 0 ) {
+                    for ( Entity target : connectedThings ) {
+                        em.createConnection( source, "has", target );
+                    }
+                }
+            }
+        }
+        
+        // export to file
+
+        String directoryName = "./target/export" + rand;
+
+        ExportApp exportApp = new ExportApp();
+        exportApp.startTool( new String[]{
+                "-application", appInfo.getName(),
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-outputDir", directoryName
+        }, false );
+        
+    }
+}
\ No newline at end of file