You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/07/24 17:21:43 UTC

[15/50] [abbrv] incubator-usergrid git commit: Some reformatting. Also eliminating use of subscriber.unsubscribe(). All observables need to wrap up with onCompleted().

Some reformatting. Also eliminates the use of subscriber.unsubscribe(): every Observable must now finish by calling onCompleted().


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a25f8ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a25f8ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a25f8ebc

Branch: refs/heads/USERGRID-869
Commit: a25f8ebc2877681897a5ef945d39b685c2d3f9fa
Parents: 7a870d6
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 15:31:07 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 15:31:07 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 113 ++++++++++---------
 stack/tools/src/main/resources/log4j.properties |   5 +-
 .../apache/usergrid/tools/ExportAppTest.java    |  89 ++++-----------
 3 files changed, 81 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 59509c0..c302a74 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -16,7 +16,6 @@
  */
 package org.apache.usergrid.tools;
 
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -82,8 +81,30 @@ public class ExportApp extends ExportingToolBase {
     // set via CLI
     int readThreadCount = 80;
     int writeThreadCount = 10; // limiting write will limit output files 
-    
 
+
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option appNameOption = OptionBuilder.hasArg().withType("")
+                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
+        return options;
+    }
+
+    
     /**
      * Tool entry point. 
      */
@@ -110,25 +131,25 @@ public class ExportApp extends ExportingToolBase {
             }
         }
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-        
-        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-       
-        startSpring();
-
         setVerbose( line );
 
         applyOrgId( line );
         prepareBaseOutputFileName( line );
         outputDir = createOutputParentDir();
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-       
+
+        startSpring();
+        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
         Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
         
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
@@ -151,30 +172,11 @@ public class ExportApp extends ExportingToolBase {
             .toBlocking().last();
     }
 
-    @Override
-    @SuppressWarnings("static-access")
-    public Options createOptions() {
-
-        Options options = super.createOptions();
-
-        Option appNameOption = OptionBuilder.hasArg().withType("")
-                .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( appNameOption );
-
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
-        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
-        options.addOption( writeThreadsOption );
-
-        return options;
-    }
 
     // ----------------------------------------------------------------------------------------
     // reading data
 
+    
     /**
      * Emits collection names found in application.
      */
@@ -198,16 +200,13 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info( "Completed. Read {} collection names", count );
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "No collections found" );
-            }
+            
+            subscriber.onCompleted();
+            logger.info( "Completed. Read {} collection names", count );
         }
     }
 
+    
     /**
      * Emits entities of collection.
      */
@@ -267,13 +266,8 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                if ( count > 0 ) {
-                    subscriber.onCompleted();
-                    logger.info("Completed collection {}. Read {} entities", collection, count);
-                } else {
-                    logger.info("Completed collection {} empty", collection );
-                    subscriber.unsubscribe();
-                }
+                subscriber.onCompleted();
+                logger.info("Completed collection {}. Read {} entities", collection, count);
                 
             } catch ( Exception e ) {
                 subscriber.onError(e);
@@ -281,6 +275,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Emits connections of an entity.
      */
@@ -331,19 +326,17 @@ public class ExportApp extends ExportingToolBase {
                 subscriber.onError( e );
             }
             
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-                logger.info("Completed entity {} type {} connections count {}",
-                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
-                
-            } else {
-                subscriber.unsubscribe();
-                logger.info( "Entity {} type {} has no connections",
-                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
-            }
+            subscriber.onCompleted();
+            logger.info("Completed entity {} type {} connections count {}",
+                new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
         }
     }
 
+    
+    // ----------------------------------------------------------------------------------------
+    // writing data
+    
+    
     /**
      * Writes entities to JSON file.
      */
@@ -381,6 +374,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     /**
      * Writes connection to JSON file.
      */
@@ -418,6 +412,7 @@ public class ExportApp extends ExportingToolBase {
         }
     }
 
+    
     private class FileWrapUpAction implements Action0 {
         @Override
         public void call() {
@@ -448,6 +443,10 @@ public class ExportApp extends ExportingToolBase {
     }
 }
 
+
+/**
+ * Represents entity data to be serialized to JSON.
+ */
 class ExportEntity {
     private String organization;
     private String application;
@@ -493,6 +492,10 @@ class ExportEntity {
     }
 }
 
+
+/**
+ * Represents connection data to be serialized to JSON.
+ */
 class ExportConnection {
     private String organization;
     private String application;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/resources/log4j.properties b/stack/tools/src/main/resources/log4j.properties
index 6cf0a92..def47b4 100644
--- a/stack/tools/src/main/resources/log4j.properties
+++ b/stack/tools/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 # and the pattern to %c instead of %l.  (%l is slower.)
 
 # output messages into a rolling log file as well as stdout
-log4j.rootLogger=WARN,stdout
+log4j.rootLogger=ERROR,stdout
 
 # stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) [%c] - %m%n
 
-log4j.logger.org.apache.usergrid.tools=INFO
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.tools=DEBUG
 
 log4j.logger.org.apache.usergrid.management.cassandra=WARN
 log4j.logger.org.apache.usergrid.persistence.cassandra.DB=WARN

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a25f8ebc/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index af8306f..d1c5c1f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,10 +27,7 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Observable;
 import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
@@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
     
-    int NUM_COLLECTIONS = 5;
-    int NUM_ENTITIES = 10; 
-    int NUM_CONNECTIONS = 1;
+    int NUM_COLLECTIONS = 20;
+    int NUM_ENTITIES = 200; 
+    int NUM_CONNECTIONS = 5;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -86,68 +83,25 @@ public class ExportAppTest {
         ExecutorService execService = Executors.newFixedThreadPool( 50);
         final Scheduler scheduler = Schedulers.from( execService );
 
-        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
-            @Override
-            public Observable<?> call(Integer i) {
-                
-                return Observable.just( i ).doOnNext( new Action1<Integer>() {
-                    @Override
-                    public void call(Integer i) {
-                        
-                        final String type = "thing_"+i;
-                        try {
-                            em.createApplicationCollection( type );
-                            connectionCount.getAndIncrement();
-                            
-                        } catch (Exception e) {
-                            throw new RuntimeException( "Error creating collection", e );
-                        }
-                       
-                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
-                            @Override
-                            public Observable<?> call(Integer j) {
-                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
-                                    @Override
-                                    public void call(Integer j) {
-                                        
-                                        final String name = "thing_" + j;
-                                        try {
-                                            final Entity source = em.create( 
-                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
-                                            entitiesCount.getAndIncrement();
-                                            logger.info( "Created entity {} type {}", name, type );
-                                            
-                                            for ( Entity target : connectedThings ) {
-                                                em.createConnection( source, "has", target );
-                                                connectionCount.getAndIncrement();
-                                                logger.info( "Created connection from entity {} type {} to {}",
-                                                        new Object[]{name, type, target.getName()} );
-                                            }
-
-
-                                        } catch (Exception e) {
-                                            throw new RuntimeException( "Error creating collection", e );
-                                        }
-                                        
-                                        
-                                    }
-                                    
-                                } );
-
-                            }
-                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
-                        
-                    }
-                } );
-                
+        for (int i = 0; i < NUM_COLLECTIONS; i++) {
 
-            }
-        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+            final String type = "thing_" + i;
+            em.createApplicationCollection( type );
+            connectionCount.getAndIncrement();
+
+            for (int j = 0; j < NUM_ENTITIES; j++) {
+                final String name = "thing_" + j;
+                final Entity source = em.create(
+                        type, new HashMap<String, Object>() {{
+                    put( "name", name );
+                }} );
+                entitiesCount.getAndIncrement();
 
-        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
-            Thread.sleep( 5000 );
-            logger.info( "Still working. Created {} entities and {} connections", 
-                    entitiesCount.get(), connectionCount.get() );
+                for (Entity target : connectedThings) {
+                    em.createConnection( source, "has", target );
+                    connectionCount.getAndIncrement();
+                }
+            }
         }
 
         logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
@@ -166,8 +120,5 @@ public class ExportAppTest {
         }, false );
         
         logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
-
-
-        
     }
 }
\ No newline at end of file