You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:08 UTC
[72/95] [abbrv] git commit: pushing to build war file and test,
might have several bugs
pushing to build war file and test, might have several bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2f76c868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2f76c868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2f76c868
Branch: refs/heads/import-feature
Commit: 2f76c86879eacf20b7e4f8d7fad162b737c238bb
Parents: e877528
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Fri Aug 1 15:29:47 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Fri Aug 1 15:29:47 2014 -0700
----------------------------------------------------------------------
stack/core/pom.xml | 12 +
stack/pom.xml | 1 +
.../management/importUG/ImportServiceImpl.java | 229 +++++++++++++------
.../management/cassandra/ImportServiceIT.java | 15 +-
4 files changed, 173 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index dad4a6b..aa861d6 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -559,5 +559,17 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-core</artifactId>
+ <version>0.19.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-math</artifactId>
+ <version>0.19.6</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index fbd0f8d..dc84473 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -113,6 +113,7 @@
<tomcat-version>7.0.42</tomcat-version>
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
+ <rx.version>0.19.6</rx.version>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 1cfd7ba..f8ff996 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -35,6 +35,10 @@ import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
import java.io.File;
import java.io.IOException;
@@ -128,6 +132,8 @@ public class ImportServiceImpl implements ImportService {
return importUG.getUuid();
}
+
+
/**
* Query Entity Manager for the string state of the Import Entity. This corresponds to the GET /import
*
@@ -180,6 +186,7 @@ public class ImportServiceImpl implements ImportService {
}
+ //@Override
public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
@@ -188,6 +195,7 @@ public class ImportServiceImpl implements ImportService {
return importManager.get( importId, Import.class );
}
+
@Override
public ArrayList<File> getEphemeralFile() {
return files;
@@ -476,9 +484,9 @@ public class ImportServiceImpl implements ImportService {
if(!lastUpdatedUUID.equals(""))
{
// go till the last updated entity
- while(!jp.getText().equals(lastUpdatedUUID)) {
- jp.nextToken();
- }
+ while(!jp.getText().equals(lastUpdatedUUID)) {
+ jp.nextToken();
+ }
// skip the last one and start from the next one
while(!(jp.getCurrentToken()==JsonToken.END_OBJECT && jp.nextToken() == JsonToken.START_OBJECT)) {
@@ -527,116 +535,185 @@ public class ImportServiceImpl implements ImportService {
return valid;
}
+
private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
JsonParser jp = jsonFactory.createJsonParser( collectionFile );
jp.setCodec( new ObjectMapper() );
return jp;
}
+
+ private static class EntityWrapper{
+ UUID entityUuid;
+ String entityType;
+ Map<String, Object> properties;
+ EntityWrapper(UUID entityUuid, String entityType,Map<String, Object> properties){
+ this.entityUuid = entityUuid;
+ this.entityType = entityType;
+ this.properties = properties;
+ }
+ }
/**
* Imports the entity's connecting references (collections, connections and dictionaries)
*
* @param jp JsonParser pointing to the beginning of the object.
*/
- private void importEntityStuff( JsonParser jp, EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
+ private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
- ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
- Entity entity = null;
- EntityRef ownerEntityRef=null;
- String entityUuid="";
- String entityType="";
+ final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
- // Go inside the value after getting the owner entity id.
- while (jp.nextToken() != JsonToken.END_OBJECT) {
+ final Observable<EntityWrapper> observable = Observable.create(subscribe);
- String collectionName = jp.getCurrentName();
+ final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
+ @Override
+ public void call(EntityWrapper jsonEntity){
+ try {
+ em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
+ em.getRef(jsonEntity.entityUuid);
- try {
- // create the connections
- if (collectionName.equals("connections")) {
+ }catch (Exception e) {
+ }
+ }
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
- String connectionType = jp.getCurrentName();
+ };
- jp.nextToken(); // START_ARRAY
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- String entryId = jp.getText();
+ observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
+ @Override
+ public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
+ });
- EntityRef entryRef = em.getRef(UUID.fromString(entryId));
- // Store in DB
- em.createConnection(ownerEntityRef, connectionType, entryRef);
- }
- }
- }
- // add dictionaries
- else if (collectionName.equals("dictionaries")) {
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
+ }
- String dictionaryName = jp.getCurrentName();
- jp.nextToken();
+ private static final class JsonParserObservable implements Observable.OnSubscribe<EntityWrapper> {
+ private final JsonParser jp;
+ EntityManager em;
+ EntityManager rootEm;
+ Import importUG;
+ int index;
- @SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
- em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
- }
- } else {
- // Regular collections
- jp.nextToken(); // START_OBJECT
+ private int entityCount = 0;
+
+ JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
+ this.jp = parser;
+ this.em = em;
+ this.rootEm = rootEm;
+ this.importUG = importUG;
+ this.index = index;
+ }
+
+ @Override
+ public void call(final Subscriber<? super EntityWrapper> subscriber) {
+ ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
+
+ EntityWrapper entityWrapper = null;
+ // while(entityWrapper != null && jp.nextToken() != JsonToken.END_OBJECT) {
+ Entity entity = null;
+ EntityRef ownerEntityRef = null;
+ String entityUuid = "";
+ String entityType = "";
+ try {
+ //JsonToken token = jp.nextToken();
+ while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
- Map<String, Object> properties = new HashMap<String, Object>();
+ String collectionName = jp.getCurrentName();
- JsonToken token = jp.nextToken();
+ try {
+ // create the connections
+ if (collectionName.equals("connections")) {
- while (token != JsonToken.END_OBJECT) {
- if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
- String key = jp.getCurrentName();
- if (key.equals("uuid")) {
- entityUuid = jp.getText();
+ jp.nextToken(); // START_OBJECT
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String connectionType = jp.getCurrentName();
- } else if (key.equals("type")) {
- entityType = jp.getText();
- } else if (key.length() != 0 && jp.getText().length() != 0) {
- String value = jp.getText();
- properties.put(key, value);
+ jp.nextToken(); // START_ARRAY
+ while (jp.nextToken() != JsonToken.END_ARRAY) {
+ String entryId = jp.getText();
+
+ EntityRef entryRef = em.getRef(UUID.fromString(entryId));
+ // Store in DB
+ em.createConnection(ownerEntityRef, connectionType, entryRef);
+ }
+ }
+ }
+ // add dictionaries
+ else if (collectionName.equals("dictionaries")) {
+
+ jp.nextToken(); // START_OBJECT
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+
+ String dictionaryName = jp.getCurrentName();
+
+ jp.nextToken();
+
+ @SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+
+ em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+ }
+ } else {
+ // Regular collections
+ jp.nextToken(); // START_OBJECT
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+
+ JsonToken token = jp.nextToken();
+
+ while (token != JsonToken.END_OBJECT) {
+ if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+ String key = jp.getCurrentName();
+ if (key.equals("uuid")) {
+ entityUuid = jp.getText();
+
+ } else if (key.equals("type")) {
+ entityType = jp.getText();
+ } else if (key.length() != 0 && jp.getText().length() != 0) {
+ String value = jp.getText();
+ properties.put(key, value);
+ }
+ }
+ token = jp.nextToken();
}
+ entityWrapper = new EntityWrapper(UUID.fromString(entityUuid), entityType, properties);
+ subscriber.onNext(entityWrapper);
+ ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
+ //break;
}
- token = jp.nextToken();
+ } catch (IllegalArgumentException e) {
+ // skip illegal entity UUID and go to next one
+ ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
+ rootEm.update(importUG);
+ } catch (Exception e) {
+ // skip illegal entity UUID and go to next one
+ ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
+ rootEm.update(importUG);
}
+ }
- entity = em.create(UUID.fromString(entityUuid), entityType, properties);
- ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
+ // update the last updated entity
+ if (entity != null) {
+ entityCount++;
+ if (entityCount == 2000) {
+ ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
+ rootEm.update(importUG);
+ entityCount = 0;
+ }
}
+
+
+ } catch (Exception e) {
+
}
- catch (IllegalArgumentException e) {
- // skip illegal entity UUID and go to next one
- ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
- rootEm.update(importUG);
- }
- catch (Exception e) {
- // skip illegal entity UUID and go to next one
- ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
- rootEm.update(importUG);
- }
- }
- // update the last updated entity
- if(entity != null) {
- entityCount++;
- if(entityCount == 2000) {
- ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
- rootEm.update(importUG);
- entityCount = 0;
- }
+
}
}
+
}
-/**
- * custom exceptions
- */
+
class OrganizationNotFoundException extends Exception {
OrganizationNotFoundException(String s) {
super(s);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2f76c868/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
index e5330a1..04d1310 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
@@ -109,9 +109,9 @@ public class ImportServiceIT {
emTest.createConnection( emTest.getRef(entityTest[1].getUuid()), "related", emTest.getRef( entityTest[0].getUuid()));
}
- @Ignore //For this test please input your s3 credentials into settings.xml or Attach a -D with relevant fields.
+// @Ignore //For this test please input your s3 credentials into settings.xml or Attach a -D with relevant fields.
// test case to check if a collection file is imported correctly
- //@Test
+ @Test
public void testIntegrationImportCollection() throws Exception {
// //creates 5 entities in user collection
@@ -124,9 +124,9 @@ public class ImportServiceIT {
//creates entities
for ( int i = 0; i < 5; i++ ) {
userProperties = new LinkedHashMap<String, Object>();
- userProperties.put( "username", "user" + i );
- userProperties.put( "email", "user" + i + "@test.com" );
- entity[i] = em.create( "users", userProperties );
+ userProperties.put( "username", "yabauser" + i );
+ userProperties.put( "email", "yavauser" + i + "@test.com" );
+ entity[i] = em.create( "yabas", userProperties );
}
//creates test connections between first 2 users
@@ -140,7 +140,7 @@ public class ImportServiceIT {
payload.put( "organizationId", organization.getUuid());
payload.put( "applicationId", applicationId );
- payload.put("collectionName", "users");
+ payload.put("collectionName", "yabas");
// schedule the export job
UUID exportUUID = exportService.schedule( payload );
@@ -178,13 +178,12 @@ public class ImportServiceIT {
}
try {
-
//checks if temp import files are created i.e. downloaded from S3
assertThat(importService.getEphemeralFile().size(), is(not(0)));
//check if entities are actually updated i.e. created and modified should be different
//EntityManager em = setup.getEmf().getEntityManager(applicationId);
- Results collections = em.getCollection(applicationId, "users", null, Results.Level.ALL_PROPERTIES);
+ Results collections = em.getCollection(applicationId, "yabas", null, Results.Level.ALL_PROPERTIES);
List<Entity> entities = collections.getEntities();
// check if connections are created for only the 1st 2 entities in user collection