Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/05 15:45:42 UTC
[13/16] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Conflicts:
stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1d0e1a19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1d0e1a19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1d0e1a19
Branch: refs/heads/two-dot-o-import
Commit: 1d0e1a19fc4791d531fb7124eae80cc2757e7f08
Parents: b47904e aafa3d4
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Feb 4 14:56:49 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Feb 4 14:56:49 2015 -0500
----------------------------------------------------------------------
.../persistence/entities/FailedImport.java | 64 +++++
.../entities/FailedImportConnection.java | 41 ++++
.../entities/FailedImportEntity.java | 44 ++++
.../persistence/entities/FileImport.java | 32 +++
.../exceptions/PersistenceException.java | 2 +-
stack/pom.xml | 11 +-
.../importer/FileImportStatistics.java | 240 ++++++++++++++++---
.../management/importer/ImportServiceImpl.java | 161 +++++--------
.../importer/FileImportStatisticsTest.java | 237 +++++++++++++++++-
9 files changed, 691 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1d0e1a19/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 2054804,238c0ab..936e09b
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -657,37 -651,8 +657,8 @@@ public class ImportServiceImpl implemen
EntityManager targetEm = emf.getEntityManager(targetAppId);
logger.debug(" importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
- importEntitiesFromFile(file, targetEm, emManagementApp, fileImport );
+ importEntitiesFromFile( collectionName, file, targetEm, emManagementApp, fileImport );
- // TODO: fix the resume on error feature
-
- // // in case of resume, retrieve the last updated UUID for this file
- // String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
- //
- // // this handles partially completed files by updating entities from the point of failure
- // if (!lastUpdatedUUID.equals(" ")) {
- //
- // // go till the last updated entity
- // while (!jp.getText().equals(lastUpdatedUUID)) {
- // jp.nextToken();
- // }
- //
- // // skip the last one and start from the next one
- // while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
- // && jp.nextToken() == JsonToken.START_OBJECT)) {
- // jp.nextToken();
- // }
- // }
- //
- // // get to start of an object i.e next entity.
- // while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
- // jp.nextToken();
- // }
- //
- // while (jp.nextToken() != JsonToken.END_ARRAY) {
- // importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
- // }
- // jp.close();
// Updates the state of file import job
if (!fileImport.getState().toString().equals("FAILED")) {
@@@ -792,85 -756,86 +763,86 @@@
// observable that parses JSON and emits write events
JsonParser jp = getJsonParserForFile(file);
+
+ //TODO, move the json parser into the observable creation so that open/close happens automatically within the stream
+
final JsonEntityParserObservable jsonObservableEntities =
- new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ new JsonEntityParserObservable(jp, em, rootEm, collectionName, fileImport, entitiesOnly);
final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+ //flush every 100 entities
+ final FileImportStatistics statistics = new FileImportStatistics( em, fileImport.getUuid(), 100 );
+ //truncate to int because the RX skip() API takes an int
+ final int entityNumSkip = (int)statistics.getTotalEntityCount();
+ final int connectionNumSkip = (int)statistics.getTotalConnectionCount();
+
// function to execute for each write event
- //TODO: job execution no longer needed due to having queueMessage.
+
+ /**
+ * Function that invokes the work of the event.
+ */
final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@Override
- public void call(WriteEvent writeEvent) {
- writeEvent.doWrite(em,fileImport);
+ public void call( WriteEvent writeEvent ) {
+ writeEvent.doWrite( em, fileImport, statistics );
}
};
- // final AtomicLong entityCounter = new AtomicLong();
- // final AtomicLong eventCounter = new AtomicLong();
+
+
+
+
// start parsing JSON
- entityEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ //only take while our stats tell us we should continue processing
+ //potentially skip the first n if this is a resume operation
+ entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
@Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- // TODO: need to fixed so that number of entities created can be counted correctly and
- // TODO: also update last updated UUID for fileImport which is a must for resume-ability
-
- // return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
- //
- // @Override
- // public void call(WriteEvent writeEvent) {
- // if (!(writeEvent instanceof EntityEvent)) {
- // final long val = eventCounter.incrementAndGet();
- // if(val % 50 == 0) {
- // jobExecution.heartbeat();
- // }
- // return;
- // }
- //
- // final long value = entityCounter.incrementAndGet();
- // if (value % 2000 == 0) {
- // try {
- // logger.error("UUID = {} value = {}",
- // ((EntityEvent) writeEvent).getEntityUuid().toString(),
- // value );
- // fileImport.setLastUpdatedUUID(
- // ((EntityEvent) writeEvent).getEntityUuid().toString());
- // //checkpoint the UUID here.
- // rootEm.update(fileImport);
- // } catch(Exception ex) {}
- // }
- // if(value % 100 == 0) {
- // logger.error("heartbeat sent by " + fileImport.getFileName());
- // jobExecution.heartbeat();
- // }
- // }
- // }
- // );
- return entityWrapperObservable.doOnNext(doWork);
+ public Boolean call( final WriteEvent writeEvent ) {
+ return !statistics.shouldStopProcessingEntities();
}
- }, Schedulers.io()).toBlocking().last();
+ } ).skip( entityNumSkip ).parallel( new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call( Observable<WriteEvent> entityWrapperObservable ) {
+
+
+ return entityWrapperObservable.doOnNext( doWork );
+ }
+ }, Schedulers.io() ).toBlocking().last();
+
+
jp.close();
- logger.debug("\n\nWrote entities\n");
+ logger.debug("\n\nimportEntitiesFromFile(): Wrote entities\n");
// now do other stuff: connections and dictionaries
entitiesOnly = false;
// observable that parses JSON and emits write events
jp = getJsonParserForFile(file);
+
-
final JsonEntityParserObservable jsonObservableOther =
- new JsonEntityParserObservable( jp, em, rootEm, fileImport, entitiesOnly );
- final Observable<WriteEvent> otherEventObservable = Observable.create( jsonObservableOther );
+ new JsonEntityParserObservable(jp, em, rootEm, collectionName, fileImport, entitiesOnly);
+ final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
- otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- //only take while our stats tell us we should continue processing
- //potentially skip the first n if this is a resume operation
++ // only take while our stats tell us we should continue processing
++ // potentially skip the first n if this is a resume operation
+ otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
@Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
+ public Boolean call( final WriteEvent writeEvent ) {
+ return !statistics.shouldStopProcessingConnections();
}
- }, Schedulers.io()).toBlocking().last();
+ } ).skip( connectionNumSkip ).parallel( new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call( Observable<WriteEvent> entityWrapperObservable ) {
+ return entityWrapperObservable.doOnNext( doWork );
+ }
+ }, Schedulers.io() ).toBlocking().last();
jp.close();
+ logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
++
+ //flush the job statistics
+ statistics.complete();
-
- logger.debug("\n\nWrote others\n");
}