You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:08 UTC

[72/95] [abbrv] git commit: pushing to build war file and test, might have several bugs

pushing to build war file and test, might have several bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2f76c868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2f76c868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2f76c868

Branch: refs/heads/import-feature
Commit: 2f76c86879eacf20b7e4f8d7fad162b737c238bb
Parents: e877528
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Fri Aug 1 15:29:47 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Fri Aug 1 15:29:47 2014 -0700

----------------------------------------------------------------------
 stack/core/pom.xml                              |  12 +
 stack/pom.xml                                   |   1 +
 .../management/importUG/ImportServiceImpl.java  | 229 +++++++++++++------
 .../management/cassandra/ImportServiceIT.java   |  15 +-
 4 files changed, 173 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index dad4a6b..aa861d6 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -559,5 +559,17 @@
       <artifactId>spring-test</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.netflix.rxjava</groupId>
+      <artifactId>rxjava-core</artifactId>
+      <version>0.19.6</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.netflix.rxjava</groupId>
+      <artifactId>rxjava-math</artifactId>
+      <version>0.19.6</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index fbd0f8d..dc84473 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -113,6 +113,7 @@
     <tomcat-version>7.0.42</tomcat-version>
     <antlr.version>3.4</antlr.version>
     <tika.version>1.4</tika.version>
+    <rx.version>0.19.6</rx.version>
 
   </properties>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 1cfd7ba..f8ff996 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -35,6 +35,10 @@ import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
 
 import java.io.File;
 import java.io.IOException;
@@ -128,6 +132,8 @@ public class ImportServiceImpl implements ImportService {
         return importUG.getUuid();
     }
 
+
+
     /**
      * Query Entity Manager for the string state of the Import Entity. This corresponds to the GET /import
      *
@@ -180,6 +186,7 @@ public class ImportServiceImpl implements ImportService {
     }
 
 
+    //@Override
     public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
 
         UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
@@ -188,6 +195,7 @@ public class ImportServiceImpl implements ImportService {
         return importManager.get( importId, Import.class );
     }
 
+
     @Override
     public ArrayList<File> getEphemeralFile() {
         return files;
@@ -476,9 +484,9 @@ public class ImportServiceImpl implements ImportService {
                 if(!lastUpdatedUUID.equals(""))
                 {
                     // go till the last updated entity
-                     while(!jp.getText().equals(lastUpdatedUUID)) {
-                         jp.nextToken();
-                     }
+                    while(!jp.getText().equals(lastUpdatedUUID)) {
+                        jp.nextToken();
+                    }
 
                     // skip the last one and start from teh next one
                     while(!(jp.getCurrentToken()==JsonToken.END_OBJECT && jp.nextToken() == JsonToken.START_OBJECT)) {
@@ -527,116 +535,185 @@ public class ImportServiceImpl implements ImportService {
         return valid;
     }
 
+
     private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
         JsonParser jp = jsonFactory.createJsonParser( collectionFile );
         jp.setCodec( new ObjectMapper() );
         return jp;
     }
 
+
+ private static class EntityWrapper{
+     UUID entityUuid;
+     String entityType;
+     Map<String, Object> properties;
+     EntityWrapper(UUID entityUuid, String entityType,Map<String, Object> properties){
+         this.entityUuid = entityUuid;
+         this.entityType = entityType;
+         this.properties = properties;
+     }
+ }
     /**
      * Imports the entity's connecting references (collections, connections and dictionaries)
      *
      * @param jp JsonPrser pointing to the beginning of the object.
      */
-    private void importEntityStuff( JsonParser jp, EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
+    private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
 
-        ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
-        Entity entity = null;
-        EntityRef ownerEntityRef=null;
-        String entityUuid="";
-        String entityType="";
+        final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
 
-        // Go inside the value after getting the owner entity id.
-        while (jp.nextToken() != JsonToken.END_OBJECT) {
+        final Observable<EntityWrapper> observable = Observable.create(subscribe);
 
-            String collectionName = jp.getCurrentName();
+        final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
+            @Override
+            public void call(EntityWrapper jsonEntity){
+                try {
+                            em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
+                            em.getRef(jsonEntity.entityUuid);
 
-            try {
-                // create the connections
-                if (collectionName.equals("connections")) {
+                }catch (Exception e) {
+                }
+            }
 
-                    jp.nextToken(); // START_OBJECT
-                    while (jp.nextToken() != JsonToken.END_OBJECT) {
-                        String connectionType = jp.getCurrentName();
+        };
 
-                        jp.nextToken(); // START_ARRAY
-                        while (jp.nextToken() != JsonToken.END_ARRAY) {
-                            String entryId = jp.getText();
+        observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
+            @Override
+            public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
+                return entityWrapperObservable.doOnNext(doWork);
+            }
+        });
 
-                            EntityRef entryRef = em.getRef(UUID.fromString(entryId));
-                            // Store in DB
-                            em.createConnection(ownerEntityRef, connectionType, entryRef);
-                        }
-                    }
-                }
-                // add dictionaries
-                else if (collectionName.equals("dictionaries")) {
 
-                    jp.nextToken(); // START_OBJECT
-                    while (jp.nextToken() != JsonToken.END_OBJECT) {
+    }
 
-                        String dictionaryName = jp.getCurrentName();
 
-                        jp.nextToken();
+    private static final class JsonParserObservable implements Observable.OnSubscribe<EntityWrapper> {
+        private final JsonParser jp;
+        EntityManager em;
+        EntityManager rootEm;
+        Import importUG;
+        int index;
 
-                        @SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
 
-                        em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
-                    }
-                } else {
-                    // Regular collections
-                    jp.nextToken(); // START_OBJECT
+        private int entityCount = 0;
+
+        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
+            this.jp = parser;
+            this.em = em;
+            this.rootEm = rootEm;
+            this.importUG = importUG;
+            this.index = index;
+        }
+
+        @Override
+        public void call(final Subscriber<? super EntityWrapper> subscriber) {
+            ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
+
+            EntityWrapper entityWrapper = null;
+            // while(entityWrapper != null && jp.nextToken() != JsonToken.END_OBJECT) {
+            Entity entity = null;
+            EntityRef ownerEntityRef = null;
+            String entityUuid = "";
+            String entityType = "";
+            try {
+                //JsonToken token = jp.nextToken();
+                while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
 
-                    Map<String, Object> properties = new HashMap<String, Object>();
+                    String collectionName = jp.getCurrentName();
 
-                    JsonToken token = jp.nextToken();
+                    try {
+                        // create the connections
+                        if (collectionName.equals("connections")) {
 
-                    while (token != JsonToken.END_OBJECT) {
-                        if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
-                            String key = jp.getCurrentName();
-                            if (key.equals("uuid")) {
-                                entityUuid = jp.getText();
+                            jp.nextToken(); // START_OBJECT
+                            while (jp.nextToken() != JsonToken.END_OBJECT) {
+                                String connectionType = jp.getCurrentName();
 
-                            } else if (key.equals("type")) {
-                                entityType = jp.getText();
-                            } else if (key.length() != 0 && jp.getText().length() != 0) {
-                                String value = jp.getText();
-                                properties.put(key, value);
+                                jp.nextToken(); // START_ARRAY
+                                while (jp.nextToken() != JsonToken.END_ARRAY) {
+                                    String entryId = jp.getText();
+
+                                    EntityRef entryRef = em.getRef(UUID.fromString(entryId));
+                                    // Store in DB
+                                    em.createConnection(ownerEntityRef, connectionType, entryRef);
+                                }
+                            }
+                        }
+                        // add dictionaries
+                        else if (collectionName.equals("dictionaries")) {
+
+                            jp.nextToken(); // START_OBJECT
+                            while (jp.nextToken() != JsonToken.END_OBJECT) {
+
+                                String dictionaryName = jp.getCurrentName();
+
+                                jp.nextToken();
+
+                                @SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+
+                                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+                            }
+                        } else {
+                            // Regular collections
+                            jp.nextToken(); // START_OBJECT
+
+                            Map<String, Object> properties = new HashMap<String, Object>();
+
+                            JsonToken token = jp.nextToken();
+
+                            while (token != JsonToken.END_OBJECT) {
+                                if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+                                    String key = jp.getCurrentName();
+                                    if (key.equals("uuid")) {
+                                        entityUuid = jp.getText();
+
+                                    } else if (key.equals("type")) {
+                                        entityType = jp.getText();
+                                    } else if (key.length() != 0 && jp.getText().length() != 0) {
+                                        String value = jp.getText();
+                                        properties.put(key, value);
+                                    }
+                                }
+                                token = jp.nextToken();
                             }
+                            entityWrapper = new EntityWrapper(UUID.fromString(entityUuid), entityType, properties);
+                            subscriber.onNext(entityWrapper);
+                            ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
+                            //break;
                         }
-                        token = jp.nextToken();
+                    } catch (IllegalArgumentException e) {
+                        // skip illegal entity UUID and go to next one
+                        ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
+                        rootEm.update(importUG);
+                    } catch (Exception e) {
+                        // any other failure: record it against this file as a "Miscellaneous Error" and go to next one
+                        ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
+                        rootEm.update(importUG);
                     }
+                }
 
-                    entity = em.create(UUID.fromString(entityUuid), entityType, properties);
-                    ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
+                // update the last updated entity
+                if (entity != null) {
+                    entityCount++;
+                    if (entityCount == 2000) {
+                        ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
+                        rootEm.update(importUG);
+                        entityCount = 0;
+                    }
                 }
+
+
+            } catch (Exception e) {
+
             }
-            catch (IllegalArgumentException e) {
-                // skip illegal entity UUID and go to next one
-                ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
-                rootEm.update(importUG);
-            }
-            catch (Exception e) {
-                // skip illegal entity UUID and go to next one
-                ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
-                rootEm.update(importUG);
-            }
-        }
-        // update the last updated entity
-        if(entity != null) {
-            entityCount++;
-            if(entityCount == 2000) {
-                ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
-                rootEm.update(importUG);
-                entityCount = 0;
-            }
+
         }
     }
+
 }
 
-/**
- * custom exceptions
- */
+
 class OrganizationNotFoundException extends Exception {
     OrganizationNotFoundException(String s) {
         super(s);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
index e5330a1..04d1310 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
@@ -109,9 +109,9 @@ public class ImportServiceIT {
         emTest.createConnection( emTest.getRef(entityTest[1].getUuid()), "related", emTest.getRef( entityTest[0].getUuid()));
     }
 
-    @Ignore //For this test please input your s3 credentials into settings.xml or Attach a -D with relevant fields.
+//    @Ignore //For this test please input your s3 credentials into settings.xml or Attach a -D with relevant fields.
     // test case to check if a collection file is imported correctly
-    //@Test
+    @Test
     public void testIntegrationImportCollection() throws Exception {
 
         // //creates 5 entities in user collection
@@ -124,9 +124,9 @@ public class ImportServiceIT {
         //creates entities
         for ( int i = 0; i < 5; i++ ) {
             userProperties = new LinkedHashMap<String, Object>();
-            userProperties.put( "username", "user" + i );
-            userProperties.put( "email", "user" + i + "@test.com" );
-            entity[i] = em.create( "users", userProperties );
+            userProperties.put( "username", "yabauser" + i );
+            userProperties.put( "email", "yavauser" + i + "@test.com" );
+            entity[i] = em.create( "yabas", userProperties );
         }
 
         //creates test connections between first 2 users
@@ -140,7 +140,7 @@ public class ImportServiceIT {
 
         payload.put( "organizationId",  organization.getUuid());
         payload.put( "applicationId", applicationId );
-        payload.put("collectionName", "users");
+        payload.put("collectionName", "yabas");
 
         // schdeule the export job
         UUID exportUUID = exportService.schedule( payload );
@@ -178,13 +178,12 @@ public class ImportServiceIT {
         }
         try {
 
-
             //checks if temp import files are created i.e. downloaded from S3
             assertThat(importService.getEphemeralFile().size(), is(not(0)));
 
             //check if entities are actually updated i.e. created and modified should be different
             //EntityManager em = setup.getEmf().getEntityManager(applicationId);
-            Results collections = em.getCollection(applicationId, "users", null, Results.Level.ALL_PROPERTIES);
+            Results collections = em.getCollection(applicationId, "yabas", null, Results.Level.ALL_PROPERTIES);
             List<Entity> entities = collections.getEntities();
 
             // check if connections are created for only the 1st 2 entities in user collection