You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:11 UTC
[75/95] [abbrv] git commit: adding the schedule call to the unnamed methods
adding the schedule call to the unnamed methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5acbff06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5acbff06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5acbff06
Branch: refs/heads/import-feature
Commit: 5acbff0614fc9a545f093f80df6de2b729eaffd6
Parents: 2f76c86
Author: Harish Rajagopal <hr...@apigee.com>
Authored: Mon Aug 4 13:28:35 2014 -0700
Committer: Harish Rajagopal <hr...@apigee.com>
Committed: Mon Aug 4 13:28:35 2014 -0700
----------------------------------------------------------------------
.../management/importUG/ImportServiceImpl.java | 23 +++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5acbff06/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index f8ff996..47963b2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -39,6 +39,7 @@ import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.io.File;
import java.io.IOException;
@@ -564,6 +565,9 @@ public class ImportServiceImpl implements ImportService {
final Observable<EntityWrapper> observable = Observable.create(subscribe);
+ /**
+ * This is the action we want to perform for every UUID we receive
+ */
final Action1<EntityWrapper> doWork = new Action1<EntityWrapper>() {
@Override
public void call(EntityWrapper jsonEntity){
@@ -571,18 +575,30 @@ public class ImportServiceImpl implements ImportService {
em.create(jsonEntity.entityUuid, jsonEntity.entityType, jsonEntity.properties);
em.getRef(jsonEntity.entityUuid);
+
+ System.out.println(
+ "Emitting UUID " + jsonEntity.entityUuid + " on thread " + Thread.currentThread()
+ .getName() );
+
}catch (Exception e) {
+ System.out.println("something went wrong while creating this - " + e);
+
}
}
};
+
+ /**
+ * This is boilerplate glue code. We have to follow this for the parallel operation. In the "call"
+ * method we want to simply return the input observable + the chain of operations we want to invoke
+ */
observable.parallel(new Func1<Observable<EntityWrapper>, Observable<EntityWrapper>>() {
@Override
public Observable< EntityWrapper> call(Observable<EntityWrapper> entityWrapperObservable) {
return entityWrapperObservable.doOnNext(doWork);
}
- });
+ }, Schedulers.io() ).toBlocking().last();
}
@@ -681,15 +697,18 @@ public class ImportServiceImpl implements ImportService {
subscriber.onNext(entityWrapper);
ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
//break;
+ subscriber.onCompleted();
}
} catch (IllegalArgumentException e) {
// skip illegal entity UUID and go to next one
((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
rootEm.update(importUG);
+ subscriber.onError( e );
} catch (Exception e) {
// skip illegal entity UUID and go to next one
((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
rootEm.update(importUG);
+ subscriber.onError( e );
}
}
@@ -705,6 +724,8 @@ public class ImportServiceImpl implements ImportService {
} catch (Exception e) {
+ System.out.println("something went wrong in observable json parser - " + e);
+
}