You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/12 23:28:27 UTC
[08/27] incubator-usergrid git commit: Changed barriers to sockets
Changed barriers to sockets
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d2e5da60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d2e5da60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d2e5da60
Branch: refs/heads/USERGRID-365
Commit: d2e5da60f95eac0f14861b72ba7817fd37eee568
Parents: 5aca048
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jan 20 14:55:17 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jan 20 14:55:17 2015 -0700
----------------------------------------------------------------------
.../usergrid/lock/MultiProcessBarrier.java | 54 +++++++++++++-------
.../usergrid/lock/MultiProcessLocalLock.java | 51 ++++++++----------
.../usergrid/setup/SpringIntegrationRunner.java | 39 +++-----------
.../org/apache/usergrid/setup/SystemSetup.java | 30 +++--------
.../usergrid/lock/MultiProcessBarrierTest.java | 10 ++--
.../lock/MultiProcessLocalLockTest.java | 27 +++++++---
6 files changed, 99 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
index cd5a1de..7b4e290 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessBarrier.java
@@ -20,26 +20,29 @@
package org.apache.usergrid.lock;
-import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.concurrent.TimeoutException;
/**
- * A barrier between processes and threads. Everyone will await until proceed has been invoked by a single
- * thread. Other threads will proceed after wait time.
+ * A barrier between processes and threads. Everyone will await until proceed has been invoked by a single thread.
+ * Other threads will proceed after wait time.
*/
public class MultiProcessBarrier {
/**
* The sleep time to wait before checking.
*/
- private static final long SLEEP_TIME = 100;
- public final File barrierFile;
+ private static final int SLEEP_TIME = 100;
+ public final int barrierPort;
+ public ServerSocket serverSocket;
- public MultiProcessBarrier( final String barrierFileName ) {
- this.barrierFile = new File( barrierFileName );
+ public MultiProcessBarrier( final int barrierPort ) {
+ this.barrierPort = barrierPort;
}
@@ -47,31 +50,46 @@ public class MultiProcessBarrier {
* Notify the other processes they can proceed.
*/
public void proceed() throws IOException {
- barrierFile.mkdirs();
- barrierFile.createNewFile();
+ serverSocket = new ServerSocket( barrierPort );
}
/**
* Await the specified file. If it exists, it will proceed
- * @param timeout
- * @throws InterruptedException
- * @throws TimeoutException
*/
- public void await(final long timeout) throws InterruptedException, TimeoutException {
+ public void await( final long timeout ) throws InterruptedException, TimeoutException {
final long stopTime = System.currentTimeMillis() + timeout;
- while(System.currentTimeMillis() < stopTime){
+ while ( System.currentTimeMillis() < stopTime ) {
+
+
+ try {
+ Socket client = new Socket();
+ client.connect( new InetSocketAddress( "127.0.0.1", barrierPort ), SLEEP_TIME );
+ //if we get here we're good, the client can connect and close
+ client.close();
+
+ finalize();
- //barrier is done break
- if(barrierFile.exists()){
return;
}
-
- Thread.sleep( SLEEP_TIME );
+ catch ( IOException e ) {
+ //port is not open yet; swallow and retry
+ }
+ catch ( Throwable throwable ) {
+ throw new RuntimeException( "Something unexpected happened", throwable );
+ }
}
throw new TimeoutException( "Timeout out after " + timeout + " milliseconds waiting for the file" );
}
+
+
+ @Override
+ protected void finalize() throws Throwable {
+ if ( serverSocket != null && !serverSocket.isClosed() ) {
+ serverSocket.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
index 304ed7a..c109f2c 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/lock/MultiProcessLocalLock.java
@@ -20,12 +20,8 @@
package org.apache.usergrid.lock;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
+import java.net.ServerSocket;
/**
@@ -34,15 +30,15 @@ import java.nio.channels.OverlappingFileLockException;
*/
public class MultiProcessLocalLock {
- private final String fileName;
- private FileLock lock;
+ private final int socketNumber;
+ private ServerSocket lock;
/**
* The filename to use as the lock
*/
- public MultiProcessLocalLock( final String fileName ) {
- this.fileName = fileName;
+ public MultiProcessLocalLock( final int socketNumber ) {
+ this.socketNumber = socketNumber;
}
@@ -57,26 +53,16 @@ public class MultiProcessLocalLock {
throw new IllegalStateException( "You already have a lock, you cannot get a lock again" );
}
- File file = new File( fileName );
-
- if ( !file.exists() ) {
- file.getParentFile().mkdirs();
- file.createNewFile();
- }
-
- // get a file channel
- FileChannel fileChannel = new RandomAccessFile( file, "rw" ).getChannel();
-
try {
- lock = fileChannel.tryLock();
+ lock = new ServerSocket( socketNumber );
}
- //we don't have the lock, ignore
- catch(OverlappingFileLockException ofle){
+ catch ( IOException ioe ) {
+ //swallow, we didn't get the lock
return false;
}
- return hasLock();
+ return true;
}
@@ -89,7 +75,7 @@ public class MultiProcessLocalLock {
}
- lock.release();
+ lock.close();
lock = null;
}
@@ -97,22 +83,25 @@ public class MultiProcessLocalLock {
/**
* Return true if this instance has the lock
- * @return
*/
- public boolean hasLock(){
+ public boolean hasLock() {
return lock != null;
}
+
/**
- * Releases the lock if we have it, otherwise is a no-op
- * @return
+ * Releases the lock if we have it, otherwise is a no-op.
+ *
+ * @return true if we had the lock and released it. False if we didn't have the lock
*/
- public void maybeReleaseLock() throws IOException {
+ public boolean maybeReleaseLock() throws IOException {
- if(lock == null){
- return;
+ if ( lock == null ) {
+ return false;
}
releaseLock();
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
index d9fd2cc..a651fe3 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SpringIntegrationRunner.java
@@ -20,15 +20,9 @@
package org.apache.usergrid.setup;
-import org.junit.ClassRule;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.Statement;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import org.apache.usergrid.cassandra.CassandraResource;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
/**
@@ -37,10 +31,7 @@ import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
public class SpringIntegrationRunner extends BlockJUnit4ClassRunner {
-
- private static boolean initialized;
-
-
+ private static SystemSetup setup;
/**
@@ -54,37 +45,23 @@ public class SpringIntegrationRunner extends BlockJUnit4ClassRunner {
@Override
protected Statement withBeforeClasses( final Statement statement ) {
- if(!initialized){
- runSetup();
- }
-
-
- final Statement toReturn = super.withBeforeClasses( statement );
-
- return toReturn;
-
+ singletonSetup();
+ return super.withBeforeClasses( statement );
}
- /**
- * Run the setup once per JVM
- */
- public static synchronized void runSetup() {
+ private static synchronized void singletonSetup() {
- if(initialized){
+ if ( setup != null ) {
return;
}
+ setup = new SystemSetup();
try {
- new SystemSetup().maybeInitialize();
+ setup.maybeInitialize();
}
catch ( Exception e ) {
- throw new RuntimeException( "Unable to initialize the system", e );
+ throw new RuntimeException( "Unable to start subsystem" );
}
-
-
- initialized = true;
-
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
index 0681c25..9460365 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/SystemSetup.java
@@ -37,15 +37,15 @@ import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
*/
public class SystemSetup {
- private static final String TEMP_FILE_PATH = "target/surefirelocks";
- public static final String LOCK_NAME = TEMP_FILE_PATH + "/lock";
- public static final String START_BARRIER_NAME = TEMP_FILE_PATH + "/start_barrier";
+
+ public static final int LOCK_PORT = Integer.parseInt( System.getProperty( "test.lock.port", "10101") );
+ public static final int START_BARRIER_PORT = Integer.parseInt( System.getProperty( "test.barrier.port", "10102") );
public static final long ONE_MINUTE = 60000;
- final MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_NAME );
- final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_NAME );
+ final MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_PORT );
+ final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
/**
@@ -65,6 +65,7 @@ public class SystemSetup {
SpringResource.getInstance().migrate();
+
//signal to other processes we've migrated, and they can proceed
barrier.proceed();
}
@@ -72,24 +73,7 @@ public class SystemSetup {
barrier.await( ONE_MINUTE );
- //it doesn't matter who finishes first. We need to remove the resources so we start correctly next time.
- //not ideal, but a clean will solve this issue too
- if ( lock.hasLock() ) {
-
- //add a shutdown hook so we clean up after ourselves. Kinda fugly, but works since we can't clean on start
- Runtime.getRuntime().addShutdownHook( new Thread() {
-
- public void run() {
-
- System.out.println( "Shutdown Hook is running !" );
- deleteFile( LOCK_NAME );
- deleteFile( START_BARRIER_NAME );
- }
- } );
-
-
- lock.releaseLock();
- }
+ lock.maybeReleaseLock();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
index 3fe0e57..5863041 100644
--- a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessBarrierTest.java
@@ -43,11 +43,15 @@ import static org.junit.Assert.fail;
public class MultiProcessBarrierTest {
+ public static final int START_BARRIER_PORT = Integer.parseInt( System.getProperty( "test.barrier.port", "10102") );
+
+
+
@Test
public void singleBarrierTest() throws IOException, InterruptedException, TimeoutException {
final String file = newFileName();
- final MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+ final MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
try {
@@ -80,7 +84,7 @@ public class MultiProcessBarrierTest {
//now create the barrier and execute it
- MultiProcessBarrier barrier = new MultiProcessBarrier( file );
+ MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
barrier.proceed();
assertTrue( "other barriers proceeded", latch.await( 1000, TimeUnit.MILLISECONDS ) );
@@ -120,7 +124,7 @@ public class MultiProcessBarrierTest {
@Override
public void run() {
- MultiProcessBarrier barrier = new MultiProcessBarrier( fileName );
+ MultiProcessBarrier barrier = new MultiProcessBarrier( START_BARRIER_PORT );
try {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2e5da60/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
index 44c8e20..1f3663f 100644
--- a/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
+++ b/stack/test-utils/src/test/java/org/apache/usergrid/lock/MultiProcessLocalLockTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.*;
*/
public class MultiProcessLocalLockTest {
+ public static final int LOCK_PORT = Integer.parseInt( System.getProperty( "test.lock.port", "10101") );
+
/**
* Create and verify the single lock
* @throws IOException
@@ -43,7 +45,7 @@ public class MultiProcessLocalLockTest {
final String lockName = newFileName();
- MultiProcessLocalLock lock = new MultiProcessLocalLock( lockName);
+ MultiProcessLocalLock lock = new MultiProcessLocalLock( LOCK_PORT);
assertTrue(lock.tryLock());
@@ -62,7 +64,7 @@ public class MultiProcessLocalLockTest {
final String lockName = newFileName();
- MultiProcessLocalLock lock1 = new MultiProcessLocalLock( lockName );
+ MultiProcessLocalLock lock1 = new MultiProcessLocalLock( LOCK_PORT );
assertTrue( lock1.tryLock() );
@@ -70,16 +72,23 @@ public class MultiProcessLocalLockTest {
//get lock 2, should fail
- MultiProcessLocalLock lock2 = new MultiProcessLocalLock( lockName );
+ MultiProcessLocalLock lock2 = new MultiProcessLocalLock( LOCK_PORT );
assertFalse(lock2.tryLock());
assertFalse(lock2.hasLock());
-
//release lock1
- lock1.maybeReleaseLock();
+ boolean lock1release = lock1.maybeReleaseLock();
+
+ assertTrue( "lock released", lock1release );
+
+ boolean lock2release = lock2.maybeReleaseLock();
+
+ assertFalse( "lock released", lock2release );
+
+
//should succeed
@@ -91,7 +100,13 @@ public class MultiProcessLocalLockTest {
assertFalse(lock1.hasLock());
- lock2.maybeReleaseLock();
+ lock1release = lock1.maybeReleaseLock();
+
+ assertFalse( "lock released", lock1release );
+
+ lock2release = lock2.maybeReleaseLock();
+
+ assertTrue( "lock released", lock2release );
}