You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:16 UTC

[80/95] [abbrv] git commit: includes new interface for entity and implementation for the 3 different create calls

includes new interface for entity and implementation for the 3 different create calls


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0ad5428d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0ad5428d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0ad5428d

Branch: refs/heads/import-feature
Commit: 0ad5428da3a0f98f810aa9b920cfd1e69ff94b05
Parents: 5acbff0
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Tue Aug 5 08:33:20 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Tue Aug 5 08:33:20 2014 -0700

----------------------------------------------------------------------
 .../management/importUG/ImportServiceImpl.java  | 141 +++++++++++++------
 1 file changed, 99 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ad5428d/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 47963b2..f7a9ced 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -544,16 +544,6 @@ public class ImportServiceImpl implements ImportService {
     }
 
 
- private static class EntityWrapper{
-     UUID entityUuid;
-     String entityType;
-     Map<String, Object> properties;
-     EntityWrapper(UUID entityUuid, String entityType,Map<String, Object> properties){
-         this.entityUuid = entityUuid;
-         this.entityType = entityType;
-         this.properties = properties;
-     }
- }
     /**
      * Imports the entity's connecting references (collections, connections and dictionaries)
      *
@@ -563,28 +553,30 @@ public class ImportServiceImpl implements ImportService {
 
         final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
 
-        final Observable<EntityWrapper> observable = Observable.create(subscribe);
+        final Observable<WriteEvent> observable = Observable.create(subscribe);
 
         /**
          * This is the action we want to perform for every UUID we receive
          */
-        final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
+        final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
             @Override
-            public void call(EntityWrapper jsonEntity){
-                try {
-                            em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
-                            em.getRef(jsonEntity.entityUuid);
-
-
-                    System.out.println(
-                            "Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread()
-                                    .getName() );
-
-                }catch (Exception e) {
-                    System.out.println("something went wrong while creating this - " + e);
-
-                }
+            public void call(WriteEvent writeEvent) {
+                writeEvent.doWrite(em);
             }
+//            @Override
+//            public void call(EntityWrapper jsonEntity){
+//                try {
+//                            em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
+//                            em.getRef(jsonEntity.entityUuid);
+//
+//
+//                    System.out.println("Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread().getName() );
+//
+//                }catch (Exception e) {
+//                    System.out.println("something went wrong while creating this - " + e);
+//
+//                }
+//            }
 
         };
 
@@ -593,9 +585,9 @@ public class ImportServiceImpl implements ImportService {
          * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
          * method we want to simply return the input observable + the chain of operations we want to invoke
          */
-        observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
+        observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
             @Override
-            public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
+            public Observable< WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
                 return entityWrapperObservable.doOnNext(doWork);
             }
         }, Schedulers.io() ).toBlocking().last();
@@ -603,15 +595,83 @@ public class ImportServiceImpl implements ImportService {
 
     }
 
+    private interface WriteEvent{
+        public void doWrite(EntityManager em);
+    }
+
+    private final class EntityEvent implements WriteEvent{
+        UUID entityUuid;
+        String entityType;
+        Map<String, Object> properties;
+        EntityEvent(UUID entityUuid, String entityType,Map<String, Object> properties){
+            this.entityUuid = entityUuid;
+            this.entityType = entityType;
+            this.properties = properties;
+        }
+        @Override
+        public void doWrite(EntityManager em) {
+            try {
+                em.create(entityUuid, entityType, properties);
+                em.getRef(entityUuid);
+                System.out.println("Emitting UUID " + entityUuid + " on thread " + Thread.currentThread().getName() );
+                }catch (Exception e) {
+                    System.out.println("something went wrong while creating this - " + e);
+                }
+        }
+    }
+    private final class ConnectionEvent implements WriteEvent{
+        EntityRef ownerEntityRef;
+        String connectionType;
+        EntityRef entryRef;
+
+        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef){
+            this.ownerEntityRef = ownerEntityRef;
+            this.connectionType = connectionType;
+            this.entryRef = entryRef;
+        }
 
-    private static final class JsonParserObservable implements Observable.OnSubscribe<EntityWrapper> {
+        @Override
+        public void doWrite(EntityManager em) {
+            try {
+                em.createConnection(ownerEntityRef, connectionType, entryRef);
+                System.out.println("creating connection " + connectionType + " on thread " + Thread.currentThread().getName() );
+            }catch (Exception e) {
+                System.out.println("something went wrong while creating this - " + e);
+            }
+        }
+    }
+    private final class DictionaryEvent implements WriteEvent{
+
+        EntityRef ownerEntityRef;
+        String dictionaryName;
+        Map<String, Object> dictionary;
+
+        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary){
+            this.ownerEntityRef = ownerEntityRef;
+            this.dictionaryName = dictionaryName;
+            this.dictionary = dictionary;
+        }
+
+        @Override
+        public void doWrite(EntityManager em) {
+            try {
+                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+                System.out.println("creating dictionary  " + dictionaryName + " on thread " + Thread.currentThread().getName() );
+            }catch (Exception e) {
+                System.out.println("something went wrong while creating this - " + e);
+            }
+        }
+    }
+
+
+
+    private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
         private final JsonParser jp;
         EntityManager em;
         EntityManager rootEm;
         Import importUG;
         int index;
 
-
         private int entityCount = 0;
 
         JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
@@ -623,21 +683,17 @@ public class ImportServiceImpl implements ImportService {
         }
 
         @Override
-        public void call(final Subscriber<? super EntityWrapper> subscriber) {
+        public void call(final Subscriber<? super WriteEvent> subscriber) {
             ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
 
-            EntityWrapper entityWrapper = null;
-            // while(entityWrapper != null && jp.nextToken() != JsonToken.END_OBJECT) {
+            WriteEvent entityWrapper = null;
             Entity entity = null;
             EntityRef ownerEntityRef = null;
             String entityUuid = "";
             String entityType = "";
             try {
-                //JsonToken token = jp.nextToken();
                 while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
-
                     String collectionName = jp.getCurrentName();
-
                     try {
                         // create the connections
                         if (collectionName.equals("connections")) {
@@ -651,8 +707,9 @@ public class ImportServiceImpl implements ImportService {
                                     String entryId = jp.getText();
 
                                     EntityRef entryRef = em.getRef(UUID.fromString(entryId));
-                                    // Store in DB
-                                    em.createConnection(ownerEntityRef, connectionType, entryRef);
+                                    entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
+                                    subscriber.onNext(entityWrapper);
+                                    subscriber.onCompleted();
                                 }
                             }
                         }
@@ -667,8 +724,9 @@ public class ImportServiceImpl implements ImportService {
                                 jp.nextToken();
 
                                 @SuppressWarnings("unchecked") Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
-
-                                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+                                entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
+                                subscriber.onNext(entityWrapper);
+                                subscriber.onCompleted();
                             }
                         } else {
                             // Regular collections
@@ -693,10 +751,9 @@ public class ImportServiceImpl implements ImportService {
                                 }
                                 token = jp.nextToken();
                             }
-                            entityWrapper = new EntityWrapper(UUID.fromString(entityUuid), entityType, properties);
+                            entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
                             subscriber.onNext(entityWrapper);
                             ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
-                            //break;
                             subscriber.onCompleted();
                         }
                     } catch (IllegalArgumentException e) {