You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/22 17:18:40 UTC

[01/18] incubator-usergrid git commit: Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev b1393b4ed -> 89dd0ad98


Add new RxJava based multi-threaded ExportApp tool, and upgrade to RxJava 1.0.12.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b2bdbb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b2bdbb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b2bdbb54

Branch: refs/heads/two-dot-o-dev
Commit: b2bdbb5456301dbcf7131e780d968514cdd7e55d
Parents: 75ad454
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 13 10:21:09 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 13 10:21:09 2015 -0400

----------------------------------------------------------------------
 stack/core/pom.xml                              |   9 +-
 stack/pom.xml                                   |   2 +-
 .../org/apache/usergrid/tools/ExportApp.java    | 687 +++++++++++++++++++
 .../apache/usergrid/tools/ExportAppTest.java    |  97 +++
 4 files changed, 790 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 47fa840..f60dbc2 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -573,14 +573,15 @@
     </dependency>
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
+      <groupId>io.reactivex</groupId>
+      <artifactId>rxjava</artifactId>
       <version>${rx.version}</version>
     </dependency>
+    
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
+      <groupId>io.reactivex</groupId>
       <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
+      <version>1.0.0</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index bdc3549..da1b62c 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -108,7 +108,7 @@
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
     <metrics.version>3.0.0</metrics.version>
-    <rx.version>0.19.6</rx.version>
+    <rx.version>1.0.12</rx.version>
   </properties>
 
   <licenses>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
new file mode 100644
index 0000000..ceb3ecd
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Export application's collections.
+ */
+public class ExportApp extends ExportingToolBase {
+    static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+    
+    // we will write two types of files: entities and connections
+    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
+    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
+
+    static final String APPLICATION_NAME = "application";
+    
+    int pollTimeoutSeconds = 10;
+
+    // limiting output threads will limit output files 
+    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
+    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+
+    Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
+    Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+    List<String> emptyFiles = new ArrayList<String>();
+    
+    AtomicInteger activePollers = new AtomicInteger(0);
+    AtomicInteger entitiesQueued = new AtomicInteger(0);
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+    AtomicInteger connectionsQueued = new AtomicInteger(0);
+
+    ObjectMapper mapper = new ObjectMapper();
+    
+    /**
+     * Export admin users using multiple threads.
+     * <p/>
+     * How it works:
+     * In main thread we query for IDs of all admin users, add each ID to read queue.
+     * Read-queue workers read admin user data, add data to write queue.
+     * One write-queue worker reads data writes to file.
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+        
+        String applicationName = line.getOptionValue( APPLICATION_NAME );
+
+//        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+//            try {
+//                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+//            } catch (NumberFormatException nfe) {
+//                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+//                return;
+//            }
+//        } else {
+//            readThreadCount = 20;
+//        }
+        
+        startSpring();
+
+        setVerbose( line );
+
+        applyOrgId( line );
+        prepareBaseOutputFileName( line );
+        outputDir = createOutputParentDir();
+        logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+       
+        UUID applicationId = emf.lookupApplication( applicationName );
+        final EntityManager em = emf.getEntityManager( applicationId );
+
+        // start write queue workers
+
+        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
+        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
+        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
+            public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                return Observable.just(exportEntity).doOnNext( 
+                        new EntityWriteAction() ).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
+        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
+        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
+            public Observable<ExportConnection> call(ExportConnection connection ) {
+                return Observable.just(connection).doOnNext( 
+                        new ConnectionWriteAction()).subscribeOn( scheduler );
+            }
+        },10).subscribeOn( scheduler ).subscribe();
+
+        // start processing data and filling up write queues
+
+        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
+        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
+        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
+            public Observable<String> call(String collection) {
+                return Observable.just(collection).doOnNext( 
+                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+            }
+        },40).subscribeOn( Schedulers.io() ).subscribe();
+        
+        // wait for write thread pollers to get started
+
+        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
+
+        // wait for write-thread pollers to stop
+
+        while ( activePollers.get() > 0 ) {
+            logger.info(
+                     "Active write threads: {}\n"
+                    +"Entities written:     {}\n"
+                    +"Entities queued:      {}\n"
+                    +"Connections written:  {}\n"
+                    +"Connections queued:   {}\n",
+                    new Object[] { 
+                        activePollers.get(),
+                        entitiesWritten.get(), 
+                        entitiesQueued.get(),
+                        connectionsWritten.get(), 
+                        connectionsQueued.get()} );
+            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
+        }
+
+        // wrap up files
+
+        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+            //gen.writeEndArray();
+            gen.flush();
+            gen.close();
+        }
+
+        for ( String fileName : emptyFiles ) {
+            File emptyFile = new File(fileName);
+            emptyFile.deleteOnExit();
+        }
+
+    }
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option readThreads = OptionBuilder.hasArg().withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( readThreads );
+        
+//        Option readThreads = OptionBuilder
+//                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+//        options.addOption( readThreads );
+        
+        return options;
+    }
+
+    // ----------------------------------------------------------------------------------------
+    // reading data
+
+    /**
+     * Emits collection names found in application.
+     */
+    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+        EntityManager em;
+                
+        public CollectionsOnSubscribe( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(Subscriber<? super String> subscriber) {
+            
+            logger.info("Starting to read collections");
+            
+            int count = 0;
+            try {
+                Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+                for ( String collection : collectionMetadata.keySet() ) {
+                    subscriber.onNext( collection );
+                    count++;
+                }
+                
+            } catch (Exception e) {
+                subscriber.onError( e );
+            }
+            logger.info("Done. Read {} collection names", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits entities of collection.
+     */
+    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        EntityManager em;
+        String collection;
+
+        public EntityOnSubscribe(EntityManager em, String collection) {
+            this.em = em;
+            this.collection = collection;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+
+            logger.info("Starting to read entities of collection {}", collection);
+            
+            try {
+                int count = 0;
+
+                Query query = new Query();
+                query.setLimit( MAX_ENTITY_FETCH );
+
+                Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+                while (results.size() > 0) {
+                    for (Entity entity : results.getEntities()) {
+                        try {
+                            Set<String> dictionaries = em.getDictionaries( entity );
+                            Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+                            for (String dictionary : dictionaries) {
+                                Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                                if (dict.isEmpty()) {
+                                    continue;
+                                }
+                                dictionariesByName.put( dictionary, dict );
+                            }
+                            ExportEntity exportEntity = new ExportEntity( 
+                                    em.getApplication().getApplicationName(), 
+                                    entity, dictionariesByName );
+                            subscriber.onNext( exportEntity );
+                            count++;
+                            
+                        } catch (Exception e) {
+                            logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+                        }
+                    }
+                    if (results.getCursor() == null) {
+                        break;
+                    }
+                    query.setCursor( results.getCursor() );
+                    results = em.searchCollection( em.getApplicationRef(), collection, query );
+                }
+
+                logger.info("Done. Read {} entities", count);
+                if ( count > 0 ) {
+                    subscriber.onCompleted();
+                } else {
+                    subscriber.unsubscribe();
+                }
+                
+            } catch ( Exception e ) {
+                subscriber.onError(e);
+            }
+        }
+    }
+
+    /**
+     * Emits connections of an entity.
+     */
+    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        EntityManager em;
+        ExportEntity exportEntity;
+
+        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+            this.em = em;
+            this.exportEntity = exportEntity;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+
+            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+            
+            int count = 0;
+            
+            try {
+                Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+                for (String connectionType : connectionTypes) {
+
+                    Results results = em.getConnectedEntities( 
+                            exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
+
+                    for (Entity connectedEntity : results.getEntities()) {
+                        try {
+                            ExportConnection connection = new ExportConnection( 
+                                    em.getApplication().getApplicationName(), 
+                                    connectionType, 
+                                    exportEntity.getEntity().getUuid(), 
+                                    connectedEntity.getUuid());
+                            subscriber.onNext( connection );
+                            count++;
+
+                        } catch (Exception e) {
+                            logger.error( "Error reading connection entity " 
+                                + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
+                        }
+                    }
+                }
+                
+            } catch (Exception e) {
+                subscriber.onError( e );
+            }
+
+            logger.info("Done. Read {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Process collection by starting processing of its entities.
+     */
+    class CollectionAction implements Action1<String> {
+        EntityManager em;
+
+        public CollectionAction( EntityManager em ) {
+            this.em = em;
+        }
+
+        public void call(String collection) {
+
+            // process entities of collection in parallel            
+            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
+            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
+                public Observable<ExportEntity> call(ExportEntity exportEntity) {
+                    return Observable.just(exportEntity).doOnNext( 
+                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                }
+            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+        }
+    }
+
+    /**
+     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
+     */
+    class EntityAction implements Action1<ExportEntity> {
+        EntityManager em;
+
+        public EntityAction( EntityManager em ) {
+            this.em = em;
+        }
+        
+        public void call(ExportEntity exportEntity) {
+            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
+
+            entityWriteQueue.add( exportEntity );
+            entitiesQueued.getAndIncrement();
+
+            // if entity has connections, process them in parallel
+            try {
+                Results connectedEntities = em.getConnectedEntities(
+                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
+
+                if ( !connectedEntities.isEmpty() ) {
+                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
+                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
+                    
+                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
+                        public Observable<ExportConnection> call(ExportConnection connection) {
+                            return Observable.just(connection).doOnNext( 
+                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                        }
+                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                }
+                
+            } catch (Exception e) {
+                throw new RuntimeException( "Error getting connections", e );
+            }
+        }
+    }
+
+    /**
+     * Process connection by adding it to connectionWriteQueue. 
+     */
+    class ConnectionsAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
+            connectionWriteQueue.add(conn);
+            connectionsQueued.getAndIncrement();
+        }
+    }
+
+
+    // ----------------------------------------------------------------------------------------
+    // writing data
+
+    /**
+     * Emits entities to be written.
+     */
+    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+        BlockingQueue<ExportEntity> queue;
+
+        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportEntity> subscriber) {
+            int count = 0;
+            
+            while ( true ) {
+                ExportEntity entity = null;
+                try {
+                    //logger.debug( "Wrote {}. Polling for entity to write...", count );
+                    activePollers.getAndIncrement();
+                    entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                } catch (InterruptedException e) {
+                    logger.error("Entity poll interrupted", e);
+                    continue;
+                } finally {
+                    activePollers.getAndDecrement();
+                }
+                if ( entity == null ) {
+                    break;
+                }
+                subscriber.onNext( entity );
+                count++;
+            }
+
+            logger.info("Done. Wrote {} entities", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Emits connections to be written.
+     */
+    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+        BlockingQueue<ExportConnection> queue;
+
+        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
+            this.queue = queue;
+        }
+
+        public void call(Subscriber<? super ExportConnection> subscriber) {
+            int count = 0;
+            
+            while ( true ) {
+                ExportConnection connection = null;
+                try {
+                    //logger.debug( "Wrote {}. Polling for connection to write", count );
+                    activePollers.getAndIncrement();
+                    connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
+                } catch (InterruptedException e) {
+                    logger.error("Connection poll interrupted", e);
+                    continue;
+                } finally {
+                    activePollers.getAndDecrement();
+                }
+                if ( connection == null ) {
+                    break;
+                }
+                subscriber.onNext( connection );
+                count++;
+            }
+
+            logger.info("Done. Wrote {} connections", count);
+            if ( count > 0 ) {
+                subscriber.onCompleted();
+            } else {
+                subscriber.unsubscribe();
+            }
+        }
+    }
+
+    /**
+     * Writes entities to JSON file.
+     */
+    class EntityWriteAction implements Action1<ExportEntity> {
+
+        public void call(ExportEntity entity) {
+
+            boolean wroteData = false;
+
+            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+
+            JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // no generator so we are opening new file and writing the start of an array
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                entityGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+
+            try {
+                gen.writeObject( entity );
+                gen.writeRaw('\n');
+                entitiesWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+    /**
+     * Writes connection to JSON file.
+     */
+    class ConnectionWriteAction implements Action1<ExportConnection> {
+
+        public void call(ExportConnection conn) {
+
+            boolean wroteData = false;
+
+            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+
+            JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+            if ( gen == null ) {
+
+                // no generator so we are opening new file and writing the start of an array
+                try {
+                    gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                } catch (IOException e) {
+                    throw new RuntimeException("Error opening output file: " + fileName, e);
+                }
+                gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+                gen.setCodec( mapper );
+                connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+            }
+
+            try {
+                gen.writeObject( conn );
+                gen.writeRaw('\n');
+                connectionsWritten.getAndIncrement();
+                wroteData = true;
+
+            } catch (IOException e) {
+                throw new RuntimeException("Error writing to output file: " + fileName, e);
+            }
+
+            if ( !wroteData ) {
+                emptyFiles.add( fileName );
+            }
+        }
+    }
+
+}
+
+class ExportEntity {
+    private String application;
+    private Entity entity;
+    private Map<String, Object> dictionaries;
+    public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+        this.application = application;
+        this.entity = entity;
+        this.dictionaries = dictionaries;
+    }
+
+    public String getApplication() {
+        return application;
+    }
+
+    public void setApplication(String application) {
+        this.application = application;
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+
+    public void setEntity(Entity entity) {
+        this.entity = entity;
+    }
+
+    public Map<String, Object> getDictionaries() {
+        return dictionaries;
+    }
+
+    public void setDictionaries(Map<String, Object> dictionaries) {
+        this.dictionaries = dictionaries;
+    }
+}
+
+class ExportConnection {
+    private String application;
+    private String connectionType;
+    private UUID sourceUuid;
+    private UUID targetUuid;
+    public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+        this.application = application;
+        this.connectionType = connectionType;
+        this.sourceUuid = sourceUuid;
+        this.targetUuid = targetUuid;
+    }
+
+    public String getApplication() {
+        return application;
+    }
+
+    public void setApplication(String application) {
+        this.application = application;
+    }
+
+    public String getConnectionType() {
+        return connectionType;
+    }
+
+    public void setConnectionType(String connectionType) {
+        this.connectionType = connectionType;
+    }
+
+    public UUID getSourceUuid() {
+        return sourceUuid;
+    }
+
+    public void setSourceUuid(UUID sourceUuid) {
+        this.sourceUuid = sourceUuid;
+    }
+
+    public UUID getTargetUuid() {
+        return targetUuid;
+    }
+
+    public void setTargetUuid(UUID targetUuid) {
+        this.targetUuid = targetUuid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b2bdbb54/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
new file mode 100644
index 0000000..14b9311
--- /dev/null
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.ServiceITSetup;
+import org.apache.usergrid.ServiceITSetupImpl;
+import org.apache.usergrid.ServiceITSuite;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationOwnerInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ExportAppTest {
+    static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+    @ClassRule
+    public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
+
+    @org.junit.Test
+    public void testBasicOperation() throws Exception {
+       
+        String rand = RandomStringUtils.randomAlphanumeric( 10 );
+        
+        // create app with some data
+
+        OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
+                "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
+
+        ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
+                orgInfo.getOrganization().getUuid(), "app_" + rand );
+
+        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        
+        // create 10 connected things
+
+        List<Entity> connectedThings = new ArrayList<Entity>();
+        String connectedType = "connected_thing";
+        em.createApplicationCollection(connectedType);
+        for ( int j=0; j<10; j++) {
+            final String name = "connected_thing_" + j;
+            connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
+                put( "name", name );
+            }} ) );
+        }
+       
+        // create 10 collections of 10 things, every other thing is connected to the connected things
+        
+        for ( int i=0; i<10; i++) {
+            String type = "thing_"+i;
+            em.createApplicationCollection(type);
+            for ( int j=0; j<10; j++) {
+                final String name = "thing_" + j;
+                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
+                if ( j % 2 == 0 ) {
+                    for ( Entity target : connectedThings ) {
+                        em.createConnection( source, "has", target );
+                    }
+                }
+            }
+        }
+        
+        // export to file
+
+        String directoryName = "./target/export" + rand;
+
+        ExportApp exportApp = new ExportApp();
+        exportApp.startTool( new String[]{
+                "-application", appInfo.getName(),
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-outputDir", directoryName
+        }, false );
+        
+    }
+}
\ No newline at end of file


[15/18] incubator-usergrid git commit: Merge branch 'master' into two-dot-o

Posted by sn...@apache.org.
Merge branch 'master' into two-dot-o

Conflicts:
	stack/core/pom.xml
	stack/pom.xml
	stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
	stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
	stack/tools/src/main/resources/log4j.properties


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/066d7db4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/066d7db4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/066d7db4

Branch: refs/heads/two-dot-o-dev
Commit: 066d7db46786cc042cdc4fa28b56eb2f729561d1
Parents: 764a7c8 9b9f55a
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 09:44:58 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 09:44:58 2015 -0400

----------------------------------------------------------------------
 stack/core/pom.xml                              |   1 -
 stack/pom.xml                                   |   2 +-
 stack/tools/pom.xml                             |   6 +
 .../org/apache/usergrid/tools/ExportAdmins.java | 117 ++--
 .../org/apache/usergrid/tools/ExportApp.java    | 536 +++++++++++++++++++
 .../usergrid/tools/ExportDataCreator.java       | 244 +++++++--
 .../usergrid/tools/ExportingToolBase.java       |   2 +-
 .../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++--
 .../org/apache/usergrid/tools/ToolBase.java     |   2 +-
 stack/tools/src/main/resources/log4j.properties |   5 +
 .../apache/usergrid/tools/ExportAppTest.java    | 118 ++++
 .../usergrid/tools/ExportImportAdminsTest.java  |  71 ++-
 ...adata.usergrid-management.1433331614293.json |  52 ++
 ...users.usergrid-management.1433331614293.json |  12 +
 14 files changed, 1225 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/core/pom.xml
----------------------------------------------------------------------
diff --cc stack/core/pom.xml
index d91208d,f60dbc2..5e65ac3
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@@ -473,23 -573,16 +473,22 @@@
      </dependency>
  
      <dependency>
 -      <groupId>io.reactivex</groupId>
 -      <artifactId>rxjava</artifactId>
 +      <groupId>com.netflix.rxjava</groupId>
 +      <artifactId>rxjava-core</artifactId>
        <version>${rx.version}</version>
      </dependency>
- 
 -    
      <dependency>
 -      <groupId>io.reactivex</groupId>
 +      <groupId>com.netflix.rxjava</groupId>
        <artifactId>rxjava-math</artifactId>
 -      <version>1.0.0</version>
 +      <version>${rx.version}</version>
      </dependency>
 +
 +    <dependency>
 +      <groupId>com.clearspring.analytics</groupId>
 +      <artifactId>stream</artifactId>
 +      <version>2.7.0</version>
 +    </dependency>
 +
    </dependencies>
  
  </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/pom.xml
----------------------------------------------------------------------
diff --cc stack/pom.xml
index cc61a7c,0e1b32d..cc39e04
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@@ -32,107 -32,84 +32,107 @@@
    <description>Parent module for the Apache Usergrid Project</description>
    <packaging>pom</packaging>
  
 -  <properties>
 -    <!-- =================================================================== -->
 -    <!-- Properties: Deployment Setting Defaults                             -->
 -    <!-- =================================================================== -->
 -    <!-- NOTE: override from the CLI or settings.xml                 -->
 -    <!-- NOTE: add server credentials config via settings            -->
 -    <!-- NOTE: <settings>                                            -->
 -    <!-- NOTE:   <servers>                                           -->
 -    <!-- NOTE:     <server>                                          -->
 -    <!-- NOTE:       <id>usergrid.releases</id>                      -->
 -    <!-- NOTE:       <username>akarasulu</username>                  -->
 -    <!-- NOTE:       <password>*********</password>                  -->
 -    <!-- NOTE:     </server>                                         -->
 -    <!-- NOTE:     <server>                                          -->
 -    <!-- NOTE:       <id>usergrid.snapshots</id>                     -->
 -    <!-- NOTE:       <username>akarasulu</username>                  -->
 -    <!-- NOTE:       <password>*********</password>                  -->
 -    <!-- NOTE:     </server>                                         -->
 -    <!-- NOTE:   </servers>                                          -->
 -    <!-- NOTE:                                                       -->
 -    <!-- NOTE:   <profiles>                                          -->
 -    <!-- NOTE:     <profile>                                         -->
 -    <!-- NOTE:       <id>deployment</id>                             -->
 -    <!-- NOTE:       <properties>                                    -->
 -    <!-- NOTE:         <release.repository.url>                      -->
 -    <!-- NOTE:           https://to/your/custom/releases/repository  -->
 -    <!-- NOTE:         </release.repository.url>                     -->
 -    <!-- NOTE:         <snapshot.repository.url>                     -->
 -    <!-- NOTE:           https://to/your/custom/snapshots/repository -->
 -    <!-- NOTE:         </shapshot.repository.url>                    -->
 -    <!-- NOTE:       </properties>                                   -->
 -    <!-- NOTE:     </profile>                                        -->
 -    <!-- NOTE:   </profiles>                                         -->
 -    <!-- NOTE:                                                       -->
 -    <!-- NOTE:   <activeProfiles>                                    -->
 -    <!-- NOTE:     <activeProfile>deployment</activeProfile>         -->
 -    <!-- NOTE:   </activeProfiles>                                   -->
 -    <!-- NOTE: </settings>                                           -->
 -
 -    <!-- =================================================================== -->
 -    <!-- Properties: General Settings -->
 -    <!-- =================================================================== -->
 -
 -    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 -
 -    <!-- you can override these via MAVEN_OPTS -->
 -    <ug.heapmax>2048m</ug.heapmax>
 -    <ug.heapmin>2048m</ug.heapmin>
 -	<ug.argline>-Djava.awt.headless=true</ug.argline>
 -
 -    <usergrid-custom-spring-properties>classpath:/usergrid-custom.properties</usergrid-custom-spring-properties>
 -    <usergrid-custom-spring-test-properties>classpath:/usergrid-custom-test.properties</usergrid-custom-spring-test-properties>
 -
 -    <!-- =================================================================== -->
 -    <!-- Properties: Dependency Settings -->
 -    <!-- =================================================================== -->
 -
 -    <amber-version>0.22-incubating</amber-version>
 -    <cassandra-version>1.2.12</cassandra-version>
 -    <hector-om-version>3.0-03</hector-om-version>
 -    <hector-version>1.1-4</hector-version>
 -    <hector-test-version>1.1-4</hector-test-version>
 -    <jackson-version>1.9.9</jackson-version>
 -    <jclouds.version>1.7.1</jclouds.version>
 -    <jersey-version>1.18</jersey-version>
 -    <junit-version>4.11</junit-version>
 -    <log4j-version>1.2.16</log4j-version>
 -    <metrics-version>2.1.2</metrics-version>
 -    <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
 -    <shiro-version>1.2.0</shiro-version>
 -    <slf4j-version>1.6.1</slf4j-version>
 -    <snakeyaml-version>1.9</snakeyaml-version>
 -    <tomcat-version>7.0.42</tomcat-version>
 -    <antlr.version>3.4</antlr.version>
 -    <tika.version>1.4</tika.version>
 -    <metrics.version>3.0.0</metrics.version>
 -    <rx.version>1.0.12</rx.version>
 -  </properties>
 +    <properties>
 +      <!-- =================================================================== -->
 +      <!-- Properties: Deployment Setting Defaults                             -->
 +      <!-- =================================================================== -->
 +      <!-- NOTE: override from the CLI or settings.xml                 -->
 +      <!-- NOTE: add server credentials config via settings            -->
 +      <!-- NOTE: <settings>                                            -->
 +      <!-- NOTE:   <servers>                                           -->
 +      <!-- NOTE:     <server>                                          -->
 +      <!-- NOTE:       <id>usergrid.releases</id>                      -->
 +      <!-- NOTE:       <username>akarasulu</username>                  -->
 +      <!-- NOTE:       <password>*********</password>                  -->
 +      <!-- NOTE:     </server>                                         -->
 +      <!-- NOTE:     <server>                                          -->
 +      <!-- NOTE:       <id>usergrid.snapshots</id>                     -->
 +      <!-- NOTE:       <username>akarasulu</username>                  -->
 +      <!-- NOTE:       <password>*********</password>                  -->
 +      <!-- NOTE:     </server>                                         -->
 +      <!-- NOTE:   </servers>                                          -->
 +      <!-- NOTE:                                                       -->
 +      <!-- NOTE:   <profiles>                                          -->
 +      <!-- NOTE:     <profile>                                         -->
 +      <!-- NOTE:       <id>deployment</id>                             -->
 +      <!-- NOTE:       <properties>                                    -->
 +      <!-- NOTE:         <release.repository.url>                      -->
 +      <!-- NOTE:           https://to/your/custom/releases/repository  -->
 +      <!-- NOTE:         </release.repository.url>                     -->
 +      <!-- NOTE:         <snapshot.repository.url>                     -->
 +      <!-- NOTE:           https://to/your/custom/snapshots/repository -->
 +      <!-- NOTE:         </shapshot.repository.url>                    -->
 +      <!-- NOTE:       </properties>                                   -->
 +      <!-- NOTE:     </profile>                                        -->
 +      <!-- NOTE:   </profiles>                                         -->
 +      <!-- NOTE:                                                       -->
 +      <!-- NOTE:   <activeProfiles>                                    -->
 +      <!-- NOTE:     <activeProfile>deployment</activeProfile>         -->
 +      <!-- NOTE:   </activeProfiles>                                   -->
 +      <!-- NOTE: </settings>                                           -->
 +
 +      <snapshot.repository.url>
 +        https://repository.apache.org/content/repositories/snapshots
 +      </snapshot.repository.url>
 +      <release.repository.url>
 +        https://repository.apache.org/service/local/staging/deploy/maven2
 +      </release.repository.url>
 +
 +      <!-- =================================================================== -->
 +      <!-- Properties: General Settings -->
 +      <!-- =================================================================== -->
 +
 +      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 +
 +      <!-- you can override these via MAVEN_OPTS -->
 +      <ug.heapmax>4096m</ug.heapmax>
 +      <ug.heapmin>2048m</ug.heapmin>
 +  	  <ug.argline>-Djava.awt.headless=true</ug.argline>
 +
 +      <usergrid-custom-spring-properties>classpath:/usergrid-deployment.properties</usergrid-custom-spring-properties>
 +      <usergrid-custom-spring-test-properties>classpath:/usergrid-custom-test.properties</usergrid-custom-spring-test-properties>
 +
 +      <!-- =================================================================== -->
 +      <!-- Properties: Dependency Settings -->
 +      <!-- =================================================================== -->
 +
 +      <amber-version>0.22-incubating</amber-version>
 +      <cassandra-version>1.2.18</cassandra-version>
 +      <guava.version>18.0</guava.version>
 +      <guice.version>4.0-beta5</guice.version>
 +      <hector-om-version>3.0-03</hector-om-version>
 +      <hector-version>1.1-4</hector-version>
 +      <hector-test-version>1.1-4</hector-test-version>
 +      <jackson-version>1.9.9</jackson-version>
 +      <jackson-2-version>2.3.3</jackson-2-version>
 +      <jclouds.version>1.8.0</jclouds.version>
 +      <jersey-version>1.18.1</jersey-version>
 +      <junit-version>4.12</junit-version>
 +      <log4j-version>1.2.16</log4j-version>
 +      <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
 +      <shiro-version>1.2.3</shiro-version>
 +      <slf4j-version>1.6.1</slf4j-version>
-       <snakeyaml-version>1.8</snakeyaml-version>
++      <snakeyaml-version>1.9</snakeyaml-version>
 +      <tomcat-version>7.0.59</tomcat-version>
 +      <antlr.version>3.4</antlr.version>
 +      <tika.version>1.4</tika.version>
 +      <mockito.version>1.10.8</mockito.version>
 +
 +      <!-- only use half the cores on the machine for testing -->
 +      <usergrid.it.parallel>methods</usergrid.it.parallel>
 +      <usergrid.it.reuseForks>true</usergrid.it.reuseForks>
 +      <usergrid.it.forkCount>1</usergrid.it.forkCount>
 +      <usergrid.it.threads>8</usergrid.it.threads>
 +
 +      <metrics.version>3.0.0</metrics.version>
 +      <rx.version>0.19.6</rx.version>
 +        <surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName>
 +      <surefire.plugin.version>2.18.1</surefire.plugin.version>
 +      <powermock.version>1.6.1</powermock.version>
 +
 +      <maven.build.timestamp.format>yyyy-MM-dd'T'HH-mm-ss'Z'</maven.build.timestamp.format>
 +
 +    </properties>
  
    <licenses>
      <license>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index e175d01,d5dd42c..0bb74ab
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@@ -22,12 -23,13 +22,10 @@@ import org.apache.commons.cli.CommandLi
  import org.apache.commons.cli.Option;
  import org.apache.commons.cli.OptionBuilder;
  import org.apache.commons.cli.Options;
 -import org.apache.usergrid.management.UserInfo;
 +import org.apache.usergrid.corepersistence.util.CpNamingUtils;
- import org.apache.usergrid.management.UserInfo;
  import org.apache.usergrid.persistence.Entity;
  import org.apache.usergrid.persistence.EntityManager;
 -import org.apache.usergrid.persistence.Query;
  import org.apache.usergrid.persistence.Results;
- import org.apache.usergrid.persistence.index.query.Query;
 -import org.apache.usergrid.persistence.Results.Level;
 -import org.apache.usergrid.persistence.cassandra.CassandraService;
  import org.apache.usergrid.utils.StringUtils;
  import org.codehaus.jackson.JsonGenerator;
  import org.slf4j.Logger;
@@@ -57,17 -61,29 +55,29 @@@ import java.util.concurrent.atomic.Atom
   *    cassandra.lock.keyspace=My_Usergrid_Locks
   */
  public class ExportAdmins extends ExportingToolBase {
 -    
 +
      static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
 -    
++
      public static final String ADMIN_USERS_PREFIX = "admin-users";
      public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
 -   
++
+     // map admin user UUID to list of organizations to which user belongs
+     private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
+ 
+     private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
 -    
++
+     private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
 -    
++
+     private Set<UUID> duplicateOrgs = new HashSet<UUID>();
 -    
++
      private static final String READ_THREAD_COUNT = "readThreads";
-     private Map<String, List<Org>> orgMap = new HashMap<String, List<Org>>(80000);
      private int readThreadCount;
  
-     AtomicInteger count = new AtomicInteger( 0 );
+     AtomicInteger userCount = new AtomicInteger( 0 );
 -    
++
+     boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
 -   
 -    
 +
 +
      /**
       * Represents an AdminUser that has been read and is ready for export.
       */
@@@ -169,10 -185,10 +179,10 @@@
          while ( !done ) {
              writeThread.join( 10000, 0 );
              done = !writeThread.isAlive();
-             logger.info( "Wrote {} users", count.get() );
+             logger.info( "Wrote {} users", userCount.get() );
          }
      }
 -    
 +
  
      @Override
      @SuppressWarnings("static-access")
@@@ -207,10 -223,11 +217,11 @@@
              organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
              for ( Entity organization : organizations.getEntities() ) {
                  execService.submit( new OrgMapWorker( organization ) );
+                 count++;
              }
-             count++;
 -             
++
              if ( count % 1000 == 0 ) {
-                 logger.info("Processed {} orgs for org map", count);
+                 logger.info("Queued {} org map workers", count);
              }
              query.setCursor( organizations.getCursor() );
          }
@@@ -218,8 -235,10 +229,10 @@@
  
          execService.shutdown();
          while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
-             logger.info("Processed {} orgs for map", orgMap.size() );
+             logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
          }
 -        
++
+         logger.info("Org map complete, counted {} organizations", count);
      }
  
  
@@@ -235,6 -254,7 +248,7 @@@
              try {
                  final String orgName = orgEntity.getProperty( "path" ).toString();
                  final UUID orgId = orgEntity.getUuid();
 -                
++
                  for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
                      try {
                          Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
@@@ -297,10 -332,34 +326,34 @@@
                      AdminUserWriteTask task = new AdminUserWriteTask();
                      task.adminUser = entity;
  
-                     addDictionariesToTask(  task, entity );
+                     addDictionariesToTask( task, entity );
                      addOrganizationsToTask( task );
  
-                     writeQueue.add( task );
+                     String actionTaken = "Processed";
+ 
+                     if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
+                             || task.dictionariesByName.isEmpty()
+                             || task.dictionariesByName.get( "credentials" ).isEmpty())) {
 -                        
++
+                         actionTaken = "Ignored";
 -                        
++
+                     } else {
+                         writeQueue.add( task );
+                     }
+ 
 -                    Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ? 
++                    Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
+                                                 0 : task.dictionariesByName.get( "credentials" ));
 -                    
++
+                     logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
+                             new Object[]{
+                                     actionTaken,
+                                     task.adminUser.getProperty( "username" ),
+                                     task.adminUser.getProperty( "email" ),
+                                     task.adminUser.getUuid(),
+                                     task.orgNamesByUuid.size(),
+                                     task.dictionariesByName.size(),
+                                     creds == null ? 0 : creds.size()
 -                            } ); 
++                            } );
  
                  } catch ( Exception e ) {
                      logger.error("Error reading data for user " + uuid, e );
@@@ -344,22 -391,17 +385,32 @@@
  
              task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
  
++<<<<<<< HEAD
 +            List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
 +
++=======
+             List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
 -            
++
++>>>>>>> master
              if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
 -                
++
+                 // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
                  BiMap<UUID, String> bimap = HashBiMap.create();
                  for (Org org : orgs) {
                      bimap.put( org.orgId, org.orgName );
                  }
                  task.orgNamesByUuid = bimap;
              }
++<<<<<<< HEAD
 +
 +            if ( task.orgNamesByUuid.isEmpty() ) {
 +                logger.error("{}:{}:{} has no orgs", new Object[] {
 +                        task.adminUser.getProperty("username"),
 +                        task.adminUser.getProperty("email"),
 +                        task.adminUser.getUuid() } );
 +            }
++=======
++>>>>>>> master
          }
      }
  
@@@ -485,6 -527,10 +536,10 @@@
                  jg.writeObject( orgs.get( uuid ) );
  
                  jg.writeEndObject();
 -                
++
+                 synchronized (orgsWritten) {
+                     orgsWritten.add( uuid );
+                 }
              }
  
              jg.writeEndArray();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 7f8dd1b,3c427e1..c97dc9c
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@@ -50,8 -50,8 +50,8 @@@ import static org.apache.usergrid.utils
  
  
  /**
-  * Base class for Usergrid Tools commands. Any class that implements this can be called with 
 - * Base class for Usergrid Tools commands. Any class that implements this can be called with java -jar {jarname}
 - * org.apache.usergrid.tools.{classname}.
++ * Base class for Usergrid Tools commands. Any class that implements this can be called with
 + * java -jar {jarname} org.apache.usergrid.tools.{classname}.
   */
  public abstract class ToolBase {
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/066d7db4/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --cc stack/tools/src/main/resources/log4j.properties
index 00834cf,def47b4..18ebcc4
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@@ -26,23 -26,8 +26,28 @@@ log4j.appender.stdout=org.apache.log4j.
  log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
  
++<<<<<<< HEAD
 +log4j.category.org.apache.usergrid.tools=TRACE
 +log4j.category.org.apache.usergrid=ERROR
 +
 +log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN, stdout
 +log4j.logger.org.apache.usergrid.persistence.cassandra.BATCH=WARN, stdout
 +log4j.logger.org.apache.usergrid.persistence.cassandra.EntityManagerFactoryImpl=WARN, stdout
 +log4j.logger.org.apache.usergrid.persistence.cassandra.DaoUtils=WARN, stdout
 +log4j.logger.org.apache.usergrid.persistence.cassandra.EntityManagerImpl=WARN, stdout
 +log4j.logger.org.apache.usergrid.persistence.cassandra.ConnectionRefImpl=WARN, stdout
 +log4j.logger.me.prettyprint.cassandra.hector.TimingLogger=WARN, stdout
 +log4j.logger.org.apache.usergrid.rest.security.AllowAjaxFilter=WARN, stdout
 +log4j.logger.me.prettyprint.hector.api.beans.AbstractComposite=ERROR, stdout
 +#log4j.logger.org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl=DEBUG, stdout
 +#log4j.logger.org.apache.usergrid.persistence.hector.CountingMutator=INFO, stdout
 +
 +log4j.logger.org.apache.usergrid.management.cassandra=DEBUB
 +log4j.logger.org.apache.usergrid.tools=INFO
++=======
+ log4j.logger.org.apache.usergrid=INFO
+ log4j.logger.org.apache.usergrid.tools=DEBUG
++>>>>>>> master
  
  log4j.logger.org.apache.usergrid.management.cassandra=WARN
  log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN


[12/18] incubator-usergrid git commit: Merge branch 'master' into rxportapp

Posted by sn...@apache.org.
Merge branch 'master' into rxportapp


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2748347a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2748347a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2748347a

Branch: refs/heads/two-dot-o-dev
Commit: 2748347a3e3e19b4db3467e429ab9e27f5742892
Parents: b58390d e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 16:00:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 16:00:55 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 37 ++++++++------------
 .../org/apache/usergrid/tools/ImportAdmins.java |  5 +--
 2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[08/18] incubator-usergrid git commit: Duplicate user merge.

Posted by sn...@apache.org.
Duplicate user merge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16de78d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16de78d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16de78d9

Branch: refs/heads/two-dot-o-dev
Commit: 16de78d9d5a42941c76907ffe2986b16c7195976
Parents: e1b352e
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Jul 16 13:56:17 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Jul 16 13:56:17 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java |   2 +
 .../org/apache/usergrid/tools/ImportAdmins.java | 203 ++++++++++++++-----
 stack/tools/src/main/resources/log4j.properties |   2 +-
 .../usergrid/tools/ExportImportAdminsTest.java  |  71 ++++---
 ...adata.usergrid-management.1433331614293.json |  52 +++++
 ...users.usergrid-management.1433331614293.json |  12 ++
 6 files changed, 263 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 2c14da1..ae9c16b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -489,6 +489,8 @@ public class ExportAdmins extends ExportingToolBase {
                 jg.writeObject( orgs.get( uuid ) );
 
                 jg.writeEndObject();
+                
+                logger.debug( "Exported organization {}:{}", uuid, orgs.get( uuid ) );
             }
 
             jg.writeEndArray();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 0b693c8..c6aada7 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.tools;
 
 
+import com.sun.org.apache.bcel.internal.generic.DUP;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -26,6 +27,8 @@ import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.UserInfo;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Identifier;
+import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.codehaus.jackson.JsonFactory;
@@ -84,6 +87,7 @@ public class ImportAdmins extends ToolBase {
     private Map<Stoppable, Thread> adminAuditThreads = new HashMap<Stoppable, Thread>();
     private Map<Stoppable, Thread> metadataWorkerThreadMap = new HashMap<Stoppable, Thread>();
 
+    Map<UUID, DuplicateUser> dupsByDupUuid = new HashMap<UUID, DuplicateUser>(200);
 
     JsonFactory jsonFactory = new JsonFactory();
 
@@ -94,6 +98,19 @@ public class ImportAdmins extends ToolBase {
     AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
     AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
     
+    
+    static class DuplicateUser {
+        String email;
+        String username;
+        public DuplicateUser( String propName, Map<String, Object> user ) {
+            if ( "email".equals(propName)) {
+                email = user.get("email").toString();
+            } else {
+                username = user.get("username").toString();
+            }
+        }
+    }
+    
 
 
     @Override
@@ -382,7 +399,7 @@ public class ImportAdmins extends ToolBase {
                 String entityOwnerId = jp.getCurrentName();
 
                 try {
-                    EntityRef entityRef = em.get( UUID.fromString( entityOwnerId ) );
+                    EntityRef entityRef = new SimpleEntityRef( "user", UUID.fromString( entityOwnerId ) );
                     Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs( Map.class );
                     
                     workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
@@ -408,31 +425,33 @@ public class ImportAdmins extends ToolBase {
     private void importEntityMetadata(
             EntityManager em, EntityRef entityRef, Map<String, Object> metadata) throws Exception {
 
-        List<Object> organizationsList = (List<Object>) metadata.get("organizations");
-        if (organizationsList != null && !organizationsList.isEmpty()) {
+        DuplicateUser dup = dupsByDupUuid.get( entityRef.getUuid() );
+        
+        if ( dup == null ) { // not a duplicate
 
-            User user = em.get(entityRef, User.class);
-            
-            if (user == null) {
-                logger.error("User not found, not adding to organizations: " 
-                    + (entityRef == null ? null : entityRef.getUuid()));
+            User user = em.get( entityRef, User.class );
+            final UserInfo userInfo = managementService.getAdminUserByEmail( user.getEmail() );
 
-            } else {
+            if (user == null || userInfo == null) {
+                logger.error( "User {} does not exist, not processing metadata", entityRef.getUuid() );
+                return;
+            }
 
-                final UserInfo userInfo = managementService.getAdminUserByEmail(user.getEmail());
+            List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+            if (organizationsList != null && !organizationsList.isEmpty()) {
 
                 for (Object orgObject : organizationsList) {
 
                     Map<String, Object> orgMap = (Map<String, Object>) orgObject;
-                    UUID orgUuid = UUID.fromString((String) orgMap.get("uuid"));
-                    String orgName = (String) orgMap.get("name");
+                    UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+                    String orgName = (String) orgMap.get( "name" );
 
-                    // create org only if it does not exist
-                    OrganizationInfo orgInfo = managementService.getOrganizationByUuid(orgUuid);
-                    if (orgInfo == null) {
+                    OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+
+                    if (orgInfo == null) { // org does not exist yet, create it and add user
                         try {
-                            managementService.createOrganization(orgUuid, orgName, userInfo, false);
-                            orgInfo = managementService.getOrganizationByUuid(orgUuid);
+                            managementService.createOrganization( orgUuid, orgName, userInfo, false );
+                            orgInfo = managementService.getOrganizationByUuid( orgUuid );
 
                             logger.debug( "Created new org {} for user {}",
                                     new Object[]{orgInfo.getName(), user.getEmail()} );
@@ -440,49 +459,107 @@ public class ImportAdmins extends ToolBase {
                         } catch (DuplicateUniquePropertyExistsException dpee) {
                             logger.debug( "Org {} already exists", orgName );
                         }
-                    } else {
+                    } else { // org exists, add original user to it
                         try {
                             managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
                             logger.debug( "Added user {} to org {}", new Object[]{user.getEmail(), orgName} );
-                            
-                        } catch ( Exception e ) {
+
+                        } catch (Exception e) {
                             logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
                         }
                     }
                 }
             }
+            
+            Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
+            if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
+                for (String name : dictionariesMap.keySet()) {
+                    try {
+                        Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
+                        em.addMapToDictionary( entityRef, name, dictionary);
 
-        } else {
-            logger.warn("User {} has no organizations", entityRef.getUuid() );
-        }
-
-        Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
-
-        if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
-            for (String name : dictionariesMap.keySet()) {
-                try {
-                    Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
-                    em.addMapToDictionary( entityRef, name, dictionary);
-
-                    logger.debug( "Creating dictionary for {} name {}",
-                            new Object[]{entityRef, name} );
+                        logger.debug( "Creating dictionary for {} name {}",
+                                new Object[]{entityRef, name} );
 
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.error("Error importing dictionary name "
-                                + name + " for user " + entityRef.getUuid(), e);
-                    } else {
-                        logger.error("Error importing dictionary name "
-                                + name + " for user " + entityRef.getUuid());
+                    } catch (Exception e) {
+                        if (logger.isDebugEnabled()) {
+                            logger.error("Error importing dictionary name "
+                                    + name + " for user " + entityRef.getUuid(), e);
+                        } else {
+                            logger.error("Error importing dictionary name "
+                                    + name + " for user " + entityRef.getUuid());
+                        }
                     }
                 }
+
+            } else {
+                logger.warn("User {} has no dictionaries", entityRef.getUuid() );
             }
             
-        } else {
-            logger.warn("User {} has no dictionaries", entityRef.getUuid() );
-        }
+        } else { // this is a duplicate user, so merge orgs
+
+            logger.info("Processing duplicate username={} email={}", dup.email, dup.username );
+           
+            Identifier identifier = dup.email != null ? 
+                Identifier.fromEmail( dup.email ) : Identifier.from( dup.username );
+            User originalUser = em.get( em.getUserByIdentifier(identifier), User.class );
+
+            // get map of original user's orgs
+            
+            UserInfo originalUserInfo = managementService.getAdminUserByEmail( originalUser.getEmail() );
+            Map<String, Object> originalUserOrgData =
+                    managementService.getAdminUserOrganizationData( originalUser.getUuid() );
+            Map<String, Map<String, Object>> originalUserOrgs =
+                    (Map<String, Map<String, Object>>) originalUserOrgData.get( "organizations" );
 
+            // loop through duplicate user's orgs and give orgs to original user
 
+            List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+            for (Object orgObject : organizationsList) {
+
+                Map<String, Object> orgMap = (Map<String, Object>) orgObject;
+                UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+                String orgName = (String) orgMap.get( "name" );
+
+                if (originalUserOrgs.get( orgName ) == null) { // original user does not have this org
+
+                    OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+                    
+                    if (orgInfo == null) { // org does not exist yet, create it and add original user to it
+                        try {
+                            managementService.createOrganization( orgUuid, orgName, originalUserInfo, false );
+                            orgInfo = managementService.getOrganizationByUuid( orgUuid );
+
+                            logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
+                                new Object[]{
+                                        orgInfo.getName(),
+                                        originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+                                        dup.username, dup.email
+                                });
+
+                        } catch (DuplicateUniquePropertyExistsException dpee) {
+                            logger.debug( "Org {} already exists", orgName );
+                        }
+                    } else { // org exists so add original user to it
+                        try {
+                            managementService.addAdminUserToOrganization( originalUserInfo, orgInfo, false );
+                            logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
+                                    new Object[]{
+                                            orgInfo.getName(),
+                                            originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+                                            dup.username, dup.email
+                                    });
+
+                        } catch (Exception e) {
+                            logger.error( "Error Adding user {} to org {}", 
+                                    new Object[]{originalUserInfo.getEmail(), orgName} );
+                        }
+                    }
+                    
+                } // else original user already has this org
+                
+            }
+        }
     }
 
 
@@ -685,15 +762,15 @@ public class ImportAdmins extends ToolBase {
 
                     // Import/create the entity
                     UUID uuid = getId(entityProps);
-                    String type = getType(entityProps);
+                    String type = getType( entityProps );
 
                     try {
                         long startTime = System.currentTimeMillis();
                         
                         em.create(uuid, type, entityProps);
 
-                        logger.debug( "Imported admin user {} / {}",
-                            new Object[] { uuid, entityProps.get( "username" ) } );
+                        logger.debug( "Imported admin user {}:{}:{}",
+                            new Object[] { uuid, entityProps.get( "username" ), entityProps.get("email") } );
 
                         userCount.getAndIncrement();
                         auditQueue.put(entityProps);
@@ -709,19 +786,37 @@ public class ImportAdmins extends ToolBase {
                         }
                         
                     } catch (DuplicateUniquePropertyExistsException de) {
-                        logger.warn("Unable to create admin user {}:{}, duplicate property {}",
-                                new Object[]{ uuid, entityProps.get("username"), de.getPropertyName() });
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Exception", de);
-                        }
+                        String dupProperty = de.getPropertyName();
+                        handleDuplicateAccount( em, dupProperty, entityProps );
+                        continue;
+
+                        
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        logger.error("Error", e);
                     }
                 } catch (InterruptedException e) {
-                    e.printStackTrace();
+                    logger.error( "Error", e );
                 }
 
             }
         }
+
+        
+        private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
+
+            logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
+                    entityProps.get( "uuid" ), entityProps.get( "username" ), entityProps.get( "email" ), dupProperty} );
+           
+            UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
+            try {
+                dupsByDupUuid.put( dupUuid, new DuplicateUser( dupProperty, entityProps ) );
+                
+            } catch (Exception e) {
+                logger.error("Error processing dup user {}:{}:{}",
+                        new Object[] {entityProps.get( "username" ), entityProps.get("email"), dupUuid});
+                return;
+            }
+
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..80c32a1 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -26,7 +26,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
 
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
 
 log4j.logger.org.apache.usergrid.management.cassandra=WARN
 log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
index 898a97d..9cce040 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportImportAdminsTest.java
@@ -4,7 +4,7 @@
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at:223
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -26,6 +26,8 @@ import org.apache.usergrid.ServiceITSuite;
 import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -40,13 +42,13 @@ import java.io.FilenameFilter;
 import java.util.*;
 
 import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
+import static org.junit.Assert.*;
 
 
 public class ExportImportAdminsTest {
     static final Logger logger = LoggerFactory.getLogger( ExportImportAdminsTest.class );
-
+    
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
 
@@ -146,21 +148,28 @@ public class ExportImportAdminsTest {
     public void testImportAdminUsersAndOrgs() throws Exception {
 
         // first: generate the data file with unique user and org IDs and names
+        
+        // data contains three users each with a unique org, one user has a duplicate email
 
         String rand1 = RandomStringUtils.randomAlphanumeric( 10 );
         String rand2 = RandomStringUtils.randomAlphanumeric( 10 );
+        String rand3 = RandomStringUtils.randomAlphanumeric( 10 );
 
         UUID user_uuid_1 = UUIDUtils.newTimeUUID();
         UUID user_uuid_2 = UUIDUtils.newTimeUUID();
+        UUID user_uuid_3 = UUIDUtils.newTimeUUID();
 
         UUID org_uuid_1  = UUIDUtils.newTimeUUID();
         UUID org_uuid_2  = UUIDUtils.newTimeUUID();
+        UUID org_uuid_3  = UUIDUtils.newTimeUUID();
 
-        String user_name_1 = "user_" + rand1;
-        String user_name_2 = "user_" + rand2;
+        String user_name_1 = "user1_" + rand1;
+        String user_name_2 = "user2_" + rand2;
+        String user_name_3 = "user3_" + rand3;
 
-        String org_name_1  = "org_"  + rand1;
-        String org_name_2  = "org_"  + rand2;
+        String org_name_1  = "org1_"  + rand1;
+        String org_name_2  = "org2_"  + rand2;
+        String org_name_3  = "org3_"  + rand3;
 
         // loop through resource files with prefix 'admin-user' those are the data file templates
 
@@ -179,15 +188,19 @@ public class ExportImportAdminsTest {
 
                 fileContent = fileContent.replaceAll( "USER_UUID_1", user_uuid_1.toString() );
                 fileContent = fileContent.replaceAll( "USER_UUID_2", user_uuid_2.toString() );
+                fileContent = fileContent.replaceAll( "USER_UUID_3", user_uuid_3.toString() );
 
                 fileContent = fileContent.replaceAll( "ORG_UUID_1",  org_uuid_1.toString() );
                 fileContent = fileContent.replaceAll( "ORG_UUID_2",  org_uuid_2.toString() );
+                fileContent = fileContent.replaceAll( "ORG_UUID_3",  org_uuid_3.toString() );
 
                 fileContent = fileContent.replaceAll( "USER_NAME_1", user_name_1 );
                 fileContent = fileContent.replaceAll( "USER_NAME_2", user_name_2 );
+                fileContent = fileContent.replaceAll( "USER_NAME_3", user_name_3 );
 
                 fileContent = fileContent.replaceAll( "ORG_NAME_1", org_name_1 );
                 fileContent = fileContent.replaceAll( "ORG_NAME_2", org_name_2 );
+                fileContent = fileContent.replaceAll( "ORG_NAME_3", org_name_3 );
 
                 FileOutputStream os = new FileOutputStream(
                         tempDir.getAbsolutePath() + File.separator + fileName );
@@ -200,35 +213,45 @@ public class ExportImportAdminsTest {
         // import data from temp directory
 
         ImportAdmins importAdmins = new ImportAdmins();
-        importAdmins.startTool( new String[] {
-            "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
-            "-inputDir", tempDir.getAbsolutePath()
+        importAdmins.startTool( new String[]{
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-inputDir", tempDir.getAbsolutePath()
         }, false );
 
         // verify that users and orgs were created correctly
 
         OrganizationInfo orgInfo1 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_1 );
-        assertNotNull( orgInfo1 );
+        assertNotNull( "org 1 exists", orgInfo1 );
+        List<UserInfo> org1_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_1 );
+        assertEquals("org1 has one user", 1, org1_users.size() );
 
         OrganizationInfo orgInfo2 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_2 );
-        assertNotNull( orgInfo2 );
+        assertNotNull( "org 2 exists", orgInfo2 );
+        List<UserInfo> org2_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_2 );
+        assertEquals( "org2 has two users", 2, org2_users.size() );
+        
+        OrganizationInfo orgInfo3 = setup.getMgmtSvc().getOrganizationByUuid( org_uuid_3 );
+        assertNotNull( "org 3 exists", orgInfo3 );
+        List<UserInfo> org3_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_3 );
+        assertEquals( "org 3 has 1 users", 1, org3_users.size() );
 
         BiMap<UUID, String> user1_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_1 );
-        assertEquals("user1 has two orgs", 2, user1_orgs.size() );
-
+        assertEquals( "user 1 has 2 orgs", 2, user1_orgs.size() );
+        
         BiMap<UUID, String> user2_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_2 );
-        assertEquals("user2 has one orgs", 1, user2_orgs.size() );
+        assertEquals( "user 2 has two orgs gained one from duplicate", 2, user2_orgs.size() );
 
-        List<UserInfo> org1_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_1 );
-        assertEquals("org1 has one user", 1, org1_users.size() );
+        try {
+            BiMap<UUID, String> user3_orgs = setup.getMgmtSvc().getOrganizationsForAdminUser( user_uuid_3 );
+            fail("fetch user 3 should have thrown exception");
+        } catch ( Exception expected ) {
+            logger.info("EXCEPTION EXPECTED");
+        }
 
-        List<UserInfo> org2_users = setup.getMgmtSvc().getAdminUsersForOrganization( org_uuid_2 );
-        assertEquals("org2 has two users", 2, org2_users.size() );
+        EntityManager em = setup.getEmf().getEntityManager( MANAGEMENT_APPLICATION_ID );
+        Entity user3 = em.get( user_uuid_3 );
+        assertNull( "duplicate user does not exist", user3 );
 
-        UserInfo user1info = setup.getMgmtSvc().getAdminUserByUuid( user_uuid_1 );
-        Map<String, Object> user1_data = setup.getMgmtSvc().getAdminUserOrganizationData( user1info, false );
-        Map<String, Object> user1_data_orgs = (Map<String, Object>)user1_data.get("organizations");
-        assertEquals( 2, user1_data_orgs.size());
 
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json b/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
index 320f8ed..86d7363 100644
--- a/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
+++ b/stack/tools/src/test/resources/admin-user-metadata.usergrid-management.1433331614293.json
@@ -81,5 +81,57 @@
         }
       }
     }
+  }, 
+  "USER_UUID_3" : {
+    "activities": [],
+    "devices": [],
+    "feed": [
+      "4c8fee64-09e5-11e5-b3c6-57cd4e12c0b1"
+    ],
+    "groups": [
+      "4c88c26a-09e5-11e5-8a66-594dd93a503d"
+    ],
+    "roles": [],
+    "connections": {},
+    "organizations": [
+      {
+        "uuid": "ORG_UUID_3",
+        "name": "ORG_NAME_3"
+      }
+    ],
+    "dictionaries": {
+      "credentials": {
+        "mongo_pwd": {
+          "recoverable": true,
+          "encrypted": false,
+          "secret": "e7b4fc7db5b97088997e44eced015d42",
+          "hashType": null,
+          "created": 1433331614067,
+          "cryptoChain": [
+            "plaintext"
+          ]
+        },
+        "password": {
+          "recoverable": false,
+          "encrypted": true,
+          "secret": "JDJhJDA5JER0RTdNSldMRjkxSUlJVm5hZWJMTy5DelFLemwvd2tXdUttaHViZWdyRjRURVdxYk5TUGJt",
+          "hashType": null,
+          "created": 1433331614018,
+          "cryptoChain": [
+            "bcrypt"
+          ]
+        },
+        "secret": {
+          "recoverable": true,
+          "encrypted": false,
+          "secret": "YWQ6Rx9A-m5U-TihpkPVS4PmyQO4qig",
+          "hashType": null,
+          "created": 1433331614067,
+          "cryptoChain": [
+            "plaintext"
+          ]
+        }
+      }
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16de78d9/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json b/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
index 5f192bf..3f15986 100644
--- a/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
+++ b/stack/tools/src/test/resources/admin-users.usergrid-management.1433331614293.json
@@ -20,4 +20,16 @@
   "activated" : true,
   "confirmed" : true,
   "disabled" : false
+}, {
+  "uuid" : "USER_UUID_3",
+  "type" : "user",
+  "name" : "USER_NAME_3",
+  "created" : 1433331614002,
+  "modified" : 1433331614002,
+  "username" : "USER_NAME_3",
+  "comment" : "this is a duplicate user, has same email address as user2",
+  "email" : "USER_NAME_2@example.com",
+  "activated" : true,
+  "confirmed" : true,
+  "disabled" : false
 } ]
\ No newline at end of file


[05/18] incubator-usergrid git commit: Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.

Posted by sn...@apache.org.
Less test data and code to compare 1 read and 1 write thread vs. 100 read and 100 write threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b168b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b168b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b168b91

Branch: refs/heads/two-dot-o-dev
Commit: 7b168b91d99ba51da452a9c7980b0078f733df03
Parents: a25f8eb
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 16:41:06 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 16:41:06 2015 -0400

----------------------------------------------------------------------
 .../apache/usergrid/tools/ExportAppTest.java    | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b168b91/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index d1c5c1f..eeaae13 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -41,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
-    int NUM_COLLECTIONS = 20;
-    int NUM_ENTITIES = 200; 
-    int NUM_CONNECTIONS = 5;
+    int NUM_COLLECTIONS = 10;
+    int NUM_ENTITIES = 50; 
+    int NUM_CONNECTIONS = 3;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -113,12 +113,22 @@ public class ExportAppTest {
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
-                "-readThreads", "50",
-                "-writeThreads", "10",
+                "-readThreads", "100",
+                "-writeThreads", "100",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
         }, false );
-        
-        logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+        logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+        exportApp.startTool( new String[]{
+                "-application", appInfo.getName(),
+                "-readThreads", "1",
+                "-writeThreads", "1",
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
+                "-outputDir", directoryName + "1"
+        }, false );
+
+        logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
     }
 }
\ No newline at end of file


[02/18] incubator-usergrid git commit: Add readThread and writeThread CLI parameters.

Posted by sn...@apache.org.
Add readThread and writeThread CLI parameters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619

Branch: refs/heads/two-dot-o-dev
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 153 +++++++++++++------
 1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ * 
+ * Exports data files to specified directory.
+ * 
+ * Will create as many output files as there are writeThreads (by default: 10).
+ * 
+ * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * 
+ * Every line of the data files is a complete JSON object.
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
     
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+
     // we will write two types of files: entities and connections
     BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
     BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
     static final String APPLICATION_NAME = "application";
     
     int pollTimeoutSeconds = 10;
+    
+    int readThreadCount = 80; 
+    int writeThreadCount = 10;
+    
+    String applicationName;
+    String organizationName;
 
     // limiting output threads will limit output files 
-    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
-    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+    Scheduler readScheduler;
 
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
     ObjectMapper mapper = new ObjectMapper();
     
     /**
-     * Export admin users using multiple threads.
-     * <p/>
-     * How it works:
-     * In main thread we query for IDs of all admin users, add each ID to read queue.
-     * Read-queue workers read admin user data, add data to write queue.
-     * One write-queue worker reads data writes to file.
+     * Tool entry point. 
      */
     @Override
     public void runTool(CommandLine line) throws Exception {
         
-        String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-//        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-//            try {
-//                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-//            } catch (NumberFormatException nfe) {
-//                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-//                return;
-//            }
-//        } else {
-//            readThreadCount = 20;
-//        }
+        applicationName = line.getOptionValue( APPLICATION_NAME );
+
+        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+            try {
+                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
         
+        if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+            try {
+                writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
+
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+        
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+       
         startSpring();
 
         setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
+        organizationName = em.getApplication().getOrganizationName();
 
         // start write queue workers
 
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
         entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
             public Observable<ExportEntity> call(ExportEntity exportEntity) {
                 return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( scheduler );
+                        new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
         rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
         connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
             public Observable<ExportConnection> call(ExportConnection connection ) {
                 return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( scheduler );
+                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         // start processing data and filling up write queues
 
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
             public Observable<String> call(String collection) {
                 return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+                        new CollectionAction( em ) ).subscribeOn( readScheduler );
             }
-        },40).subscribeOn( Schedulers.io() ).subscribe();
+        },40).subscribeOn( readScheduler ).subscribe();
         
         // wait for write thread pollers to get started
 
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
 
         Options options = super.createOptions();
 
-        Option readThreads = OptionBuilder.hasArg().withType("")
+        Option appNameOption = OptionBuilder.hasArg().withType("")
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( readThreads );
-        
-//        Option readThreads = OptionBuilder
-//                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-//        options.addOption( readThreads );
-        
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
         return options;
     }
 
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
                                 }
                                 dictionariesByName.put( dictionary, dict );
                             }
+                            
                             ExportEntity exportEntity = new ExportEntity( 
-                                    em.getApplication().getApplicationName(), 
-                                    entity, dictionariesByName );
+                                    organizationName, 
+                                    applicationName, 
+                                    entity, 
+                                    dictionariesByName );
+                            
                             subscriber.onNext( exportEntity );
                             count++;
                             
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
 
                     for (Entity connectedEntity : results.getEntities()) {
                         try {
+                            
                             ExportConnection connection = new ExportConnection( 
-                                    em.getApplication().getApplicationName(), 
+                                    applicationName,
+                                    organizationName,
                                     connectionType, 
                                     exportEntity.getEntity().getUuid(), 
                                     connectedEntity.getUuid());
+                            
                             subscriber.onNext( connection );
                             count++;
 
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
             entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
                 public Observable<ExportEntity> call(ExportEntity exportEntity) {
                     return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                            new EntityAction( em ) ).subscribeOn( readScheduler );
                 }
-            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+            }, 8).subscribeOn(readScheduler).toBlocking().last();
         }
     }
 
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
                     entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
                         public Observable<ExportConnection> call(ExportConnection connection) {
                             return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                                    new ConnectionsAction() ).subscribeOn(readScheduler);
                         }
-                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                    }, 8).subscribeOn(readScheduler).toBlocking().last();
                 }
                 
             } catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} entities", count);
+            logger.info("Done. De-queued {} entities", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} connections", count);
+            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
 }
 
 class ExportEntity {
+    private String organization;
     private String application;
     private Entity entity;
     private Map<String, Object> dictionaries;
-    public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+    public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+        this.organization = organization;
         this.application = application;
         this.entity = entity;
         this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
     public void setDictionaries(Map<String, Object> dictionaries) {
         this.dictionaries = dictionaries;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }
 
 class ExportConnection {
+    private String organization;
     private String application;
     private String connectionType;
     private UUID sourceUuid;
     private UUID targetUuid;
-    public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+    public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+        this.organization= organization;
         this.application = application;
         this.connectionType = connectionType;
         this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
     public void setTargetUuid(UUID targetUuid) {
         this.targetUuid = targetUuid;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }


[04/18] incubator-usergrid git commit: Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().

Posted by sn...@apache.org.
Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a25f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a25f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a25f8ebc

Branch: refs/heads/two-dot-o-dev
Commit: a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Parents: 7a870d6
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 15:31:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 15:31:07 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 113 ++++++++++---------
 stack/tools/src/main/resources/log4j.properties |   5 +-
 .../apache/usergrid/tools/ExportAppTest.java    |  89 ++++-----------
 3 files changed, 81 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 59509c0..c302a74 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -16,7 +16,6 @@
  */
 package org.apache.usergrid.tools;
 
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -82,8 +81,30 @@ public class ExportApp extends ExportingToolBase {
     // set via CLI
     int readThreadCount = 80;
     int writeThreadCount = 10; // limiting write will limit output files 
-    
 
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option appNameOption = OptionBuilder.hasArg().withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
+        return options;
+    }
+
+    
     /**
      * Tool entry point. 
      */
@@ -110,25 +131,25 @@ public class ExportApp extends ExportingToolBase {
             }
         }
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-        
-        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-       
-        startSpring();
-
         setVerbose( line );
 
         applyOrgId( line );
         prepareBaseOutputFileName( line );
         outputDir = createOutputParentDir();
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-       
+
+        startSpring();
+        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
         Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
         
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
@@ -151,30 +172,11 @@ public class ExportApp extends ExportingToolBase {
             .toBlocking().last();
     }
 
-    @Override
-    @SuppressWarnings("static-access")
-    public Options createOptions() {
-
-        Options options = super.createOptions();
-
-        Option appNameOption = OptionBuilder.hasArg().withType("")
-                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( appNameOption );
-
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
-        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
-        options.addOption( writeThreadsOption );
-
-        return options;
-    }
 
     // ----------------------------------------------------------------------------------------
     // reading data
 
+    
     /**
      * Emits collection names found in application.
      */
@@ -198,16 +200,13 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info( "Completed. Read {} collection names", count );
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "No collections found" );
-            }
+            
+            subscriber.onCompleted();
+            logger.info( "Completed. Read {} collection names", count );
         }
     }
 
+    
     /**
      * Emits entities of collection.
      */
@@ -267,13 +266,8 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                if ( count > 0 ) {
-                    subscriber.onCompleted();
-                    logger.info("Completed collection {}. Read {} entities", collection, count);
-                } else {
-                    logger.info("Completed collection {} empty", collection );
-                    subscriber.unsubscribe();
-                }
+                subscriber.onCompleted();
+                logger.info("Completed collection {}. Read {} entities", collection, count);
                 
             } catch ( Exception e ) {
                 subscriber.onError(e);
@@ -281,6 +275,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Emits connections of an entity.
      */
@@ -331,19 +326,17 @@ public class ExportApp extends ExportingToolBase {
                 subscriber.onError( e );
             }
             
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info("Completed entity {} type {} connections count {}",
-                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
-                
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "Entity {} type {} has no connections",
-                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
-            }
+            subscriber.onCompleted();
+            logger.info("Completed entity {} type {} connections count {}",
+                new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
         }
     }
 
+    
+    // ----------------------------------------------------------------------------------------
+    // writing data
+    
+    
     /**
      * Writes entities to JSON file.
      */
@@ -381,6 +374,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Writes connection to JSON file.
      */
@@ -418,6 +412,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     private class FileWrapUpAction implements Action0 {
         @Override
         public void call() {
@@ -448,6 +443,10 @@ public class ExportApp extends ExportingToolBase {
     }
 }
 
+
+/**
+ * Represents entity data to be serialized to JSON.
+ */
 class ExportEntity {
     private String organization;
     private String application;
@@ -493,6 +492,10 @@ class ExportEntity {
     }
 }
 
+
+/**
+ * Represents connection data to be serialized to JSON.
+ */
 class ExportConnection {
     private String organization;
     private String application;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..def47b4 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 # and the pattern to %c instead of %l.  (%l is slower.)
 
 # output messages into a rolling log file as well as stdout
-log4j.rootLogger=WARN,stdout
+log4j.rootLogger=ERROR,stdout
 
 # stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
 
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
 
 log4j.logger.org.apache.usergrid.management.cassandra=WARN
 log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index af8306f..d1c5c1f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,10 +27,7 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
@@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
-    int NUM_COLLECTIONS = 5;
-    int NUM_ENTITIES = 10; 
-    int NUM_CONNECTIONS = 1;
+    int NUM_COLLECTIONS = 20;
+    int NUM_ENTITIES = 200; 
+    int NUM_CONNECTIONS = 5;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -86,68 +83,25 @@ public class ExportAppTest {
         ExecutorService execService = Executors.newFixedThreadPool( 50);
         final Scheduler scheduler = Schedulers.from( execService );
 
-        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
-            @Override
-            public Observable<?> call(Integer i) {
-                
-                return Observable.just( i ).doOnNext( new Action1<Integer>() {
-                    @Override
-                    public void call(Integer i) {
-                        
-                        final String type = "thing_"+i;
-                        try {
-                            em.createApplicationCollection( type );
-                            connectionCount.getAndIncrement();
-                            
-                        } catch (Exception e) {
-                            throw new RuntimeException( "Error creating collection", e );
-                        }
-                       
-                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
-                            @Override
-                            public Observable<?> call(Integer j) {
-                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
-                                    @Override
-                                    public void call(Integer j) {
-                                        
-                                        final String name = "thing_" + j;
-                                        try {
-                                            final Entity source = em.create( 
-                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
-                                            entitiesCount.getAndIncrement();
-                                            logger.info( "Created entity {} type {}", name, type );
-                                            
-                                            for ( Entity target : connectedThings ) {
-                                                em.createConnection( source, "has", target );
-                                                connectionCount.getAndIncrement();
-                                                logger.info( "Created connection from entity {} type {} to {}",
-                                                        new Object[]{name, type, target.getName()} );
-                                            }
-
-
-                                        } catch (Exception e) {
-                                            throw new RuntimeException( "Error creating collection", e );
-                                        }
-                                        
-                                        
-                                    }
-                                    
-                                } );
-
-                            }
-                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
-                        
-                    }
-                } );
-                
+        for (int i = 0; i < NUM_COLLECTIONS; i++) {
 
-            }
-        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+            final String type = "thing_" + i;
+            em.createApplicationCollection( type );
+            connectionCount.getAndIncrement();
+
+            for (int j = 0; j < NUM_ENTITIES; j++) {
+                final String name = "thing_" + j;
+                final Entity source = em.create(
+                        type, new HashMap<String, Object>() {{
+                    put( "name", name );
+                }} );
+                entitiesCount.getAndIncrement();
 
-        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
-            Thread.sleep( 5000 );
-            logger.info( "Still working. Created {} entities and {} connections", 
-                    entitiesCount.get(), connectionCount.get() );
+                for (Entity target : connectedThings) {
+                    em.createConnection( source, "has", target );
+                    connectionCount.getAndIncrement();
+                }
+            }
         }
 
         logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
@@ -166,8 +120,5 @@ public class ExportAppTest {
         }, false );
         
         logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
-
-
-        
     }
 }
\ No newline at end of file


[11/18] incubator-usergrid git commit: Don't ignore broken user accounts (i.e. no creds or orgs), export everything.

Posted by sn...@apache.org.
Don't ignore broken user accounts (i.e. no creds or orgs), export everything.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/897fd508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/897fd508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/897fd508

Branch: refs/heads/two-dot-o-dev
Commit: 897fd5083feeab041fd44453abeb2db57efa4614
Parents: 4f8aa2f
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 20 10:37:54 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 20 10:37:54 2015 -0400

----------------------------------------------------------------------
 .../src/main/java/org/apache/usergrid/tools/ExportAdmins.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/897fd508/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 9dce862..d5dd42c 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -80,6 +80,8 @@ public class ExportAdmins extends ExportingToolBase {
     private int readThreadCount;
 
     AtomicInteger userCount = new AtomicInteger( 0 );
+    
+    boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
    
     
     /**
@@ -335,9 +337,10 @@ public class ExportAdmins extends ExportingToolBase {
 
                     String actionTaken = "Processed";
 
-                    if (task.orgNamesByUuid.isEmpty()
+                    if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
                             || task.dictionariesByName.isEmpty()
-                            || task.dictionariesByName.get( "credentials" ).isEmpty()) {
+                            || task.dictionariesByName.get( "credentials" ).isEmpty())) {
+                        
                         actionTaken = "Ignored";
                         
                     } else {


[09/18] incubator-usergrid git commit: Ignore users with no orgs or creds, log total orgs exported and better logging.

Posted by sn...@apache.org.
Ignore users with no orgs or creds, log total orgs exported and better logging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/125ffe98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/125ffe98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/125ffe98

Branch: refs/heads/two-dot-o-dev
Commit: 125ffe9893c617a81b26a9aaaf1f13460fb01ca6
Parents: 16de78d
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jul 17 11:28:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jul 17 11:28:07 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 117 ++++++++++++-------
 .../org/apache/usergrid/tools/ImportAdmins.java |   2 +-
 2 files changed, 78 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/125ffe98/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index ae9c16b..9dce862 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -61,15 +61,25 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEM
  *    cassandra.lock.keyspace=My_Usergrid_Locks
  */
 public class ExportAdmins extends ExportingToolBase {
-
+    
     static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
+    
     public static final String ADMIN_USERS_PREFIX = "admin-users";
     public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
+   
+    // map admin user UUID to list of organizations to which user belongs
+    private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
+
+    private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
+    
+    private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
+    
+    private Set<UUID> duplicateOrgs = new HashSet<UUID>();
+    
     private static final String READ_THREAD_COUNT = "readThreads";
-    private Map<String, List<Org>> orgMap = new HashMap<String, List<Org>>(80000);
     private int readThreadCount;
-    
-    AtomicInteger count = new AtomicInteger( 0 );
+
+    AtomicInteger userCount = new AtomicInteger( 0 );
    
     
     /**
@@ -173,7 +183,7 @@ public class ExportAdmins extends ExportingToolBase {
         while ( !done ) {
             writeThread.join( 10000, 0 );
             done = !writeThread.isAlive();
-            logger.info( "Wrote {} users", count.get() );
+            logger.info( "Wrote {} users", userCount.get() );
         }
     }
     
@@ -211,10 +221,11 @@ public class ExportAdmins extends ExportingToolBase {
             organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
             for ( Entity organization : organizations.getEntities() ) {
                 execService.submit( new OrgMapWorker( organization ) );
+                count++;
             }
-            count++;
+             
             if ( count % 1000 == 0 ) {
-                logger.info("Processed {} orgs for org map", count);
+                logger.info("Queued {} org map workers", count);
             }
             query.setCursor( organizations.getCursor() );
         }
@@ -222,8 +233,10 @@ public class ExportAdmins extends ExportingToolBase {
 
         execService.shutdown();
         while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
-            logger.info("Processed {} orgs for map", orgMap.size() );
+            logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
         }
+        
+        logger.info("Org map complete, counted {} organizations", count);
     }
 
 
@@ -239,17 +252,33 @@ public class ExportAdmins extends ExportingToolBase {
             try {
                 final String orgName = orgEntity.getProperty( "path" ).toString();
                 final UUID orgId = orgEntity.getUuid();
+                
                 for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
                     try {
                         Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
-                        List<Org> orgs = orgMap.get( admin.getProperty( "username" ) );
-                        if (orgs == null) {
-                            orgs = new ArrayList<Org>();
-                            orgMap.put( admin.getProperty( "username" ).toString().toLowerCase(), orgs );
+                        Org org = new Org( orgId, orgName );
+
+                        synchronized (userToOrgsMap) {
+                            List<Org> userOrgs = userToOrgsMap.get( admin.getUuid() );
+                            if (userOrgs == null) {
+                                userOrgs = new ArrayList<Org>();
+                                userToOrgsMap.put( admin.getUuid(), userOrgs );
+                            }
+                            userOrgs.add( org );
                         }
-                        orgs.add( new Org( orgId, orgName ) );
 
-                        //logger.debug("Added org {} for user {}", orgName, admin.getProperty( "username" ));
+                        synchronized (orgNameToUUID) {
+                            UUID existingOrgId = orgNameToUUID.get( orgName );
+                            ;
+                            if (existingOrgId != null && !orgId.equals( existingOrgId )) {
+                                if ( !duplicateOrgs.contains( orgId )) {
+                                    logger.info( "Org {}:{} is a duplicate", orgId, orgName );
+                                    duplicateOrgs.add(orgId);
+                                }
+                            } else {
+                                orgNameToUUID.put( orgName, orgId );
+                            }
+                        }
 
                     } catch (Exception e) {
                         logger.warn( "Cannot get orgs for userId {}", user.getUuid() );
@@ -301,10 +330,33 @@ public class ExportAdmins extends ExportingToolBase {
                     AdminUserWriteTask task = new AdminUserWriteTask();
                     task.adminUser = entity;
 
-                    addDictionariesToTask(  task, entity );
+                    addDictionariesToTask( task, entity );
                     addOrganizationsToTask( task );
 
-                    writeQueue.add( task );
+                    String actionTaken = "Processed";
+
+                    if (task.orgNamesByUuid.isEmpty()
+                            || task.dictionariesByName.isEmpty()
+                            || task.dictionariesByName.get( "credentials" ).isEmpty()) {
+                        actionTaken = "Ignored";
+                        
+                    } else {
+                        writeQueue.add( task );
+                    }
+
+                    Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ? 
+                                                0 : task.dictionariesByName.get( "credentials" ));
+                    
+                    logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
+                            new Object[]{
+                                    actionTaken,
+                                    task.adminUser.getProperty( "username" ),
+                                    task.adminUser.getProperty( "email" ),
+                                    task.adminUser.getUuid(),
+                                    task.orgNamesByUuid.size(),
+                                    task.dictionariesByName.size(),
+                                    creds == null ? 0 : creds.size()
+                            } ); 
 
                 } catch ( Exception e ) {
                     logger.error("Error reading data for user " + uuid, e );
@@ -327,20 +379,8 @@ public class ExportAdmins extends ExportingToolBase {
 
             Map<Object, Object> credentialsDictionary = em.getDictionaryAsMap( entity, "credentials" );
 
-            if ( credentialsDictionary != null && !credentialsDictionary.isEmpty() ) {
+            if ( credentialsDictionary != null ) {
                 task.dictionariesByName.put( "credentials", credentialsDictionary );
-
-                if (credentialsDictionary.get( "password" ) == null) {
-                    logger.error( "User {}:{} has no password in credential dictionary",
-                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
-                }
-                if (credentialsDictionary.get( "secret" ) == null) {
-                    logger.error( "User {}:{} has no secret in credential dictionary",
-                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
-                }
-            } else {
-                logger.error( "User {}:{} has no or empty credentials dictionary",
-                    new Object[]{task.adminUser.getName(), task.adminUser.getUuid()} );
             }
         }
 
@@ -348,22 +388,17 @@ public class ExportAdmins extends ExportingToolBase {
 
             task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
 
-            List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
+            List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
             
             if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
+                
+                // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
                 BiMap<UUID, String> bimap = HashBiMap.create();
                 for (Org org : orgs) {
                     bimap.put( org.orgId, org.orgName );
                 }
                 task.orgNamesByUuid = bimap;
             }
-            
-            if ( task.orgNamesByUuid.isEmpty() ) {
-                logger.error("{}:{}:{} has no orgs", new Object[] {
-                        task.adminUser.getProperty("username"), 
-                        task.adminUser.getProperty("email"), 
-                        task.adminUser.getUuid() } );
-            }
         }
     }
 
@@ -425,7 +460,7 @@ public class ExportAdmins extends ExportingToolBase {
                         task.adminUser.getProperty("email"),
                         task.adminUser.getUuid() } );
 
-                    count.addAndGet( 1 );
+                    userCount.addAndGet( 1 );
 
                 } catch (InterruptedException e) {
                     throw new Exception("Interrupted", e);
@@ -438,7 +473,7 @@ public class ExportAdmins extends ExportingToolBase {
             usersFile.writeEndArray();
             usersFile.close();
 
-            logger.info( "Exported TOTAL {} admin users", count );
+            logger.info( "Exported TOTAL {} admin users and {} organizations", userCount.get(), orgsWritten.size() );
         }
 
 
@@ -490,7 +525,9 @@ public class ExportAdmins extends ExportingToolBase {
 
                 jg.writeEndObject();
                 
-                logger.debug( "Exported organization {}:{}", uuid, orgs.get( uuid ) );
+                synchronized (orgsWritten) {
+                    orgsWritten.add( uuid );
+                }
             }
 
             jg.writeEndArray();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/125ffe98/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index c6aada7..7e08a4c 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -636,7 +636,7 @@ public class ImportAdmins extends ToolBase {
                     long duration = stopTime - startTime;
                     durationSum += duration;
 
-                    //logger.debug( "Audited {}th admin", count );
+                    //logger.debug( "Audited {}th admin", userCount );
                     
                     if ( count % 100 == 0 ) {
                         logger.info( "Audited {}. Average Audit Rate: {}(ms)", count, durationSum / count );


[06/18] incubator-usergrid git commit: Minor test fixes.

Posted by sn...@apache.org.
Minor test fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dab84e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dab84e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dab84e95

Branch: refs/heads/two-dot-o-dev
Commit: dab84e95b54ff9ac573cfa291708938b5e181b61
Parents: 7b168b9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 12:34:26 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 12:34:26 2015 -0400

----------------------------------------------------------------------
 .../apache/usergrid/tools/ExportAppTest.java    | 33 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dab84e95/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index eeaae13..c5411fd 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,12 +24,15 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Scheduler;
 import rx.schedulers.Schedulers;
 
+import java.io.File;
+import java.io.FileFilter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +40,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+
+/**
+ * TODO: better test, this is really just a smoke test.
+ */
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
@@ -119,8 +128,15 @@ public class ExportAppTest {
                 "-outputDir", directoryName
         }, false );
 
-        logger.info("100 read and 100 write threads = " + (System.currentTimeMillis() - start)/1000 + "s");
+        logger.info( "100 read and 100 write threads = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+        
+        File exportDir = new File(directoryName);
+        assertTrue( getFileCount( exportDir, "entities"    ) > 0 );
+        assertTrue( getFileCount( exportDir, "collections" ) > 0 );
+        assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
+        assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
 
+        File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
                 "-readThreads", "1",
@@ -129,6 +145,19 @@ public class ExportAppTest {
                 "-outputDir", directoryName + "1"
         }, false );
 
-        logger.info("1 thread time = " + (System.currentTimeMillis() - start)/1000 + "s");
+        logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
+
+        exportDir = new File(directoryName);
+        assertEquals( 1, getFileCount( exportDir, "entities" ));
+        assertEquals( 1, getFileCount( exportDir, "collections" ));
+    }
+
+    private static int getFileCount(File exportDir, final String ext ) {
+        return exportDir.listFiles( new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                return pathname.getAbsolutePath().endsWith("." + ext);
+            }
+        } ).length;
     }
 }
\ No newline at end of file


[03/18] incubator-usergrid git commit: Remove queues and make the whole thing one "stream"

Posted by sn...@apache.org.
Remove queues and make the whole thing one "stream"


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69

Branch: refs/heads/two-dot-o-dev
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 375 +++++--------------
 .../usergrid/tools/ExportingToolBase.java       |   2 +-
 .../apache/usergrid/tools/ExportAppTest.java    | 114 +++++-
 3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Scheduler;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-    
-    private static final String READ_THREAD_COUNT = "readThreads";
-    private static final String WRITE_THREAD_COUNT = "writeThreads";
-
-    // we will write two types of files: entities and connections
-    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
-    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
 
     static final String APPLICATION_NAME = "application";
-    
-    int pollTimeoutSeconds = 10;
-    
-    int readThreadCount = 80; 
-    int writeThreadCount = 10;
-    
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+   
     String applicationName;
     String organizationName;
 
-    // limiting output threads will limit output files 
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+
     Scheduler readScheduler;
+    Scheduler writeScheduler;
 
+    ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    List<String> emptyFiles = new ArrayList<String>();
+    // set via CLI
+    int readThreadCount = 80;
+    int writeThreadCount = 10; // limiting write will limit output files 
     
-    AtomicInteger activePollers = new AtomicInteger(0);
-    AtomicInteger entitiesQueued = new AtomicInteger(0);
-    AtomicInteger entitiesWritten = new AtomicInteger(0);
-    AtomicInteger connectionsWritten = new AtomicInteger(0);
-    AtomicInteger connectionsQueued = new AtomicInteger(0);
 
-    ObjectMapper mapper = new ObjectMapper();
-    
     /**
      * Tool entry point. 
      */
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
         readScheduler = Schedulers.from( readThreadPoolExecutor );
         
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
        
         startSpring();
 
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        // start write queue workers
-
-        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
-        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
-        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
-            public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( writeScheduler );
-            }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
-        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
-        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
-            public Observable<ExportConnection> call(ExportConnection connection ) {
-                return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
+        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+        
+        collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+            
+            public Observable<ExportEntity> call(String collection) {
+                return Observable.create( new EntityObservable( em, collection ))
+                        .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        // start processing data and filling up write queues
-
-        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
-        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
-        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
-            public Observable<String> call(String collection) {
-                return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( readScheduler );
+            
+        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+            
+            public Observable<ExportConnection> call(ExportEntity exportEntity) {
+                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                        .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-        },40).subscribeOn( readScheduler ).subscribe();
-        
-        // wait for write thread pollers to get started
-
-        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
-        // wait for write-thread pollers to stop
-
-        while ( activePollers.get() > 0 ) {
-            logger.info(
-                     "Active write threads: {}\n"
-                    +"Entities written:     {}\n"
-                    +"Entities queued:      {}\n"
-                    +"Connections written:  {}\n"
-                    +"Connections queued:   {}\n",
-                    new Object[] { 
-                        activePollers.get(),
-                        entitiesWritten.get(), 
-                        entitiesQueued.get(),
-                        connectionsWritten.get(), 
-                        connectionsQueued.get()} );
-            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
-        }
-
-        // wrap up files
-
-        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( String fileName : emptyFiles ) {
-            File emptyFile = new File(fileName);
-            emptyFile.deleteOnExit();
-        }
-
+            
+        }, 10)
+            .subscribeOn( readScheduler )
+            .doOnCompleted( new FileWrapUpAction() )
+            .toBlocking().last();
     }
 
     @Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
         options.addOption( readThreadsOption );
 
-        Option writeThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
 
         return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits collection names found in application.
      */
-    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+    class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
         EntityManager em;
                 
-        public CollectionsOnSubscribe( EntityManager em ) {
+        public CollectionsObservable(EntityManager em) {
             this.em = em;
         }
 
         public void call(Subscriber<? super String> subscriber) {
             
-            logger.info("Starting to read collections");
-            
             int count = 0;
             try {
                 Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            logger.info("Done. Read {} collection names", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info( "Completed. Read {} collection names", count );
             } else {
                 subscriber.unsubscribe();
+                logger.info( "No collections found" );
             }
         }
     }
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits entities of collection.
      */
-    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+    class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
         EntityManager em;
         String collection;
 
-        public EntityOnSubscribe(EntityManager em, String collection) {
+        public EntityObservable(EntityManager em, String collection) {
             this.em = em;
             this.collection = collection;
         }
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
 
             logger.info("Starting to read entities of collection {}", collection);
             
+            subscriber.onStart();
+            
             try {
                 int count = 0;
 
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                logger.info("Done. Read {} entities", count);
                 if ( count > 0 ) {
                     subscriber.onCompleted();
+                    logger.info("Completed collection {}. Read {} entities", collection, count);
                 } else {
+                    logger.info("Completed collection {} empty", collection );
                     subscriber.unsubscribe();
                 }
                 
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits connections of an entity.
      */
-    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+    class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
         EntityManager em;
         ExportEntity exportEntity;
 
-        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+        public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
             this.em = em;
             this.exportEntity = exportEntity;
         }
 
         public void call(Subscriber<? super ExportConnection> subscriber) {
 
-            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+            logger.info( "Starting to read connections for entity {} type {}",
+                    exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             
             int count = 0;
             
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-
-            logger.info("Done. Read {} connections", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Process collection by starting processing of its entities.
-     */
-    class CollectionAction implements Action1<String> {
-        EntityManager em;
-
-        public CollectionAction( EntityManager em ) {
-            this.em = em;
-        }
-
-        public void call(String collection) {
-
-            // process entities of collection in parallel            
-            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
-            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
-                public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                    return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( readScheduler );
-                }
-            }, 8).subscribeOn(readScheduler).toBlocking().last();
-        }
-    }
-
-    /**
-     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
-     */
-    class EntityAction implements Action1<ExportEntity> {
-        EntityManager em;
-
-        public EntityAction( EntityManager em ) {
-            this.em = em;
-        }
-        
-        public void call(ExportEntity exportEntity) {
-            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
-            entityWriteQueue.add( exportEntity );
-            entitiesQueued.getAndIncrement();
-
-            // if entity has connections, process them in parallel
-            try {
-                Results connectedEntities = em.getConnectedEntities(
-                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
-                if ( !connectedEntities.isEmpty() ) {
-                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
-                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-                    
-                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
-                        public Observable<ExportConnection> call(ExportConnection connection) {
-                            return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn(readScheduler);
-                        }
-                    }, 8).subscribeOn(readScheduler).toBlocking().last();
-                }
-                
-            } catch (Exception e) {
-                throw new RuntimeException( "Error getting connections", e );
-            }
-        }
-    }
-
-    /**
-     * Process connection by adding it to connectionWriteQueue. 
-     */
-    class ConnectionsAction implements Action1<ExportConnection> {
-
-        public void call(ExportConnection conn) {
-            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
-            connectionWriteQueue.add(conn);
-            connectionsQueued.getAndIncrement();
-        }
-    }
-
-
-    // ----------------------------------------------------------------------------------------
-    // writing data
-
-    /**
-     * Emits entities to be written.
-     */
-    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
-        BlockingQueue<ExportEntity> queue;
-
-        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportEntity> subscriber) {
-            int count = 0;
-            
-            while ( true ) {
-                ExportEntity entity = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for entity to write...", count );
-                    activePollers.getAndIncrement();
-                    entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Entity poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( entity == null ) {
-                    break;
-                }
-                subscriber.onNext( entity );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} entities", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Emits connections to be written.
-     */
-    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
-        BlockingQueue<ExportConnection> queue;
-
-        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportConnection> subscriber) {
-            int count = 0;
             
-            while ( true ) {
-                ExportConnection connection = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for connection to write", count );
-                    activePollers.getAndIncrement();
-                    connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Connection poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( connection == null ) {
-                    break;
-                }
-                subscriber.onNext( connection );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info("Completed entity {} type {} connections count {}",
+                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+                
             } else {
                 subscriber.unsubscribe();
+                logger.info( "Entity {} type {} has no connections",
+                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             }
         }
     }
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportEntity entity) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( entity );
                 gen.writeRaw('\n');
                 entitiesWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
-
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
-            }
         }
     }
 
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportConnection conn) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( conn );
                 gen.writeRaw('\n');
                 connectionsWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
+        }
+    }
+
+    private class FileWrapUpAction implements Action0 {
+        @Override
+        public void call() {
+
+            logger.info("-------------------------------------------------------------------");
+            logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+            logger.info("-------------------------------------------------------------------");
 
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
+            for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
+            }
+            for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
             }
         }
     }
-
 }
 
 class ExportEntity {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
 
         if ( !file.mkdirs() ) {
 
-            throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+            throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
         }
 
         return file;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+    
+    int NUM_COLLECTIONS = 5;
+    int NUM_ENTITIES = 10; 
+    int NUM_CONNECTIONS = 1;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
         ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
                 orgInfo.getOrganization().getUuid(), "app_" + rand );
 
-        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
         
-        // create 10 connected things
+        // create connected things
 
-        List<Entity> connectedThings = new ArrayList<Entity>();
+        final List<Entity> connectedThings = new ArrayList<Entity>();
         String connectedType = "connected_thing";
         em.createApplicationCollection(connectedType);
-        for ( int j=0; j<10; j++) {
+        for ( int j=0; j<NUM_CONNECTIONS; j++) {
             final String name = "connected_thing_" + j;
             connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
                 put( "name", name );
             }} ) );
         }
        
-        // create 10 collections of 10 things, every other thing is connected to the connected things
-        
-        for ( int i=0; i<10; i++) {
-            String type = "thing_"+i;
-            em.createApplicationCollection(type);
-            for ( int j=0; j<10; j++) {
-                final String name = "thing_" + j;
-                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
-                if ( j % 2 == 0 ) {
-                    for ( Entity target : connectedThings ) {
-                        em.createConnection( source, "has", target );
+        // create collections of things, every other thing is connected to the connected things
+
+        final AtomicInteger entitiesCount = new AtomicInteger(0);
+        final AtomicInteger connectionCount = new AtomicInteger(0);
+
+        ExecutorService execService = Executors.newFixedThreadPool( 50);
+        final Scheduler scheduler = Schedulers.from( execService );
+
+        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+            @Override
+            public Observable<?> call(Integer i) {
+                
+                return Observable.just( i ).doOnNext( new Action1<Integer>() {
+                    @Override
+                    public void call(Integer i) {
+                        
+                        final String type = "thing_"+i;
+                        try {
+                            em.createApplicationCollection( type );
+                            connectionCount.getAndIncrement();
+                            
+                        } catch (Exception e) {
+                            throw new RuntimeException( "Error creating collection", e );
+                        }
+                       
+                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+                            @Override
+                            public Observable<?> call(Integer j) {
+                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
+                                    @Override
+                                    public void call(Integer j) {
+                                        
+                                        final String name = "thing_" + j;
+                                        try {
+                                            final Entity source = em.create( 
+                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
+                                            entitiesCount.getAndIncrement();
+                                            logger.info( "Created entity {} type {}", name, type );
+                                            
+                                            for ( Entity target : connectedThings ) {
+                                                em.createConnection( source, "has", target );
+                                                connectionCount.getAndIncrement();
+                                                logger.info( "Created connection from entity {} type {} to {}",
+                                                        new Object[]{name, type, target.getName()} );
+                                            }
+
+
+                                        } catch (Exception e) {
+                                            throw new RuntimeException( "Error creating collection", e );
+                                        }
+                                        
+                                        
+                                    }
+                                    
+                                } );
+
+                            }
+                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+                        
                     }
-                }
+                } );
+                
+
             }
+        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+            Thread.sleep( 5000 );
+            logger.info( "Still working. Created {} entities and {} connections", 
+                    entitiesCount.get(), connectionCount.get() );
         }
-        
-        // export to file
 
-        String directoryName = "./target/export" + rand;
+        logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+        long start = System.currentTimeMillis();
+        
+        String directoryName = "target/export" + rand;
 
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
+                "-readThreads", "50",
+                "-writeThreads", "10",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
         }, false );
         
+        logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+        
     }
 }
\ No newline at end of file


[17/18] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by sn...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a562366e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a562366e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a562366e

Branch: refs/heads/two-dot-o-dev
Commit: a562366e030dbc6017032dd4af82354e64d48ffc
Parents: fedf165 ca4575d
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 11:17:10 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 11:17:10 2015 -0400

----------------------------------------------------------------------
 .../usergrid/services/AbstractService.java      | 270 ++++++++++---------
 1 file changed, 144 insertions(+), 126 deletions(-)
----------------------------------------------------------------------



[16/18] incubator-usergrid git commit: Merge branch 'two-dot-o' into two-dot-o-dev

Posted by sn...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-dev

Conflicts:
	stack/core/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fedf1651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fedf1651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fedf1651

Branch: refs/heads/two-dot-o-dev
Commit: fedf16516b1f5eb26281b8d9cf6ed3b260c715f5
Parents: d32cee4 066d7db
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 09:46:21 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 09:46:21 2015 -0400

----------------------------------------------------------------------
 stack/pom.xml                                   |   2 +-
 stack/tools/pom.xml                             |   6 +
 .../org/apache/usergrid/tools/ExportAdmins.java | 117 ++--
 .../org/apache/usergrid/tools/ExportApp.java    | 536 +++++++++++++++++++
 .../usergrid/tools/ExportDataCreator.java       | 244 +++++++--
 .../usergrid/tools/ExportingToolBase.java       |   2 +-
 .../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++--
 stack/tools/src/main/resources/log4j.properties |   5 +
 .../apache/usergrid/tools/ExportAppTest.java    | 118 ++++
 .../usergrid/tools/ExportImportAdminsTest.java  |  71 ++-
 ...adata.usergrid-management.1433331614293.json |  52 ++
 ...users.usergrid-management.1433331614293.json |  12 +
 12 files changed, 1224 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fedf1651/stack/pom.xml
----------------------------------------------------------------------
diff --cc stack/pom.xml
index d7f279d,cc39e04..6f984e3
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@@ -109,10 -109,10 +109,10 @@@
        <jersey-version>1.18.1</jersey-version>
        <junit-version>4.12</junit-version>
        <log4j-version>1.2.16</log4j-version>
 -      <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
 +      <org.springframework.version>3.2.13.RELEASE</org.springframework.version>
        <shiro-version>1.2.3</shiro-version>
        <slf4j-version>1.6.1</slf4j-version>
-       <snakeyaml-version>1.8</snakeyaml-version>
+       <snakeyaml-version>1.9</snakeyaml-version>
        <tomcat-version>7.0.59</tomcat-version>
        <antlr.version>3.4</antlr.version>
        <tika.version>1.4</tika.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fedf1651/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------


[13/18] incubator-usergrid git commit: Added data creator to generate data for ExportApp testing.

Posted by sn...@apache.org.
Added data creator to generate data for ExportApp testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a6e68a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a6e68a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a6e68a0a

Branch: refs/heads/two-dot-o-dev
Commit: a6e68a0adc571b0a30138d4cb71b5e0e49a42c0a
Parents: 2748347
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 21 14:59:59 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 21 14:59:59 2015 -0400

----------------------------------------------------------------------
 stack/pom.xml                                   |   2 +-
 stack/tools/pom.xml                             |   6 +
 .../org/apache/usergrid/tools/ExportApp.java    |  17 +-
 .../usergrid/tools/ExportDataCreator.java       | 244 +++++++++++++++----
 .../org/apache/usergrid/tools/ToolBase.java     |   2 +-
 .../apache/usergrid/tools/ExportAppTest.java    |  65 +----
 6 files changed, 223 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index da1b62c..0e1b32d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -103,7 +103,7 @@
     <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
     <shiro-version>1.2.0</shiro-version>
     <slf4j-version>1.6.1</slf4j-version>
-    <snakeyaml-version>1.8</snakeyaml-version>
+    <snakeyaml-version>1.9</snakeyaml-version>
     <tomcat-version>7.0.42</tomcat-version>
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index 4be3232..3402612 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -249,5 +249,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.codearte.jfairy</groupId>
+      <artifactId>jfairy</artifactId>
+      <version>0.4.3</version>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index b2da0ea..db975e6 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -102,7 +102,7 @@ public class ExportApp extends ExportingToolBase {
      */
     @Override
     public void runTool(CommandLine line) throws Exception {
-        
+
         applicationName = line.getOptionValue( APPLICATION_NAME );
 
         if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
@@ -122,8 +122,11 @@ public class ExportApp extends ExportingToolBase {
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
 
         startSpring();
-        
+
         UUID applicationId = emf.lookupApplication( applicationName );
+        if (applicationId == null) {
+            throw new RuntimeException( "Cannot find application " + applicationName );
+        }
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
@@ -133,8 +136,9 @@ public class ExportApp extends ExportingToolBase {
         Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
         
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
-            
+
             public Observable<ExportEntity> call(String collection) {
+
                 return Observable.create( new EntityObservable( em, collection ) )
                         .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
@@ -142,16 +146,17 @@ public class ExportApp extends ExportingToolBase {
         }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
 
             public Observable<ExportConnection> call(ExportEntity exportEntity) {
+
                 return Observable.create( new ConnectionsObservable( em, exportEntity ) )
                         .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
 
-        }, writeThreadCount)
+        }, writeThreadCount )
             .doOnCompleted( new FileWrapUpAction() )
             .toBlocking().last();
     }
-
-
+   
+    
     // ----------------------------------------------------------------------------------------
     // reading data
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
index a3d6517..6fa4896 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportDataCreator.java
@@ -17,90 +17,232 @@
 package org.apache.usergrid.tools;
 
 
-import java.util.UUID;
-
-import org.apache.usergrid.management.ManagementService;
+import io.codearte.jfairy.Fairy;
+import io.codearte.jfairy.producer.company.Company;
+import io.codearte.jfairy.producer.person.Person;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.entities.Activity;
-import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 
-import static org.junit.Assert.assertNotNull;
+import java.util.*;
 
 
 /**
- * Simple class to create test for for exporting
- *
- * @author tnine
+ * Create an app full of users and data.
  */
-public class ExportDataCreator {
+public class ExportDataCreator extends ToolBase {
+
+    public static final String APP_NAME = "application";
+    public static final String ORG_NAME = "organization";
+    public static final String NUM_USERS = "users";
+    public static final String NUM_COLLECTIONS = "collections";
+    public static final String NUM_ENTITIES = "entities";
+    public static final String ADMIN_USERNAME = "username";
+    public static final String ADMIN_PASSWORD = "password";
 
-    private EntityManagerFactory emf;
+    public String appName = "test-app";
+    public String orgName = "test-organization";
+    public int numUsers = 100;
+    public int numCollections = 20;
+    public int numEntities = 100;
+    public String adminUsername = "adminuser";
+    public String adminPassword = "test";
 
-    private ManagementService managementService;
 
+    @Override
+    public void runTool(CommandLine line) throws Exception {
 
-    /**
-     * @param emf
-     * @param managementService
-     */
-    public ExportDataCreator( EntityManagerFactory emf, ManagementService managementService ) {
-        super();
-        this.emf = emf;
-        this.managementService = managementService;
+        startSpring();
+
+        setVerbose( line );
+
+        if (line.hasOption( APP_NAME )) {
+            appName = line.getOptionValue( APP_NAME );
+        }
+        if (line.hasOption( ORG_NAME )) {
+            orgName = line.getOptionValue( ORG_NAME );
+        }
+        if (line.hasOption( NUM_USERS )) {
+            numUsers = Integer.parseInt( line.getOptionValue( NUM_USERS ) );
+        }
+        if (line.hasOption( NUM_COLLECTIONS )) {
+            numCollections = Integer.parseInt( line.getOptionValue( NUM_COLLECTIONS ) );
+        }
+        if (line.hasOption( NUM_ENTITIES )) {
+            numEntities = Integer.parseInt( line.getOptionValue( NUM_ENTITIES ) );
+        }
+        if (line.hasOption( ADMIN_USERNAME )) {
+            adminUsername = line.getOptionValue( ADMIN_USERNAME );
+        }
+        if (line.hasOption( ADMIN_PASSWORD )) {
+            adminPassword = line.getOptionValue( ADMIN_PASSWORD );
+        }
+
+        createTestData();
     }
 
 
-    public void createTestData() throws Exception {
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
 
-        String orgName = "testexportorg";
+        Options options = super.createOptions();
 
-        //nothing to do 
-        if ( managementService.getOrganizationByName( orgName ) != null ) {
-            return;
-        }
+        Option appName = OptionBuilder.hasArg()
+                .withDescription( "Application name to use" ).create( APP_NAME );
 
-        OrganizationOwnerInfo orgInfo = managementService
-                .createOwnerAndOrganization( orgName, "textExportUser@apigee.com", "Test User",
-                        "textExportUser@apigee.com", "password", true, false );
+        Option orgName = OptionBuilder.hasArg()
+                .withDescription( "Organization to use (will create if not present)" ).create( ORG_NAME );
 
-        UUID appId = managementService.createApplication( orgInfo.getOrganization().getUuid(), "application" ).getId();
+        Option numUsers = OptionBuilder.hasArg()
+                .withDescription( "Number of users create (in addition to users)" ).create( NUM_USERS );
 
-        EntityManager em = emf.getEntityManager( appId );
+        Option numCollection = OptionBuilder.hasArg()
+                .withDescription( "Number of collections to create (in addition to users)" ).create( NUM_COLLECTIONS );
 
-        User first = new User();
-        first.setUsername( "first" );
-        first.setEmail( "first@usergrid.com" );
+        Option numEntities = OptionBuilder.hasArg()
+                .withDescription( "Number of entities to create per collection" ).create( NUM_ENTITIES );
 
-        Entity firstUserEntity = em.create( first );
+        Option adminUsername = OptionBuilder.hasArg()
+                .withDescription( "Admin Username" ).create( ADMIN_USERNAME );
 
-        assertNotNull( firstUserEntity );
+        Option adminPassword = OptionBuilder.hasArg()
+                .withDescription( "Admin Password" ).create( ADMIN_PASSWORD );
 
-        User second = new User();
-        second.setUsername( "second" );
-        second.setEmail( "second@usergrid.com" );
+        options.addOption( appName );
+        options.addOption( orgName );
+        options.addOption( numUsers );
+        options.addOption( numCollection );
+        options.addOption( numEntities );
+        options.addOption( adminUsername );
+        options.addOption( adminPassword );
 
-        Entity secondUserEntity = em.create( second );
+        return options;
+    }
 
-        assertNotNull( secondUserEntity );
 
-        em.createConnection( firstUserEntity, "likes", secondUserEntity );
+    public void createTestData() throws Exception {
 
-        em.createConnection( secondUserEntity, "dislikes", firstUserEntity );
+        OrganizationInfo orgInfo = managementService.getOrganizationByName( orgName );
 
-        // now create some activities and put them into the user stream
+        if (orgInfo == null) {
+            OrganizationOwnerInfo ownerInfo = managementService.createOwnerAndOrganization(
+                    orgName, adminUsername + "@example.com", adminUsername,
+                    adminUsername + "@example.com", adminPassword, true, false );
+            orgInfo = ownerInfo.getOrganization();
+        }
 
-        Activity activity = new Activity();
+        ApplicationInfo appInfo = managementService.getApplicationInfo( orgName + "/" + appName );
 
-        Activity.ActivityObject actor = new Activity.ActivityObject();
-        actor.setEntityType( "user" );
-        actor.setId( firstUserEntity.getUuid().toString() );
+        if (appInfo == null) {
+            UUID appId = managementService.createApplication( orgInfo.getUuid(), appName ).getId();
+            appInfo = managementService.getApplicationInfo( appId );
+        }
 
-        activity.setActor( actor );
-        activity.setVerb( "POST" );
+        EntityManager em = emf.getEntityManager( appInfo.getId() );
+
+        Fairy fairy = Fairy.create();
+
+        List<Entity> users = new ArrayList<Entity>( numUsers );
+
+        for (int i = 0; i < numUsers; i++) {
+
+            final Person person = fairy.person();
+            Entity userEntity = null;
+            try {
+                final Map<String, Object> userMap = new HashMap<String, Object>() {{
+                    put( "username", person.username() );
+                    put( "password", person.password() );
+                    put( "email", person.email() );
+                    put( "companyEmail", person.companyEmail() );
+                    put( "dateOfBirth", person.dateOfBirth() );
+                    put( "firstName", person.firstName() );
+                    put( "lastName", person.lastName() );
+                    put( "nationalIdentificationNumber", person.nationalIdentificationNumber() );
+                    put( "telephoneNumber", person.telephoneNumber() );
+                    put( "passportNumber", person.passportNumber() );
+                    put( "address", person.getAddress() );
+                }};
+
+                userEntity = em.create( "user", userMap );
+                users.add( userEntity );
+
+            } catch (DuplicateUniquePropertyExistsException e) {
+                logger.error( "Dup user generated: " + person.username() );
+                continue;
+            } catch (Exception e) {
+                logger.error("Error creating user", e);
+                continue;
+            }
+
+            final Company company = person.getCompany();
+            try {
+                EntityRef ref = em.getAlias( "company", company.name() );
+                Entity companyEntity = (ref == null) ? null : em.get( ref );
+              
+                // create company if it does not exist yet
+                if ( companyEntity == null ) {
+                    final Map<String, Object> companyMap = new HashMap<String, Object>() {{
+                        put( "name", company.name() );
+                        put( "domain", company.domain() );
+                        put( "email", company.email() );
+                        put( "url", company.url() );
+                        put( "vatIdentificationNumber", company.vatIdentificationNumber() );
+                    }};
+                    companyEntity = em.create( "company", companyMap );
+                } else {
+                    logger.info("Company {} already exists", company.name());
+                }
+
+                em.createConnection( userEntity, "employer", companyEntity );
+
+            } catch (DuplicateUniquePropertyExistsException e) {
+                logger.error( "Dup company generated {} property={}", company.name(), e.getPropertyName() );
+                continue;
+            } catch (Exception e) {
+                logger.error("Error creating or connecting company", e);
+                continue;
+            }
+            
+            try {
+                for (int j = 0; j < 5; j++) {
+                    Activity activity = new Activity();
+                    Activity.ActivityObject actor = new Activity.ActivityObject();
+                    actor.setEntityType( "user" );
+                    actor.setId( userEntity.getUuid().toString() );
+                    activity.setActor( actor );
+                    activity.setVerb( "POST" );
+                    activity.setContent( "User " + person.username() + " generated a random string "
+                            + RandomStringUtils.randomAlphanumeric( 5 ) );
+                    em.createItemInCollection( userEntity, "activities", "activity", activity.getProperties() );
+                }
+
+                if (users.size() > 10) {
+                    for (int j = 0; j < 5; j++) {
+                        try {
+                            em.createConnection( userEntity, "associate", users.get( (int) (Math.random() * users.size()) ) );
+                        } catch (Exception e) {
+                            logger.error( "Error connecting user to user: " + e.getMessage() );
+                        }
+                    }
+                }
+                
+            } catch (Exception e) {
+                logger.error("Error creating activities", e);
+                continue;
+            }
 
-        em.createItemInCollection( firstUserEntity, "activities", "activity", activity.getProperties() );
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 63187a4..3c427e1 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -116,7 +116,7 @@ public abstract class ToolBase {
     public void printCliHelp( String message ) {
         System.out.println( message );
         HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp( "java -jar usergrid-tools-0.0.1-SNAPSHOT.jar " + getToolName(), createOptions() );
+        formatter.printHelp( "java -jar usergrid-tools-1.0.2.jar " + getToolName(), createOptions() );
         System.exit( -1 );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a6e68a0a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index a1e3f6b..446aa91 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -63,65 +63,23 @@ public class ExportAppTest {
         
         // create app with some data
 
-        OrganizationOwnerInfo orgInfo = setup.getMgmtSvc().createOwnerAndOrganization(
-                "org_" + rand, "user_" + rand, rand.toUpperCase(), rand + "@example.com", rand );
-
-        ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
-                orgInfo.getOrganization().getUuid(), "app_" + rand );
-
-        final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        String orgName = "org_" + rand;
+        String appName = "app_" + rand;
+        
+        ExportDataCreator creator = new ExportDataCreator();
+        creator.startTool( new String[] {
+                "-organization", orgName,
+                "-application", appName,
+                "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort()
+        }, false);
         
-        // create connected things
-
-        final List<Entity> connectedThings = new ArrayList<Entity>();
-        String connectedType = "connected_thing";
-        em.createApplicationCollection(connectedType);
-        for ( int j=0; j<NUM_CONNECTIONS; j++) {
-            final String name = "connected_thing_" + j;
-            connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
-                put( "name", name );
-            }} ) );
-        }
-       
-        // create collections of things, every other thing is connected to the connected things
-
-        final AtomicInteger entitiesCount = new AtomicInteger(0);
-        final AtomicInteger connectionCount = new AtomicInteger(0);
-
-        ExecutorService execService = Executors.newFixedThreadPool( 50);
-        final Scheduler scheduler = Schedulers.from( execService );
-
-        for (int i = 0; i < NUM_COLLECTIONS; i++) {
-
-            final String type = "thing_" + i;
-            em.createApplicationCollection( type );
-            connectionCount.getAndIncrement();
-
-            for (int j = 0; j < NUM_ENTITIES; j++) {
-                final String name = "thing_" + j;
-                final Entity source = em.create(
-                        type, new HashMap<String, Object>() {{
-                    put( "name", name );
-                }} );
-                entitiesCount.getAndIncrement();
-
-                for (Entity target : connectedThings) {
-                    em.createConnection( source, "has", target );
-                    connectionCount.getAndIncrement();
-                }
-            }
-        }
-
-        logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
-
         long start = System.currentTimeMillis();
         
         String directoryName = "target/export" + rand;
 
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
-                "-application", appInfo.getName(),
-                "-readThreads", "100",
+                "-application", orgName + "/" + appName,
                 "-writeThreads", "100",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
@@ -137,8 +95,7 @@ public class ExportAppTest {
 
         File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
-                "-application", appInfo.getName(),
-                "-readThreads", "1",
+                "-application", orgName + "/" + appName,
                 "-writeThreads", "1",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName + "1"


[18/18] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by sn...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/89dd0ad9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/89dd0ad9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/89dd0ad9

Branch: refs/heads/two-dot-o-dev
Commit: 89dd0ad9868acda49da57fc24748f0cc4f89055b
Parents: a562366 b1393b4
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 22 11:18:29 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 22 11:18:29 2015 -0400

----------------------------------------------------------------------
 .../usergrid/services/AbstractService.java      | 64 ++++++++++----------
 1 file changed, 31 insertions(+), 33 deletions(-)
----------------------------------------------------------------------



[07/18] incubator-usergrid git commit: Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.

Posted by sn...@apache.org.
Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9

Branch: refs/heads/two-dot-o-dev
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 39 +++++---------------
 .../apache/usergrid/tools/ExportAppTest.java    | 12 +++---
 2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * 
  * Will create as many output files as there are writeThreads (by default: 10).
  * 
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usegrird entities and *.collections for entity to entity connections.
  * 
  * Every line of the data files is a complete JSON object.
  */
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
 
     static final String APPLICATION_NAME = "application";
-    private static final String READ_THREAD_COUNT = "readThreads";
     private static final String WRITE_THREAD_COUNT = "writeThreads";
    
     String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
     AtomicInteger entitiesWritten = new AtomicInteger(0);
     AtomicInteger connectionsWritten = new AtomicInteger(0);
 
-    Scheduler readScheduler;
     Scheduler writeScheduler;
 
     ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    // set via CLI
-    int readThreadCount = 80;
-    int writeThreadCount = 10; // limiting write will limit output files 
+    int writeThreadCount = 10; // set via CLI option; limiting write will limit output files 
 
 
     @Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
         Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
                 .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
         
         applicationName = line.getOptionValue( APPLICATION_NAME );
 
-        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-            try {
-                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-            } catch (NumberFormatException nfe) {
-                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-                return;
-            }
-        }
-        
         if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
             try {
                 writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
         writeScheduler = Schedulers.from( writeThreadPoolExecutor );
 
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
             
             public Observable<ExportEntity> call(String collection) {
-                return Observable.create( new EntityObservable( em, collection ))
+                return Observable.create( new EntityObservable( em, collection ) )
                         .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-            
+
+        }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
             public Observable<ExportConnection> call(ExportEntity exportEntity) {
-                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                return Observable.create( new ConnectionsObservable( em, exportEntity ) )
                         .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10)
-            .subscribeOn( readScheduler )
+
+        }, writeThreadCount)
             .doOnCompleted( new FileWrapUpAction() )
             .toBlocking().last();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
         
         File exportDir = new File(directoryName);
         assertTrue( getFileCount( exportDir, "entities"    ) > 0 );
-        assertTrue( getFileCount( exportDir, "collections" ) > 0 );
-        assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
-        assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+        assertTrue( getFileCount( exportDir, "entities"    ) <= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
 
         File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
 
         logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
 
-        exportDir = new File(directoryName);
-        assertEquals( 1, getFileCount( exportDir, "entities" ));
-        assertEquals( 1, getFileCount( exportDir, "collections" ));
+        assertEquals( 1, getFileCount( exportDir1, "entities" ));
+        assertEquals( 1, getFileCount( exportDir1, "connections" ));
     }
 
     private static int getFileCount(File exportDir, final String ext ) {


[10/18] incubator-usergrid git commit: More consistency in logging about admin users, using username : email : uuid format.

Posted by sn...@apache.org.
More consistency in logging about admin users, using username : email : uuid format.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4f8aa2f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4f8aa2f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4f8aa2f3

Branch: refs/heads/two-dot-o-dev
Commit: 4f8aa2f36fca8c661d627f1748e3994e62107ce4
Parents: 125ffe9
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jul 17 17:39:25 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jul 17 17:39:25 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ImportAdmins.java | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4f8aa2f3/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 7e08a4c..f39ef9b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -462,7 +462,13 @@ public class ImportAdmins extends ToolBase {
                     } else { // org exists, add original user to it
                         try {
                             managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
-                            logger.debug( "Added user {} to org {}", new Object[]{user.getEmail(), orgName} );
+                            logger.debug( "Added to org user {}:{}:{}",
+                                    new Object[]{
+                                            orgInfo.getName(),
+                                            user.getUsername(),
+                                            user.getEmail(),
+                                            user.getUuid()
+                                    });
 
                         } catch (Exception e) {
                             logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
@@ -533,7 +539,9 @@ public class ImportAdmins extends ToolBase {
                             logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
                                 new Object[]{
                                         orgInfo.getName(),
-                                        originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+                                        originalUser.getUsername(), 
+                                        originalUser.getEmail(),
+                                        originalUser.getUuid(), 
                                         dup.username, dup.email
                                 });
 
@@ -546,7 +554,9 @@ public class ImportAdmins extends ToolBase {
                             logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
                                     new Object[]{
                                             orgInfo.getName(),
-                                            originalUser.getUuid(), originalUser.getUsername(), originalUser.getEmail(),
+                                            originalUser.getUsername(), 
+                                            originalUser.getEmail(),
+                                            originalUser.getUuid(), 
                                             dup.username, dup.email
                                     });
 
@@ -686,7 +696,7 @@ public class ImportAdmins extends ToolBase {
 
             while (!done) {
                 try {
-                    ImportMetadataTask task = this.workQueue.poll(30, TimeUnit.SECONDS);
+                    ImportMetadataTask task = this.workQueue.poll( 30, TimeUnit.SECONDS );
 
                     if (task == null) {
                         logger.warn("Reading from metadata queue was null!");
@@ -694,11 +704,11 @@ public class ImportAdmins extends ToolBase {
                         Thread.sleep(1000);
                         continue;
                     }
-                    metadataEmptyCount.set(0);
+                    metadataEmptyCount.set( 0 );
                     
                     long startTime = System.currentTimeMillis();
                     
-                    importEntityMetadata(em, task.entityRef, task.metadata);
+                    importEntityMetadata( em, task.entityRef, task.metadata );
                     
                     long stopTime = System.currentTimeMillis();
                     long duration = stopTime - startTime;
@@ -770,7 +780,7 @@ public class ImportAdmins extends ToolBase {
                         em.create(uuid, type, entityProps);
 
                         logger.debug( "Imported admin user {}:{}:{}",
-                            new Object[] { uuid, entityProps.get( "username" ), entityProps.get("email") } );
+                            new Object[] { entityProps.get( "username" ), entityProps.get("email"), uuid } );
 
                         userCount.getAndIncrement();
                         auditQueue.put(entityProps);
@@ -805,7 +815,10 @@ public class ImportAdmins extends ToolBase {
         private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
 
             logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
-                    entityProps.get( "uuid" ), entityProps.get( "username" ), entityProps.get( "email" ), dupProperty} );
+                    entityProps.get( "username" ), 
+                    entityProps.get( "email" ), 
+                    entityProps.get( "uuid" ), 
+                    dupProperty} );
            
             UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
             try {


[14/18] incubator-usergrid git commit: merge

Posted by sn...@apache.org.
merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9f55ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9f55ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9f55ac

Branch: refs/heads/two-dot-o-dev
Commit: 9b9f55ac20f669451dadc7cdeeb4643ac0614ad4
Parents: a6e68a0 897fd50
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Jul 21 14:09:05 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Jul 21 14:09:05 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 118 ++++++----
 .../org/apache/usergrid/tools/ImportAdmins.java | 226 ++++++++++++++-----
 .../usergrid/tools/ExportImportAdminsTest.java  |  71 ++++--
 ...adata.usergrid-management.1433331614293.json |  52 +++++
 ...users.usergrid-management.1433331614293.json |  12 +
 5 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------