You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/23 12:26:05 UTC
aries-rsa git commit: [ARIES-1757] Add support for promise
Repository: aries-rsa
Updated Branches:
refs/heads/master a8a6a057d -> 1ff19b589
[ARIES-1757] Add support for promise
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1ff19b58
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1ff19b58
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1ff19b58
Branch: refs/heads/master
Commit: 1ff19b589d9c5db6f46d5c506f8777cf4b1ca422
Parents: a8a6a05
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Nov 23 13:25:59 2017 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Nov 23 13:25:59 2017 +0100
----------------------------------------------------------------------
provider/tcp/pom.xml | 52 +++++++++++---------
.../aries/rsa/provider/tcp/TCPServer.java | 27 ++++++++--
.../rsa/provider/tcp/TcpInvocationHandler.java | 27 +++++++++-
.../aries/rsa/provider/tcp/TcpProviderTest.java | 39 +++++++++++++--
.../tcp/myservice/ExpectedTestException.java | 11 +++++
.../rsa/provider/tcp/myservice/MyService.java | 10 ++--
.../provider/tcp/myservice/MyServiceImpl.java | 46 +++++++++++++----
7 files changed, 166 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/pom.xml
----------------------------------------------------------------------
diff --git a/provider/tcp/pom.xml b/provider/tcp/pom.xml
index a903860..f743563 100644
--- a/provider/tcp/pom.xml
+++ b/provider/tcp/pom.xml
@@ -1,28 +1,34 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.aries.rsa</groupId>
- <artifactId>org.apache.aries.rsa.parent</artifactId>
- <version>1.12.0-SNAPSHOT</version>
- <relativePath>../../parent/pom.xml</relativePath>
- </parent>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>org.apache.aries.rsa.parent</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
- <groupId>org.apache.aries.rsa.provider</groupId>
- <artifactId>org.apache.aries.rsa.provider.tcp</artifactId>
- <packaging>bundle</packaging>
- <name>Aries Remote Service Admin provider TCP</name>
- <description>Provider for Java Serialization over TCP</description>
+ <groupId>org.apache.aries.rsa.provider</groupId>
+ <artifactId>org.apache.aries.rsa.provider.tcp</artifactId>
+ <packaging>bundle</packaging>
+ <name>Aries Remote Service Admin provider TCP</name>
+ <description>Provider for Java Serialization over TCP</description>
- <properties>
- <topDirectoryLocation>../..</topDirectoryLocation>
- </properties>
+ <properties>
+ <topDirectoryLocation>../..</topDirectoryLocation>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.aries.rsa</groupId>
- <artifactId>org.apache.aries.rsa.spi</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>org.apache.aries.rsa.spi</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.util.promise</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index 85a7c31..a1c8775 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -26,11 +26,13 @@ import java.lang.reflect.InvocationTargetException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.osgi.util.promise.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,20 +80,37 @@ public class TCPServer implements Closeable, Runnable {
}
}
- @SuppressWarnings("unchecked")
private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception {
String methodName = (String)ois.readObject();
Object[] args = (Object[])ois.readObject();
Object result = invoker.invoke(methodName, args);
+ result = resolveAsnyc(result);
if (result instanceof InvocationTargetException) {
result = ((InvocationTargetException) result).getCause();
- } else if (result instanceof Future) {
- Future<Object> fu = (Future<Object>) result;
- result = fu.get();
}
objectOutput.writeObject(result);
}
+ @SuppressWarnings("unchecked")
+ private Object resolveAsnyc(Object result) throws InterruptedException {
+ if (result instanceof Future) {
+ Future<Object> fu = (Future<Object>) result;
+ try {
+ result = fu.get();
+ } catch (ExecutionException e) {
+ result = e.getCause();
+ }
+ } else if (result instanceof Promise) {
+ Promise<Object> fu = (Promise<Object>) result;
+ try {
+ result = fu.getValue();
+ } catch (InvocationTargetException e) {
+ result = e.getCause();
+ }
+ }
+ return result;
+ }
+
@Override
public void close() throws IOException {
this.serverSocket.close();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index 8dea17f..ec59f3d 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -29,6 +29,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
public class TcpInvocationHandler implements InvocationHandler {
private String host;
private int port;
@@ -46,23 +49,43 @@ public class TcpInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Future.class.isAssignableFrom(method.getReturnType())) {
- return createAsyncResult(method, args);
+ return createFutureResult(method, args);
+ } else if (Promise.class.isAssignableFrom(method.getReturnType())) {
+ return createPromiseResult(method, args);
} else {
return handleSyncCall(method, args);
}
}
- private Object createAsyncResult(final Method method, final Object[] args) {
+ private Object createFutureResult(final Method method, final Object[] args) {
return CompletableFuture.supplyAsync(new Supplier<Object>() {
public Object get() {
try {
return handleSyncCall(method, args);
+ } catch (RuntimeException e) {
+ throw e;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
}
+
+ private Object createPromiseResult(final Method method, final Object[] args) {
+ final Deferred<Object> deferred = new Deferred<Object>();
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ deferred.resolve(handleSyncCall(method, args));
+ } catch (Throwable e) {
+ deferred.fail(e);
+ }
+ }
+ }).start();
+ return deferred.getPromise();
+ }
private Object handleSyncCall(Method method, Object[] args) throws Throwable {
Object result;
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index 2160588..7a33c86 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -22,16 +22,19 @@ import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.aries.rsa.provider.tcp.myservice.ExpectedTestException;
import org.apache.aries.rsa.provider.tcp.myservice.MyService;
import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl;
import org.apache.aries.rsa.spi.Endpoint;
@@ -42,6 +45,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.osgi.framework.BundleContext;
+import org.osgi.util.promise.Promise;
public class TcpProviderTest {
@@ -73,7 +77,7 @@ public class TcpProviderTest {
@Test
public void testCallTimeout() {
try {
- myServiceProxy.callSlow();
+ myServiceProxy.callSlow(TIMEOUT + 100);
Assert.fail("Expecting timeout");
} catch (RuntimeException e) {
Assert.assertEquals(SocketTimeoutException.class, e.getCause().getClass());
@@ -88,7 +92,7 @@ public class TcpProviderTest {
Assert.assertEquals(msg, result);
}
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected=ExpectedTestException.class)
public void testCallException() {
myServiceProxy.callException();
}
@@ -113,11 +117,38 @@ public class TcpProviderTest {
}
@Test
- public void testAsyncCall() throws Exception {
- Future<String> result = myServiceProxy.callAsync(100);
+ public void testAsyncFuture() throws Exception {
+ Future<String> result = myServiceProxy.callAsyncFuture(100);
String answer = result.get(1, TimeUnit.SECONDS);
assertEquals("Finished", answer);
}
+
+ @Test(expected = ExpectedTestException.class)
+ public void testAsyncFutureException() throws Throwable {
+ Future<String> result = myServiceProxy.callAsyncFuture(-1);
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test
+ public void testAsyncPromise() throws Exception {
+ Promise<String> result = myServiceProxy.callAsyncPromise(100);
+ String answer = result.getValue();
+ assertEquals("Finished", answer);
+ }
+
+ @Test(expected = ExpectedTestException.class)
+ public void testAsyncPromiseException() throws Throwable {
+ Promise<String> result = myServiceProxy.callAsyncPromise(-1);
+ try {
+ result.getValue();
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
@AfterClass
public static void close() throws IOException {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
new file mode 100644
index 0000000..ba7d084
--- /dev/null
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
@@ -0,0 +1,11 @@
+package org.apache.aries.rsa.provider.tcp.myservice;
+
+public class ExpectedTestException extends RuntimeException {
+
+ public ExpectedTestException() {
+ super();
+ }
+
+ private static final long serialVersionUID = -6271694671646172358L;
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
index cfeb32b..026743d 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
@@ -23,19 +23,23 @@ import java.util.concurrent.Future;
import javax.jws.Oneway;
+import org.osgi.util.promise.Promise;
+
public interface MyService {
String echo(String msg);
- void callSlow();
+ void callSlow(int delay);
void callException();
-
+
// Oneway not yet supported
@Oneway
void callOneWay(String msg);
void callWithList(List<String> msg);
- Future<String> callAsync(int delay);
+ Future<String> callAsyncFuture(int delay);
+
+ Promise<String> callAsyncPromise(int delay);
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
index 1a5a48c..a682cb3 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
@@ -24,6 +24,9 @@ import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
public class MyServiceImpl implements MyService {
@Override
@@ -32,16 +35,13 @@ public class MyServiceImpl implements MyService {
}
@Override
- public void callSlow() {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- }
+ public void callSlow(int delay) {
+ sleep(delay);
}
@Override
public void callException() {
- throw new IllegalArgumentException("Throwing expected exception");
+ throw new ExpectedTestException();
}
@Override
@@ -54,17 +54,43 @@ public class MyServiceImpl implements MyService {
}
@Override
- public Future<String> callAsync(final int delay) {
+ public Future<String> callAsyncFuture(final int delay) {
return supplyAsync(new Supplier<String>() {
public String get() {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
+ if (delay == -1) {
+ throw new ExpectedTestException();
}
+ sleep(delay);
return "Finished";
}
});
}
+
+ @Override
+ public Promise<String> callAsyncPromise(final int delay) {
+ final Deferred<String> deferred = new Deferred<String>();
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ if (delay == -1) {
+ deferred.fail(new ExpectedTestException());
+ return;
+ }
+ sleep(delay);
+ deferred.resolve("Finished");
+ }
+ }).start();
+
+ return deferred.getPromise();
+ }
+
+ private void sleep(int delay) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
}