You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:11 UTC

[75/95] [abbrv] git commit: adding the scheduler call to the anonymous (unnamed) methods

adding the scheduler call (Schedulers.io()) to the anonymous (unnamed) methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5acbff06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5acbff06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5acbff06

Branch: refs/heads/import-feature
Commit: 5acbff0614fc9a545f093f80df6de2b729eaffd6
Parents: 2f76c86
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Mon Aug 4 13:28:35 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Mon Aug 4 13:28:35 2014 -0700

----------------------------------------------------------------------
 .../management/importUG/ImportServiceImpl.java  | 23 +++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5acbff06/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index f8ff996..47963b2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -39,6 +39,7 @@ import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.io.File;
 import java.io.IOException;
@@ -564,6 +565,9 @@ public class ImportServiceImpl implements ImportService {
 
         final Observable<EntityWrapper> observable = Observable.create(subscribe);
 
+        /**
+         * This is the action we want to perform for every UUID we receive
+         */
         final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
             @Override
             public void call(EntityWrapper jsonEntity){
@@ -571,18 +575,30 @@ public class ImportServiceImpl implements ImportService {
                             em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
                             em.getRef(jsonEntity.entityUuid);
 
+
+                    System.out.println(
+                            "Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread()
+                                    .getName() );
+
                 }catch (Exception e) {
+                    System.out.println("something went wrong while creating this - " + e);
+
                 }
             }
 
         };
 
+
+        /**
+         * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
+         * method we want to simply return the input observable + the chain of operations we want to invoke
+         */
         observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
             @Override
             public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
                 return entityWrapperObservable.doOnNext(doWork);
             }
-        });
+        }, Schedulers.io() ).toBlocking().last();
 
 
     }
@@ -681,15 +697,18 @@ public class ImportServiceImpl implements ImportService {
                             subscriber.onNext(entityWrapper);
                             ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
                             //break;
+                            subscriber.onCompleted();
                         }
                     } catch (IllegalArgumentException e) {
                         // skip illegal entity UUID and go to next one
                         ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
                         rootEm.update(importUG);
+                        subscriber.onError( e );
                     } catch (Exception e) {
                         // skip illegal entity UUID and go to next one
                         ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
                         rootEm.update(importUG);
+                        subscriber.onError( e );
                     }
                 }
 
@@ -705,6 +724,8 @@ public class ImportServiceImpl implements ImportService {
 
 
             } catch (Exception e) {
+                System.out.println("something went wrong in observable json parser - " + e);
+
 
             }