You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/04 17:54:52 UTC
incubator-usergrid git commit: Added a buffer size to auto flush during processing.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import ee88ea5ba -> 82efd20c7
Added a buffer size to auto flush during processing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/82efd20c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/82efd20c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/82efd20c
Branch: refs/heads/two-dot-o-import
Commit: 82efd20c7be16c26540de52f672d43ff4d2f9c46
Parents: ee88ea5
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 4 09:54:50 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 4 09:54:50 2015 -0700
----------------------------------------------------------------------
.../importer/FileImportStatistics.java | 98 ++++++++++++++++----
.../importer/FileImportStatisticsTest.java | 95 +++++++++++++++++--
2 files changed, 167 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/82efd20c/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
index 739c709..f8cabcd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportStatistics.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.management.importer;
import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.persistence.EntityManager;
@@ -32,8 +34,8 @@ import org.apache.usergrid.persistence.exceptions.PersistenceException;
/**
* Statistics used to track a file import. Only 1 instance of this class should exist per file imported in the cluster.
- * There is a direct 1-1 mapping of the statistics provided here and the file import status.
- * This class is threadsafe to be used across multiple threads.
+ * There is a direct 1-1 mapping of the statistics provided here and the file import status. This class is threadsafe to
+ * be used across multiple threads.
*/
public class FileImportStatistics {
@@ -42,15 +44,26 @@ public class FileImportStatistics {
private final AtomicLong entitiesWritten = new AtomicLong( 0 );
private final AtomicLong entitiesFailed = new AtomicLong( 0 );
+ private final AtomicInteger cachedOperations = new AtomicInteger( 0 );
+ private final Semaphore writeSemaphore = new Semaphore( 1 );
private final FileImport fileImport;
private final EntityManager entityManager;
+ private final int flushCount;
- public FileImportStatistics( final UUID fileImportId, final EntityManager entityManager ) {
+ /**
+ * Create an instance to track counters
+ *
+ * @param entityManager The entity manager that will hold these entities.
+ * @param fileImportId The uuid of the fileImport
+ * @param flushCount The number of success + failures to accumulate before flushing
+ */
+ public FileImportStatistics( final EntityManager entityManager, final UUID fileImportId, final int flushCount ) {
this.entityManager = entityManager;
- this.fileImport = getFileImport( fileImportId);
+ this.flushCount = flushCount;
+ this.fileImport = getFileImport( fileImportId );
}
@@ -59,6 +72,7 @@ public class FileImportStatistics {
*/
public void entityWritten() {
entitiesWritten.incrementAndGet();
+ maybeFlush();
}
@@ -66,7 +80,7 @@ public class FileImportStatistics {
* Invoke when an entity fails to write correctly
*/
- public void entityFailed(final String message ) {
+ public void entityFailed( final String message ) {
entitiesFailed.incrementAndGet();
@@ -77,9 +91,10 @@ public class FileImportStatistics {
failedEntityImport = entityManager.create( failedEntityImport );
entityManager.createConnection( fileImport, ERRORS_CONNECTION_NAME, failedEntityImport );
}
- catch(Exception e){
+ catch ( Exception e ) {
throw new PersistenceException( "Unable to save failed entity import message", e );
}
+ maybeFlush();
}
@@ -119,6 +134,60 @@ public class FileImportStatistics {
/**
+ * Return the total number of successful imports + failed imports. Can be used in resume.
+ * Note that this reflects the counts last written to Cassandra, NOT the current state in memory:
+ * the FileImport entity is re-loaded from the entity manager on every call.
+ *
+ * @return the persisted failed entity count plus the persisted imported entity count
+ * @throws RuntimeException if the FileImport entity cannot be loaded
+ */
+ public long getParsedCount() {
+ final FileImport saved = getFileImport( fileImport.getUuid() );
+
+ return saved.getFailedEntityCount() + saved.getImportedEntityCount();
+ }
+
+
+ /**
+ * Returns true if the import should stop processing.
+ *
+ * NOTE: no stop rule is implemented yet; this currently always returns false. The proposed rule is:
+ * we've attempted to import over 1k entities, and after those first 1k we have over a 50% failure rate.
+ *
+ * @return always false until a stop algorithm is chosen
+ */
+ public boolean shouldStopProcessing() {
+
+ //TODO Dave, George. What algorithm should we use here?
+ return false;
+ }
+
+
+ /**
+ * Record one completed operation (success or failure) and, once at least {@code flushCount} operations have
+ * accumulated since the last flush, persist the current counters with state STARTED.
+ *
+ * Only one thread writes at a time: a thread that cannot acquire the write permit skips the flush, and a later
+ * call will retry. The permit is always released, even if the write throws, so a single failed write cannot
+ * disable flushing for the rest of the import.
+ */
+ private void maybeFlush() {
+ final int count = cachedOperations.incrementAndGet();
+
+ //not enough operations buffered yet, nothing to write
+ if ( count < flushCount ) {
+ return;
+ }
+
+ //another thread is writing, no op, just return
+ if ( !writeSemaphore.tryAcquire() ) {
+ return;
+ }
+
+ try {
+ final long failed = entitiesFailed.get();
+ final long written = entitiesWritten.get();
+ final String message;
+
+ if ( failed > 0 ) {
+ message = "Failed to import " + failed + " entities. Successfully imported " + written + " entities";
+ }
+ else {
+ message = "Successfully imported " + written + " entities";
+ }
+
+ updateFileImport( written, failed, FileImport.State.STARTED, message );
+
+ //only consume the buffered count after a successful write, so a failed write is retried on the next call
+ cachedOperations.addAndGet( -flushCount );
+ }
+ finally {
+ //always hand the permit back; leaking it would make every later tryAcquire() fail
+ writeSemaphore.release();
+ }
+ }
+
+
+ /**
* Update the file import status with the provided messages
*
* @param written The number of files written
@@ -146,16 +215,15 @@ public class FileImportStatistics {
/**
* Get the FileImport by uuid and return it
- * @param fileImportId
- * @return
+ *
* @throws EntityNotFoundException if we can't find the file import with the given uuid
*/
private FileImport getFileImport( final UUID fileImportId ) {
- final FileImport fileImport;
+ final FileImport fileImport;
try {
- fileImport = entityManager.get( fileImportId, FileImport.class );
+ fileImport = entityManager.get( fileImportId, FileImport.class );
}
catch ( Exception e ) {
throw new RuntimeException( "Unable to load fileImport with id " + fileImportId, e );
@@ -169,14 +237,4 @@ public class FileImportStatistics {
}
- /**
- * Returns true if we should stop processing. This will use the following logic
- *
- * We've attempted to import over 1k entities After 1k, we have over a 50% failure rate
- */
- public boolean stopProcessing() {
-
- //TODO Dave, George. What algorithm should we use here?
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/82efd20c/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
index e2040fd..15f9ed8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/FileImportStatisticsTest.java
@@ -61,7 +61,7 @@ public class FileImportStatisticsTest {
when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
- final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+ final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
final long expectedCount = 100;
@@ -110,7 +110,7 @@ public class FileImportStatisticsTest {
}
} );
- final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+ final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
final long expectedSuccess = 100;
@@ -181,7 +181,7 @@ public class FileImportStatisticsTest {
when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
- final FileImportStatistics fileImportStatistics = new FileImportStatistics( importFileId, em );
+ final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, 1000 );
final long expectedCount = 100;
@@ -204,11 +204,94 @@ public class FileImportStatisticsTest {
assertEquals( "Same count expected", expectedCount, updated.getImportedEntityCount() );
- assertEquals("Fail count is 0", 0, updated.getFailedEntityCount());
+ assertEquals( "Fail count is 0", 0, updated.getFailedEntityCount() );
- assertEquals("Correct expected message", "Something bad happened", updated.getErrorMessage());
+ assertEquals( "Correct expected message", "Something bad happened", updated.getErrorMessage() );
- assertEquals("Expected failed state", FileImport.State.FAILED, updated.getState());
+ assertEquals( "Expected failed state", FileImport.State.FAILED, updated.getState() );
+ }
+
+
+ /**
+ * Verifies that statistics are auto flushed every flushSize operations, and that the last captured update
+ * carries the final success/failure counts and message.
+ */
+ @Test
+ public void testAutoFlushSuccess() throws Exception {
+
+ final EntityManager em = mock( EntityManager.class );
+
+ final UUID importFileId = UUIDGenerator.newTimeUUID();
+
+
+ final FileImport fileImport = new FileImport();
+ fileImport.setUuid( importFileId );
+
+ when( em.get( importFileId, FileImport.class ) ).thenReturn( fileImport );
+
+ //mock up returning the FailedEntityImport instance after save is invoked.
+
+ when( em.create( any( FailedEntityImport.class ) ) ).thenAnswer( new Answer<FailedEntityImport>() {
+ @Override
+ public FailedEntityImport answer( final InvocationOnMock invocation ) throws Throwable {
+ return ( FailedEntityImport ) invocation.getArguments()[0];
+ }
+ } );
+
+ final int expectedSuccess = 100;
+ final int expectedFails = 100;
+ final int expectedFlushCount = 2;
+ //split the total number of operations (successes + failures) evenly across the expected auto flushes
+ final int flushSize = ( expectedSuccess + expectedFails ) / expectedFlushCount;
+
+ //set this to 1/2, so that we get saved twice
+ final FileImportStatistics fileImportStatistics = new FileImportStatistics( em, importFileId, flushSize );
+
+
+ for ( int i = 0; i < expectedSuccess; i++ ) {
+ fileImportStatistics.entityWritten();
+ }
+
+
+ for ( int i = 0; i < expectedFails; i++ ) {
+ fileImportStatistics.entityFailed( "Failed to write entity " + i );
+ }
+
+
+ fileImportStatistics.complete();
+
+
+ ArgumentCaptor<FileImport> savedFileImport = ArgumentCaptor.forClass( FileImport.class );
+
+ //expectedFlushCount auto flushes, plus the update expected from complete()
+ verify( em, times( expectedFlushCount + 1 ) ).update( savedFileImport.capture() );
+
+ //the last captured update follows all auto flushes
+ final FileImport updated = savedFileImport.getAllValues().get( expectedFlushCount );
+
+ assertSame( "Same instance should be updated", fileImport, updated );
+
+ assertEquals( "Same count expected", expectedSuccess, updated.getImportedEntityCount() );
+
+ assertEquals( "Same fail expected", expectedFails, updated.getFailedEntityCount() );
+
+ assertEquals( "Correct error message",
+ "Failed to import " + expectedFails + " entities. Successfully imported " + expectedSuccess + " entities",
+ updated.getErrorMessage() );
+
+ //TODO get the connections from the file import
+
+ ArgumentCaptor<FailedEntityImport> failedEntities = ArgumentCaptor.forClass( FailedEntityImport.class );
+
+ verify( em, times( expectedFails ) )
+ .createConnection( same( fileImport ), eq( "errors" ), failedEntities.capture() );
+
+ //now check all our arguments
+
+ final List<FailedEntityImport> args = failedEntities.getAllValues();
+
+ assertEquals( "Same number of error connections created", expectedFails, args.size() );
+
+
+ for ( int i = 0; i < expectedFails; i++ ) {
+
+ final FailedEntityImport failedImport = args.get( i );
+
+ assertEquals( "Same message expected", "Failed to write entity " + i, failedImport.getErrorMessage() );
+ }
}
}