You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:16 UTC
[80/95] [abbrv] git commit: includes new interface for entity and
implementation for the 3 different create calls
Includes a new WriteEvent interface for entity writes, with implementations for the 3 different create calls (entity, connection, and dictionary).
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0ad5428d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0ad5428d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0ad5428d
Branch: refs/heads/import-feature
Commit: 0ad5428da3a0f98f810aa9b920cfd1e69ff94b05
Parents: 5acbff0
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Tue Aug 5 08:33:20 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Tue Aug 5 08:33:20 2014 -0700
----------------------------------------------------------------------
.../management/importUG/ImportServiceImpl.java | 141 +++++++++++++------
1 file changed, 99 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ad5428d/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 47963b2..f7a9ced 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -544,16 +544,6 @@ public class ImportServiceImpl implements ImportService {
}
- private static class EntityWrapper{
- UUID entityUuid;
- String entityType;
- Map<String, Object> properties;
- EntityWrapper(UUID entityUuid, String entityType,Map<String, Object> properties){
- this.entityUuid = entityUuid;
- this.entityType = entityType;
- this.properties = properties;
- }
- }
/**
* Imports the entity's connecting references (collections, connections and dictionaries)
*
@@ -563,28 +553,30 @@ public class ImportServiceImpl implements ImportService {
final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
- final Observable<EntityWrapper> observable = Observable.create(subscribe);
+ final Observable<WriteEvent> observable = Observable.create(subscribe);
/**
* This is the action we want to perform for every UUID we receive
*/
- final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
+ final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@Override
- public void call(EntityWrapper jsonEntity){
- try {
- em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
- em.getRef(jsonEntity.entityUuid);
-
-
- System.out.println(
- "Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread()
- .getName() );
-
- }catch (Exception e) {
- System.out.println("something went wrong while creating this - " + e);
-
- }
+ public void call(WriteEvent writeEvent) {
+ writeEvent.doWrite(em);
}
+// @Override
+// public void call(EntityWrapper jsonEntity){
+// try {
+// em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
+// em.getRef(jsonEntity.entityUuid);
+//
+//
+// System.out.println("Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread().getName() );
+//
+// }catch (Exception e) {
+// System.out.println("something went wrong while creating this - " + e);
+//
+// }
+// }
};
@@ -593,9 +585,9 @@ public class ImportServiceImpl implements ImportService {
* This is boilerplate glue code. We have to follow this for the parallel operation. In the "call"
* method we want to simply return the input observable + the chain of operations we want to invoke
*/
- observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
+ observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
@Override
- public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
+ public Observable< WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
return entityWrapperObservable.doOnNext(doWork);
}
}, Schedulers.io() ).toBlocking().last();
@@ -603,15 +595,83 @@ public class ImportServiceImpl implements ImportService {
}
+ private interface WriteEvent{
+ public void doWrite(EntityManager em);
+ }
+
+ private final class EntityEvent implements WriteEvent{
+ UUID entityUuid;
+ String entityType;
+ Map<String, Object> properties;
+ EntityEvent(UUID entityUuid, String entityType,Map<String, Object> properties){
+ this.entityUuid = entityUuid;
+ this.entityType = entityType;
+ this.properties = properties;
+ }
+ @Override
+ public void doWrite(EntityManager em) {
+ try {
+ em.create(entityUuid, entityType, properties);
+ em.getRef(entityUuid);
+ System.out.println("Emitting UUID " + entityUuid + " on thread " + Thread.currentThread().getName() );
+ }catch (Exception e) {
+ System.out.println("something went wrong while creating this - " + e);
+ }
+ }
+ }
+ private final class ConnectionEvent implements WriteEvent{
+ EntityRef ownerEntityRef;
+ String connectionType;
+ EntityRef entryRef;
+
+ ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef){
+ this.ownerEntityRef = ownerEntityRef;
+ this.connectionType = connectionType;
+ this.entryRef = entryRef;
+ }
- private static final class JsonParserObservable implements Observable.OnSubscribe<EntityWrapper> {
+ @Override
+ public void doWrite(EntityManager em) {
+ try {
+ em.createConnection(ownerEntityRef, connectionType, entryRef);
+ System.out.println("creating connection " + connectionType + " on thread " + Thread.currentThread().getName() );
+ }catch (Exception e) {
+ System.out.println("something went wrong while creating this - " + e);
+ }
+ }
+ }
+ private final class DictionaryEvent implements WriteEvent{
+
+ EntityRef ownerEntityRef;
+ String dictionaryName;
+ Map<String, Object> dictionary;
+
+ DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary){
+ this.ownerEntityRef = ownerEntityRef;
+ this.dictionaryName = dictionaryName;
+ this.dictionary = dictionary;
+ }
+
+ @Override
+ public void doWrite(EntityManager em) {
+ try {
+ em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+ System.out.println("creating dictionary " + dictionaryName + " on thread " + Thread.currentThread().getName() );
+ }catch (Exception e) {
+ System.out.println("something went wrong while creating this - " + e);
+ }
+ }
+ }
+
+
+
+ private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
private final JsonParser jp;
EntityManager em;
EntityManager rootEm;
Import importUG;
int index;
-
private int entityCount = 0;
JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
@@ -623,21 +683,17 @@ public class ImportServiceImpl implements ImportService {
}
@Override
- public void call(final Subscriber<? super EntityWrapper> subscriber) {
+ public void call(final Subscriber<? super WriteEvent> subscriber) {
ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
- EntityWrapper entityWrapper = null;
- // while(entityWrapper != null && jp.nextToken() != JsonToken.END_OBJECT) {
+ WriteEvent entityWrapper = null;
Entity entity = null;
EntityRef ownerEntityRef = null;
String entityUuid = "";
String entityType = "";
try {
- //JsonToken token = jp.nextToken();
while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
-
String collectionName = jp.getCurrentName();
-
try {
// create the connections
if (collectionName.equals("connections")) {
@@ -651,8 +707,9 @@ public class ImportServiceImpl implements ImportService {
String entryId = jp.getText();
EntityRef entryRef = em.getRef(UUID.fromString(entryId));
- // Store in DB
- em.createConnection(ownerEntityRef, connectionType, entryRef);
+ entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
+ subscriber.onNext(entityWrapper);
+ subscriber.onCompleted();
}
}
}
@@ -667,8 +724,9 @@ public class ImportServiceImpl implements ImportService {
jp.nextToken();
@SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
-
- em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+ entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
+ subscriber.onNext(entityWrapper);
+ subscriber.onCompleted();
}
} else {
// Regular collections
@@ -693,10 +751,9 @@ public class ImportServiceImpl implements ImportService {
}
token = jp.nextToken();
}
- entityWrapper = new EntityWrapper(UUID.fromString(entityUuid), entityType, properties);
+ entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
subscriber.onNext(entityWrapper);
ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
- //break;
subscriber.onCompleted();
}
} catch (IllegalArgumentException e) {