You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/05 15:45:42 UTC

[13/16] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1d0e1a19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1d0e1a19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1d0e1a19

Branch: refs/heads/two-dot-o-import
Commit: 1d0e1a19fc4791d531fb7124eae80cc2757e7f08
Parents: b47904e aafa3d4
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Feb 4 14:56:49 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Feb 4 14:56:49 2015 -0500

----------------------------------------------------------------------
 .../persistence/entities/FailedImport.java      |  64 +++++
 .../entities/FailedImportConnection.java        |  41 ++++
 .../entities/FailedImportEntity.java            |  44 ++++
 .../persistence/entities/FileImport.java        |  32 +++
 .../exceptions/PersistenceException.java        |   2 +-
 stack/pom.xml                                   |  11 +-
 .../importer/FileImportStatistics.java          | 240 ++++++++++++++++---
 .../management/importer/ImportServiceImpl.java  | 161 +++++--------
 .../importer/FileImportStatisticsTest.java      | 237 +++++++++++++++++-
 9 files changed, 691 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1d0e1a19/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 2054804,238c0ab..936e09b
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -657,37 -651,8 +657,8 @@@ public class ImportServiceImpl implemen
                  EntityManager targetEm = emf.getEntityManager(targetAppId);
                  logger.debug("   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
  
 -                importEntitiesFromFile(file, targetEm, emManagementApp, fileImport );
 +                importEntitiesFromFile( collectionName, file, targetEm, emManagementApp, fileImport );
  
-                 // TODO: fix the resume on error feature
- 
- //                // in case of resume, retrieve the last updated UUID for this file
- //                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
- //
- //                // this handles partially completed files by updating entities from the point of failure
- //                if (!lastUpdatedUUID.equals(" ")) {
- //
- //                    // go till the last updated entity
- //                    while (!jp.getText().equals(lastUpdatedUUID)) {
- //                        jp.nextToken();
- //                    }
- //
- //                    // skip the last one and start from the next one
- //                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
- //                        && jp.nextToken() == JsonToken.START_OBJECT)) {
- //                        jp.nextToken();
- //                    }
- //                }
- //
- //                // get to start of an object i.e next entity.
- //                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
- //                    jp.nextToken();
- //                }
- //
- //                while (jp.nextToken() != JsonToken.END_ARRAY) {
- //                    importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
- //                }
- //                jp.close();
  
                  // Updates the state of file import job
                  if (!fileImport.getState().toString().equals("FAILED")) {
@@@ -792,85 -756,86 +763,86 @@@
  
          // observable that parses JSON and emits write events
          JsonParser jp = getJsonParserForFile(file);
+ 
+         //TODO, move the json parser into the observable creation so that open/close happens automatically within the stream
+ 
          final JsonEntityParserObservable jsonObservableEntities =
 -            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
 +            new JsonEntityParserObservable(jp, em, rootEm, collectionName, fileImport, entitiesOnly);
          final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
  
+         //flush every 100 entities
+         final FileImportStatistics statistics = new FileImportStatistics( em, fileImport.getUuid(), 100 );
+         //truncate the long counts to int because they are passed to the Rx skip() operator below
+         final int entityNumSkip = (int)statistics.getTotalEntityCount();
+         final int connectionNumSkip = (int)statistics.getTotalConnectionCount();
+ 
          // function to execute for each write event
-         //TODO: job execution no longer needed due to having queueMessage.
+ 
+         /**
+          * Function that invokes the work of the event.
+          */
          final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
              @Override
-             public void call(WriteEvent writeEvent) {
-                 writeEvent.doWrite(em,fileImport);
+             public void call( WriteEvent writeEvent ) {
+                 writeEvent.doWrite( em, fileImport, statistics );
              }
          };
- //        final AtomicLong entityCounter = new AtomicLong();
- //        final AtomicLong eventCounter = new AtomicLong();
+ 
+ 
+ 
 +
          // start parsing JSON
-         entityEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+         //only take while our stats tell us we should continue processing
+         //potentially skip the first n if this is a resume operation
+         entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
              @Override
-             public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                 // TODO: need to fixed so that number of entities created can be counted correctly and
-                 // TODO: also update last updated UUID for fileImport which is a must for resume-ability
- 
- //                return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
- //
- //                         @Override
- //                         public void call(WriteEvent writeEvent) {
- //                             if (!(writeEvent instanceof EntityEvent)) {
- //                                 final long val = eventCounter.incrementAndGet();
- //                                 if(val % 50 == 0) {
- //                                     jobExecution.heartbeat();
- //                                 }
- //                                 return;
- //                             }
- //
- //                             final long value = entityCounter.incrementAndGet();
- //                             if (value % 2000 == 0) {
- //                                 try {
- //                                     logger.error("UUID = {} value = {}",
- //                                         ((EntityEvent) writeEvent).getEntityUuid().toString(),
- //                                          value );
- //                                     fileImport.setLastUpdatedUUID(
- //                                          ((EntityEvent) writeEvent).getEntityUuid().toString());
- //                                     //checkpoint the UUID here.
- //                                     rootEm.update(fileImport);
- //                                 } catch(Exception ex) {}
- //                             }
- //                             if(value % 100 == 0) {
- //                                 logger.error("heartbeat sent by " + fileImport.getFileName());
- //                                 jobExecution.heartbeat();
- //                             }
- //                         }
- //                     }
- //                );
-                 return entityWrapperObservable.doOnNext(doWork);
+             public Boolean call( final WriteEvent writeEvent ) {
+                 return !statistics.shouldStopProcessingEntities();
              }
-         }, Schedulers.io()).toBlocking().last();
+         } ).skip( entityNumSkip ).parallel( new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+             @Override
+             public Observable<WriteEvent> call( Observable<WriteEvent> entityWrapperObservable ) {
+ 
+ 
+                 return entityWrapperObservable.doOnNext( doWork );
+             }
+         }, Schedulers.io() ).toBlocking().last();
+ 
+ 
          jp.close();
  
 -        logger.debug("\n\nWrote entities\n");
 +        logger.debug("\n\nimportEntitiesFromFile(): Wrote entities\n");
  
          // now do other stuff: connections and dictionaries
          entitiesOnly = false;
  
          // observable that parses JSON and emits write events
          jp = getJsonParserForFile(file);
+ 
 -
          final JsonEntityParserObservable jsonObservableOther =
 -            new JsonEntityParserObservable( jp, em, rootEm, fileImport, entitiesOnly );
 -        final Observable<WriteEvent> otherEventObservable = Observable.create( jsonObservableOther );
 +            new JsonEntityParserObservable(jp, em, rootEm, collectionName, fileImport, entitiesOnly);
 +        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
  
-         otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
 -        //only take while our stats tell us we should continue processing
 -        //potentially skip the first n if this is a resume operation
++        // only take while our stats tell us we should continue processing
++        // potentially skip the first n if this is a resume operation
+         otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
              @Override
-             public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                 return entityWrapperObservable.doOnNext(doWork);
+             public Boolean call( final WriteEvent writeEvent ) {
+                 return !statistics.shouldStopProcessingConnections();
              }
-         }, Schedulers.io()).toBlocking().last();
+         } ).skip( connectionNumSkip ).parallel( new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+                 @Override
+                 public Observable<WriteEvent> call( Observable<WriteEvent> entityWrapperObservable ) {
+                     return entityWrapperObservable.doOnNext( doWork );
+                 }
+             }, Schedulers.io() ).toBlocking().last();
  
          jp.close();
  
 +        logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
++
+         //flush the job statistics
+         statistics.complete();
 -
 -        logger.debug("\n\nWrote others\n");
      }