You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/10 21:26:02 UTC
[09/50] [abbrv] git commit: Added heartbeats to ExportService. Sends
one between writing entities every time one takes more than 5 seconds.
Added heartbeats to ExportService. A heartbeat is sent between entity writes whenever more than 5 seconds have elapsed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bc70a3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bc70a3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bc70a3dd
Branch: refs/pull/70/merge
Commit: bc70a3dd87a6b3970eac4d0f0349ddd93653d646
Parents: 9a16d8f
Author: George Reyes <Ap...@George-Reyess-MacBook-Pro.local>
Authored: Thu Feb 13 14:53:40 2014 -0800
Committer: George Reyes <Ap...@George-Reyess-MacBook-Pro.local>
Committed: Thu Feb 13 14:53:40 2014 -0800
----------------------------------------------------------------------
.../usergrid/management/export/ExportJob.java | 2 +-
.../management/export/ExportService.java | 3 +-
.../management/export/ExportServiceImpl.java | 136 ++++++++-----------
3 files changed, 63 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
index bd72c6e..ec5f27b 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java
@@ -39,7 +39,7 @@ public class ExportJob extends OnlyOnceJob {
jobExecution.heartbeat();
//pass in jobExecution so that you can call the heartbeat in the do export method.
- exportService.doExport( config );
+ exportService.doExport( config, jobExecution );
logger.info( "executed ExportJob completed normally" );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
index e2e5d54..aa1dd1b 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java
@@ -3,6 +3,7 @@ package org.usergrid.management.export;
import java.util.UUID;
+import org.usergrid.batch.JobExecution;
import org.usergrid.management.ExportInfo;
@@ -23,7 +24,7 @@ public interface ExportService {
* Perform the export to the external resource
* @param config
*/
- void doExport(ExportInfo config) throws Exception;
+ void doExport(ExportInfo config, JobExecution jobExecution) throws Exception;
/**
* Returns the UUID to the user
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
index 5f532c7..dbbe520 100644
--- a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
+++ b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java
@@ -19,6 +19,7 @@ import org.codehaus.jackson.util.DefaultPrettyPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.usergrid.batch.JobExecution;
import org.usergrid.batch.service.SchedulerService;
import org.usergrid.management.ExportInfo;
import org.usergrid.management.ManagementService;
@@ -40,7 +41,7 @@ import com.google.common.collect.BiMap;
*
*
*/
-public class ExportServiceImpl implements ExportService{
+public class ExportServiceImpl implements ExportService {
private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
@@ -57,6 +58,9 @@ public class ExportServiceImpl implements ExportService{
//Maximum amount of entities retrieved in a single go.
public static final int MAX_ENTITY_FETCH = 100;
+ //Minimum time, in milliseconds, that must elapse before another heartbeat is sent
+ public static final int TIMESTAMP_DELTA = 5000;
+
private JsonFactory jsonFactory = new JsonFactory();
private String outputDir = "/Users/ApigeeCorportation";
@@ -101,17 +105,17 @@ public class ExportServiceImpl implements ExportService{
// good error messages when not found.
//validate user has access key to org (rather valid user has admin access token)
- //this is token validation
+ //this is token validation
JobData jobData = new JobData();
- jobData.setProperty( "exportInfo",config );
+ jobData.setProperty( "exportInfo", config );
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
- JobData retJobData = sch.createJob( "exportJob",soonestPossible, jobData );
+ JobData retJobData = sch.createJob( "exportJob", soonestPossible, jobData );
jobUUID = retJobData.getUuid();
try {
JobStat merp = sch.getStatsForJob( "exportJob", retJobData.getUuid() );
- System.out.println("hi");
+ System.out.println( "hi" );
}
catch ( Exception e ) {
logger.error( "could not get stats for job" );
@@ -120,24 +124,23 @@ public class ExportServiceImpl implements ExportService{
@Override
- public void doExport( final ExportInfo config ) throws Exception {
+ public void doExport( final ExportInfo config, final JobExecution jobExecution ) throws Exception {
Map<UUID, String> organizations = getOrgs();
for ( Map.Entry<UUID, String> organization : organizations.entrySet() ) {
- exportApplicationsForOrg( organization , config );
+ exportApplicationsForOrg( organization, config, jobExecution );
}
}
-
private Map<UUID, String> getOrgs() throws Exception {
// Loop through the organizations
- // TODO:this will come from the orgs in schedule when you do the validations. delete orgId
+ // TODO:this will come from the orgs in schedule when you do the validations. delete orgId
UUID orgId = null;
Map<UUID, String> organizationNames = null;
- // managementService.setup();
+ // managementService.setup();
if ( orgId == null ) {
@@ -193,47 +196,29 @@ public class ExportServiceImpl implements ExportService{
this.managementService = managementService;
}
- public UUID getJobUUID () {
+
+ public UUID getJobUUID() {
return jobUUID;
}
-//write test checking to see what happens if the input stream is closed or wrong.
-//TODO: make multipart streaming functional
- //currently only stores the collection in memory then flushes it.
- private void exportApplicationsForOrg( Map.Entry<UUID, String> organization,final ExportInfo config ) throws Exception {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- //baos.reset();
- //ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-// OutputStreamWriter osw = new OutputStreamWriter(oos,"UTF-8");
-// PrintWriter out = new PrintWriter( osw );
-//
-// //oos.reset();
-//
-// Writer wrtJSon = new OutputStreamWriter( oos, "UTF-8" );
+ //write test checking to see what happens if the input stream is closed or wrong.
+ //TODO: make multipart streaming functional
+ //currently only stores the collection in memory then flushes it.
+ private void exportApplicationsForOrg( Map.Entry<UUID, String> organization, final ExportInfo config,
+ final JobExecution jobExecution ) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
logger.info( "" + organization );
-
// Loop through the applications per organization
BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organization.getKey() );
for ( Map.Entry<UUID, String> application : applications.entrySet() ) {
logger.info( application.getValue() + " : " + application.getKey() );
- // Get the JSon serializer.
- //Creates the applications folder
- /* What needs to be done:
- * take the file name generator and create one that will only output the collections we need
- * this will probably icnlude taking both file names, and making sure that it is not doing
- * two passes as todd had it originally. */
-
- // JsonGenerator jg = getJsonGenerator( createOutputFile( "application", application.getValue() ) );
-
- String appFileName = prepareOutputFileName( "application", application.getValue() );
+ String appFileName = prepareOutputFileName( "application", application.getValue() );
JsonGenerator jg = getJsonGenerator( baos );
@@ -243,8 +228,8 @@ public class ExportServiceImpl implements ExportService{
Entity appEntity = rootEm.get( application.getKey() );
+ jobExecution.heartbeat();
Map<String, Object> dictionaries = new HashMap<String, Object>();
-
for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
@@ -273,40 +258,31 @@ public class ExportServiceImpl implements ExportService{
nsEntity.setMetadata( "counters", entityCounters );
nsEntity.setMetadata( "collections", collections );
+ jobExecution.heartbeat();
jg.writeStartArray();
jg.writeObject( nsEntity );
- // Create a GENERATOR for the application collections.
- //JsonGenerator collectionsJg = getJsonGenerator( createOutputFile( "collections", application.getValue() ) );
-
- //String collectionsFilename = prepareOutputFileName( "collections","appDummyName" );
- //JsonGenerator collectionsJg = getJsonGenerator( oos );
-
-
- //collectionsJg.writeStartObject();
- // jg.writeStartObject();
Map<String, Object> metadata = em.getApplicationCollectionMetadata();
- //don't need to echo as not a command line tool anymore
- //echo( JsonUtils.mapToFormattedJsonString( metadata ) );
+ long starting_time = System.currentTimeMillis();
// Loop through the collections. This is the only way to loop
// through the entities in the application (former namespace).
for ( String collectionName : metadata.keySet() ) {
+
Query query = new Query();
query.setLimit( MAX_ENTITY_FETCH );
query.setResultsLevel( Results.Level.ALL_PROPERTIES );
Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
- while ( entities.size() > 0 ) {
+ starting_time = checkTimeDelta( starting_time, jobExecution );
+
+ while ( entities.size() > 0 ) {
+ jobExecution.heartbeat();
for ( Entity entity : entities ) {
- // Export the entity first and later the collections for
- // this entity.
jg.writeObject( entity );
- //echo( entity );
-
saveCollectionMembers( jg, em, application.getValue(), entity );
}
@@ -324,21 +300,31 @@ public class ExportServiceImpl implements ExportService{
// Close writer and file for this application.
- // logger.warn();
+ // logger.warn();
jg.writeEndArray();
jg.close();
-
baos.flush();
baos.close();
- InputStream is = new ByteArrayInputStream( baos.toByteArray());
- //InputStream is = new ObjectInputStream );
+ InputStream is = new ByteArrayInputStream( baos.toByteArray() );
s3Export.copyToS3( is, config );
- //below line doesn't copy very good data anyways.
}
}
+
+ public long checkTimeDelta( long startingTime, final JobExecution jobExecution ) {
+
+ long cur_time = System.currentTimeMillis();
+
+ if ( startingTime <= ( cur_time - TIMESTAMP_DELTA ) ) {
+ jobExecution.heartbeat();
+ return cur_time;
+ }
+ return startingTime;
+ }
+
+
/**
* Serialize and save the collection members of this <code>entity</code>
*
@@ -349,6 +335,8 @@ public class ExportServiceImpl implements ExportService{
private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String application, Entity entity )
throws Exception {
+ long timestamp = System.currentTimeMillis();
+
Set<String> collections = em.getCollections( entity );
// Only create entry for Entities that have collections
@@ -356,16 +344,14 @@ public class ExportServiceImpl implements ExportService{
return;
}
- // jg.writeFieldName( entity.getUuid().toString() );
- jg.writeStartObject();
for ( String collectionName : collections ) {
jg.writeFieldName( collectionName );
- // Start collection array.
jg.writeStartArray();
- Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false );
+ Results collectionMembers =
+ em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false );
List<UUID> entityIds = collectionMembers.getIds();
@@ -389,11 +375,10 @@ public class ExportServiceImpl implements ExportService{
jg.writeEndObject();
}
- // protected JsonGenerator getJsonGenerator( String outFile ) throws IOException {
- // return getJsonGenerator( new File( outputDir, outFile ) );
- // }
- /** Persists the connection for this entity. */
+ /**
+ * Persists the dictionaries for this entity.
+ */
private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
jg.writeFieldName( "dictionaries" );
@@ -424,7 +409,9 @@ public class ExportServiceImpl implements ExportService{
}
- /** Persists the connection for this entity. */
+ /**
+ * Persists the connection for this entity.
+ */
private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
jg.writeFieldName( "connections" );
@@ -449,7 +436,7 @@ public class ExportServiceImpl implements ExportService{
}
- protected JsonGenerator getJsonGenerator(ByteArrayOutputStream out ) throws IOException {
+ protected JsonGenerator getJsonGenerator( ByteArrayOutputStream out ) throws IOException {
//TODO:shouldn't the below be UTF-16?
//PrintWriter out = new PrintWriter( outFile, "UTF-8" );
@@ -459,8 +446,9 @@ public class ExportServiceImpl implements ExportService{
return jg;
}
+
protected File createOutputFile( String type, String name ) {
- return new File(prepareOutputFileName( type, name ) );
+ return new File( prepareOutputFileName( type, name ) );
}
@@ -492,12 +480,8 @@ public class ExportServiceImpl implements ExportService{
return outputFileName;
}
+
@Autowired
@Override
- public void setS3Export (S3Export s3Export) { this.s3Export = s3Export; }
-
-
-
-
-
+ public void setS3Export( S3Export s3Export ) { this.s3Export = s3Export; }
}