You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/23 12:26:05 UTC

aries-rsa git commit: [ARIES-1757] Add support for promise

Repository: aries-rsa
Updated Branches:
  refs/heads/master a8a6a057d -> 1ff19b589


[ARIES-1757] Add support for promise


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1ff19b58
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1ff19b58
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1ff19b58

Branch: refs/heads/master
Commit: 1ff19b589d9c5db6f46d5c506f8777cf4b1ca422
Parents: a8a6a05
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Nov 23 13:25:59 2017 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Nov 23 13:25:59 2017 +0100

----------------------------------------------------------------------
 provider/tcp/pom.xml                            | 52 +++++++++++---------
 .../aries/rsa/provider/tcp/TCPServer.java       | 27 ++++++++--
 .../rsa/provider/tcp/TcpInvocationHandler.java  | 27 +++++++++-
 .../aries/rsa/provider/tcp/TcpProviderTest.java | 39 +++++++++++++--
 .../tcp/myservice/ExpectedTestException.java    | 11 +++++
 .../rsa/provider/tcp/myservice/MyService.java   | 10 ++--
 .../provider/tcp/myservice/MyServiceImpl.java   | 46 +++++++++++++----
 7 files changed, 166 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/pom.xml
----------------------------------------------------------------------
diff --git a/provider/tcp/pom.xml b/provider/tcp/pom.xml
index a903860..f743563 100644
--- a/provider/tcp/pom.xml
+++ b/provider/tcp/pom.xml
@@ -1,28 +1,34 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.aries.rsa</groupId>
-        <artifactId>org.apache.aries.rsa.parent</artifactId>
-        <version>1.12.0-SNAPSHOT</version>
-        <relativePath>../../parent/pom.xml</relativePath>
-    </parent>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.aries.rsa</groupId>
+		<artifactId>org.apache.aries.rsa.parent</artifactId>
+		<version>1.12.0-SNAPSHOT</version>
+		<relativePath>../../parent/pom.xml</relativePath>
+	</parent>
 
-    <groupId>org.apache.aries.rsa.provider</groupId>
-    <artifactId>org.apache.aries.rsa.provider.tcp</artifactId>
-    <packaging>bundle</packaging>
-    <name>Aries Remote Service Admin provider TCP</name>
-    <description>Provider for Java Serialization over TCP</description>
+	<groupId>org.apache.aries.rsa.provider</groupId>
+	<artifactId>org.apache.aries.rsa.provider.tcp</artifactId>
+	<packaging>bundle</packaging>
+	<name>Aries Remote Service Admin provider TCP</name>
+	<description>Provider for Java Serialization over TCP</description>
 
-    <properties>
-        <topDirectoryLocation>../..</topDirectoryLocation>
-    </properties>
+	<properties>
+		<topDirectoryLocation>../..</topDirectoryLocation>
+	</properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.aries.rsa</groupId>
-            <artifactId>org.apache.aries.rsa.spi</artifactId>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.aries.rsa</groupId>
+			<artifactId>org.apache.aries.rsa.spi</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.osgi</groupId>
+			<artifactId>org.osgi.util.promise</artifactId>
+			<version>1.0.0</version>
+		</dependency>
+	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index 85a7c31..a1c8775 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -26,11 +26,13 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.osgi.util.promise.Promise;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,20 +80,37 @@ public class TCPServer implements Closeable, Runnable {
         }
     }
 
-    @SuppressWarnings("unchecked")
     private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception {
         String methodName = (String)ois.readObject();
         Object[] args = (Object[])ois.readObject();
         Object result = invoker.invoke(methodName, args);
+        result = resolveAsnyc(result);
         if (result instanceof InvocationTargetException) {
             result = ((InvocationTargetException) result).getCause();
-        } else if (result instanceof Future) {
-            Future<Object> fu = (Future<Object>) result;
-            result = fu.get();
         }
         objectOutput.writeObject(result);
     }
 
+    @SuppressWarnings("unchecked")
+    private Object resolveAsnyc(Object result) throws InterruptedException {
+        if (result instanceof Future) {
+            Future<Object> fu = (Future<Object>) result;
+            try {
+                result = fu.get();
+            } catch (ExecutionException e) {
+                result = e.getCause();
+            }
+        } else if (result instanceof Promise) {
+            Promise<Object> fu = (Promise<Object>) result;  
+            try {
+                result = fu.getValue();
+            } catch (InvocationTargetException e) {
+                result = e.getCause();
+            }
+        }
+        return result;
+    }
+
     @Override
     public void close() throws IOException {
         this.serverSocket.close();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index 8dea17f..ec59f3d 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -29,6 +29,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
 public class TcpInvocationHandler implements InvocationHandler {
     private String host;
     private int port;
@@ -46,23 +49,43 @@ public class TcpInvocationHandler implements InvocationHandler {
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         if (Future.class.isAssignableFrom(method.getReturnType())) {
-            return createAsyncResult(method, args);
+            return createFutureResult(method, args);
+        } else if (Promise.class.isAssignableFrom(method.getReturnType())) {
+                return createPromiseResult(method, args);
         } else {
             return handleSyncCall(method, args);
         }
     }
 
-    private Object createAsyncResult(final Method method, final Object[] args) {
+    private Object createFutureResult(final Method method, final Object[] args) {
         return CompletableFuture.supplyAsync(new Supplier<Object>() {
             public Object get() {
                 try {
                     return handleSyncCall(method, args);
+                } catch (RuntimeException e) {
+                    throw e;
                 } catch (Throwable e) {
                     throw new RuntimeException(e);
                 }
             }
         });
     }
+    
+    private Object createPromiseResult(final Method method, final Object[] args) {
+        final Deferred<Object> deferred = new Deferred<Object>();
+        new Thread(new Runnable() {
+            
+            @Override
+            public void run() {
+                try {
+                    deferred.resolve(handleSyncCall(method, args));
+                } catch (Throwable e) {
+                    deferred.fail(e);
+                }
+            }
+        }).start();
+        return deferred.getPromise();
+    }
 
     private Object handleSyncCall(Method method, Object[] args) throws Throwable {
         Object result;

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index 2160588..7a33c86 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -22,16 +22,19 @@ import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.aries.rsa.provider.tcp.myservice.ExpectedTestException;
 import org.apache.aries.rsa.provider.tcp.myservice.MyService;
 import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl;
 import org.apache.aries.rsa.spi.Endpoint;
@@ -42,6 +45,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
+import org.osgi.util.promise.Promise;
 
 public class TcpProviderTest {
 
@@ -73,7 +77,7 @@ public class TcpProviderTest {
     @Test
     public void testCallTimeout() {
         try {
-            myServiceProxy.callSlow();
+            myServiceProxy.callSlow(TIMEOUT + 100);
             Assert.fail("Expecting timeout");
         } catch (RuntimeException e) {
             Assert.assertEquals(SocketTimeoutException.class, e.getCause().getClass());
@@ -88,7 +92,7 @@ public class TcpProviderTest {
         Assert.assertEquals(msg, result);
     }
     
-    @Test(expected=IllegalArgumentException.class)
+    @Test(expected=ExpectedTestException.class)
     public void testCallException() {
         myServiceProxy.callException();
     }
@@ -113,11 +117,38 @@ public class TcpProviderTest {
     }
 
     @Test
-    public void testAsyncCall() throws Exception {
-        Future<String> result = myServiceProxy.callAsync(100);
+    public void testAsyncFuture() throws Exception {
+        Future<String> result = myServiceProxy.callAsyncFuture(100);
         String answer = result.get(1, TimeUnit.SECONDS);
         assertEquals("Finished", answer);
     }
+    
+    @Test(expected = ExpectedTestException.class)
+    public void testAsyncFutureException() throws Throwable {
+        Future<String> result = myServiceProxy.callAsyncFuture(-1);
+        try {
+            result.get();
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+    
+    @Test
+    public void testAsyncPromise() throws Exception {
+        Promise<String> result = myServiceProxy.callAsyncPromise(100);
+        String answer = result.getValue();
+        assertEquals("Finished", answer);
+    }
+    
+    @Test(expected = ExpectedTestException.class)
+    public void testAsyncPromiseException() throws Throwable {
+        Promise<String> result = myServiceProxy.callAsyncPromise(-1);
+        try {
+            result.getValue();
+        } catch (InvocationTargetException e) {
+            throw e.getCause();
+        }
+    }
 
     @AfterClass
     public static void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
new file mode 100644
index 0000000..ba7d084
--- /dev/null
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java
@@ -0,0 +1,11 @@
+package org.apache.aries.rsa.provider.tcp.myservice;
+
+public class ExpectedTestException extends RuntimeException {
+
+    public ExpectedTestException() {
+        super();
+    }
+
+    private static final long serialVersionUID = -6271694671646172358L;
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
index cfeb32b..026743d 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
@@ -23,19 +23,23 @@ import java.util.concurrent.Future;
 
 import javax.jws.Oneway;
 
+import org.osgi.util.promise.Promise;
+
 public interface MyService {
     String echo(String msg);
 
-    void callSlow();
+    void callSlow(int delay);
     
     void callException();
-
+    
     // Oneway not yet supported
     @Oneway
     void callOneWay(String msg);
     
     void callWithList(List<String> msg);
     
-    Future<String> callAsync(int delay); 
+    Future<String> callAsyncFuture(int delay);
+
+    Promise<String> callAsyncPromise(int delay); 
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
index 1a5a48c..a682cb3 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
@@ -24,6 +24,9 @@ import java.util.List;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
 public class MyServiceImpl implements MyService {
 
     @Override
@@ -32,16 +35,13 @@ public class MyServiceImpl implements MyService {
     }
 
     @Override
-    public void callSlow() {
-        try {
-            Thread.sleep(300);
-        } catch (InterruptedException e) {
-        } 
+    public void callSlow(int delay) {
+        sleep(delay); 
     }
 
     @Override
     public void callException() {
-        throw new IllegalArgumentException("Throwing expected exception");
+        throw new ExpectedTestException();
     }
 
     @Override
@@ -54,17 +54,43 @@ public class MyServiceImpl implements MyService {
     }
 
     @Override
-    public Future<String> callAsync(final int delay) {
+    public Future<String> callAsyncFuture(final int delay) {
         return supplyAsync(new Supplier<String>() {
             public String get() {
-                try {
-                    Thread.sleep(delay);
-                } catch (InterruptedException e) {
+                if (delay == -1) {
+                    throw new ExpectedTestException();
                 }
+                sleep(delay);
                 return "Finished";
             }
             
         });
     }
+    
+    @Override
+    public Promise<String> callAsyncPromise(final int delay) {
+        final Deferred<String> deferred = new Deferred<String>();
+        new Thread(new Runnable() {
+            
+            @Override
+            public void run() {
+                if (delay == -1) {
+                    deferred.fail(new ExpectedTestException());
+                    return;
+                }
+                sleep(delay);
+                deferred.resolve("Finished");
+            }
+        }).start();
+        
+        return deferred.getPromise();
+    }
+
+    private void sleep(int delay) {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+        }
+    }
 
 }