You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ju...@apache.org on 2017/04/11 11:09:08 UTC

[1/3] aries-rsa git commit: fastbin should throw a service exception for unknown methods

Repository: aries-rsa
Updated Branches:
  refs/heads/master aaafdf780 -> 1009ba59f


fastbin should throw a service exception for unknown methods

when the client tries to call a method that the implementation does not
have (e.g. incompatible class files) a ServiceException should be thrown

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/d779ff10
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/d779ff10
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/d779ff10

Branch: refs/heads/master
Commit: d779ff10e12ec39e47996f294c6c658f2a4cd391
Parents: aaafdf7
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 10:02:37 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:04:49 2017 +0200

----------------------------------------------------------------------
 .../provider/fastbin/tcp/ServerInvokerImpl.java | 143 +++++++++----------
 .../provider/fastbin/TransportFailureTest.java  |   2 +-
 2 files changed, 72 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index 1dd58f9..d971140 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -246,80 +246,22 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
             final ServiceFactoryHolder holder = holders.get(service);
             Runnable task = null;
             if(holder==null) {
-                LOGGER.warn("The requested service {"+service+"} is not available");
-                task = new Runnable() {
-                    public void run() {
-
-                        final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-                        try {
-                            baos.writeInt(0); // make space for the size field.
-                            baos.writeVarLong(correlation);
-                        } catch (IOException e) { // should not happen
-                            LOGGER.error("Failed to write to buffer",e);
-                            throw new RuntimeException(e);
-                        }
-
-                        // Lets decode the remaining args on the target's executor
-                        // to take cpu load off the
-                        BlockingInvocationStrategy strategy = new BlockingInvocationStrategy();
-                        strategy.service(ObjectSerializationStrategy.INSTANCE, getClass().getClassLoader(), null,  new ServiceException("The requested service {"+service+"} is not available"), bais, baos, new Runnable() {
-
-                            public void run() {
-                                final Buffer command = baos.toBuffer();
-
-                                // Update the size field.
-                                BufferEditor editor = command.buffer().bigEndianEditor();
-                                editor.writeInt(command.length);
-
-                                queue().execute(new Runnable() {
-                                    public void run() {
-                                        transport.offer(command);
-                                    }
-                                });
-                            }
-                        });
-                    }
-                };
+                String message = "The requested service {"+service+"} is not available";
+                LOGGER.warn(message);
+                task = new SendTask(bais, correlation, transport, message);
             }
             final Object svc = holder==null ? null : holder.factory.get();
-            if(holder!=null)
-            {
-                final MethodData methodData = holder.getMethodData(encoded_method);
-
-
-                task = new Runnable() {
-                    public void run() {
-
-                        final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-                        try {
-                            baos.writeInt(0); // make space for the size field.
-                            baos.writeVarLong(correlation);
-                        } catch (IOException e) { // should not happen
-                            LOGGER.error("Failed to write to buffer",e);
-                            throw new RuntimeException(e);
-                        }
-
-                        // Lets decode the remaining args on the target's executor
-                        // to take cpu load off the
-                        methodData.invocationStrategy.service(methodData.serializationStrategy, holder.loader, methodData.method, svc, bais, baos, new Runnable() {
-                            public void run() {
-                                holder.factory.unget();
-                                final Buffer command = baos.toBuffer();
-
-                                // Update the size field.
-                                BufferEditor editor = command.buffer().bigEndianEditor();
-                                editor.writeInt(command.length);
-
-                                queue().execute(new Runnable() {
-                                    public void run() {
-                                        transport.offer(command);
-                                    }
-                                });
-                            }
-                        });
-                    }
-                };
-
+            if(holder!=null) {
+                try {
+                    final MethodData methodData = holder.getMethodData(encoded_method);
+                    task = new SendTask(svc, bais, holder, correlation, methodData, transport);
+                }
+                catch (ReflectiveOperationException reflectionEx) {
+                    final String methodName = encoded_method.utf8().toString();
+                    String message = "The requested method {"+methodName+"} is not available";
+                    LOGGER.warn(message);
+                    task = new SendTask(bais, correlation, transport, message);
+                }
             }
 
             Executor executor;
@@ -378,4 +320,61 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
         }
     }
 
+    private final class SendTask implements Runnable {
+        private Object svc;
+        private DataByteArrayInputStream bais;
+        private ServiceFactoryHolder holder;
+        private long correlation;
+        private MethodData methodData;
+        private Transport transport;
+
+
+        private SendTask(Object svc, DataByteArrayInputStream bais, ServiceFactoryHolder holder, long correlation, MethodData methodData, Transport transport) {
+            this.svc = svc;
+            this.bais = bais;
+            this.holder = holder;
+            this.correlation = correlation;
+            this.methodData = methodData;
+            this.transport = transport;
+        }
+
+        private SendTask(DataByteArrayInputStream bais, long correlation, Transport transport, String errorMessage) {
+            this(new ServiceException(errorMessage), bais, null, correlation, new MethodData(new BlockingInvocationStrategy(), ObjectSerializationStrategy.INSTANCE, null),transport);
+        }
+
+        public void run() {
+
+            final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+            try {
+                baos.writeInt(0); // make space for the size field.
+                baos.writeVarLong(correlation);
+            } catch (IOException e) { // should not happen
+                LOGGER.error("Failed to write to buffer",e);
+                throw new RuntimeException(e);
+            }
+
+            // Lets decode the remaining args on the target's executor
+            // to take cpu load off the
+
+            ClassLoader loader = holder==null ? getClass().getClassLoader() : holder.loader;
+            methodData.invocationStrategy.service(methodData.serializationStrategy, loader, methodData.method, svc, bais, baos, new Runnable() {
+                public void run() {
+                    if(holder!=null)
+                        holder.factory.unget();
+                    final Buffer command = baos.toBuffer();
+
+                    // Update the size field.
+                    BufferEditor editor = command.buffer().bigEndianEditor();
+                    editor.writeInt(command.length);
+
+                    queue().execute(new Runnable() {
+                        public void run() {
+                            transport.offer(command);
+                        }
+                    });
+                }
+            });
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
index 3682d45..4b175fd 100644
--- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
@@ -105,7 +105,7 @@ public class TransportFailureTest {
                         Thread.sleep(SLEEP_TIME);
                         // Big introspection call to access the transport channel and close it, simulating
                         // a disconnect on the client side.
-                        ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "val$transport"), "channel")).close();
+                        ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "transport"), "channel")).close();
                     } catch (Throwable e) {
                         e.printStackTrace();
                     }


[2/3] aries-rsa git commit: [ARIES-1713] TopologyManagerExport fails with multiple interfaces

Posted by ju...@apache.org.
[ARIES-1713] TopologyManagerExport fails with multiple interfaces

if the exported interfaces are not a simple string but a String+ there
was a ClassCastException in shouldExport



Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/33b1e4d2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/33b1e4d2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/33b1e4d2

Branch: refs/heads/master
Commit: 33b1e4d282f1af5135768d013e780009d00d405c
Parents: d779ff1
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 10:03:45 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:05:00 2017 +0200

----------------------------------------------------------------------
 .../exporter/TopologyManagerExport.java         | 16 ++---
 .../exporter/TopologyManagerExportTest.java     | 61 +++++++++++++++++---
 2 files changed, 62 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/33b1e4d2/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
index 6200a29..0be28a4 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 
 import org.apache.aries.rsa.spi.ExportPolicy;
+import org.apache.aries.rsa.util.StringPlus;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.ServiceEvent;
 import org.osgi.framework.ServiceListener;
@@ -86,7 +87,7 @@ public class TopologyManagerExport implements ServiceListener {
             export(serviceRef);
         }
     };
-    
+
     public void remove(RemoteServiceAdmin rsa) {
         rsaSet.remove(rsa);
         endpointRepo.removeRemoteServiceAdmin(rsa);
@@ -122,17 +123,18 @@ public class TopologyManagerExport implements ServiceListener {
                 // already handled by this remoteServiceAdmin
                 LOG.debug("already handled by this remoteServiceAdmin -> skipping");
             } else {
-                
+
                 exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
             }
         }
     }
 
     private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
-        String exported = (String)sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
-        String addExported = (String)addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
-        String effectiveExported = addExported != null ? addExported : exported;
-        return (effectiveExported != null) && !effectiveExported.isEmpty();
+        List<String> exported= StringPlus.normalize(sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES));
+        List<String> addExported = StringPlus.normalize(addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES));
+        int length = exported == null ? 0 : exported.size();
+        length += addExported == null ? 0 : addExported.size();
+        return length>0;
     }
 
     private Object getSymbolicName(Bundle bundle) {
@@ -140,7 +142,7 @@ public class TopologyManagerExport implements ServiceListener {
     }
 
     private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
-                                                      final RemoteServiceAdmin remoteServiceAdmin, 
+                                                      final RemoteServiceAdmin remoteServiceAdmin,
                                                       Map<String, ?> addProps) {
         // abort if the service was unregistered by the time we got here
         // (we check again at the end, but this optimization saves unnecessary heavy processing)

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/33b1e4d2/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
index 4c9d28f..2307566 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
@@ -20,8 +20,10 @@ package org.apache.aries.rsa.topologymanager.exporter;
 
 import static org.easymock.EasyMock.expectLastCall;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -57,7 +59,7 @@ public class TopologyManagerExportTest {
         final ServiceReference sref = createUserService(c);
         EndpointDescription epd = createEndpoint();
         expectServiceExported(c, rsa, notifier, sref, epd);
-        
+
         c.replay();
         EndpointRepository endpointRepo = new EndpointRepository();
         endpointRepo.setNotifier(notifier);
@@ -67,19 +69,19 @@ public class TopologyManagerExportTest {
         exportManager.add(rsa);
         exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
         c.verify();
-        
+
         c.reset();
         notifier.endpointRemoved(epd, null);
         expectLastCall().once();
         c.replay();
         exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref));
         c.verify();
-        
+
         c.reset();
         c.replay();
         exportManager.serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, sref));
         c.verify();
-        
+
         c.reset();
         c.replay();
         exportManager.remove(rsa);
@@ -104,13 +106,50 @@ public class TopologyManagerExportTest {
         c.verify();
     }
 
+    @Test
+    public void testExportExistingMultipleInterfaces() throws Exception {
+        IMocksControl c = EasyMock.createControl();
+        RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+        final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class);
+        List<String> exportedInterfaces = Arrays.asList("a.b.C","foo.Bar");
+        final ServiceReference sref = createUserService(c, exportedInterfaces);
+        expectServiceExported(c, rsa, mockEpListenerNotifier, sref, createEndpoint());
+        c.replay();
+
+        EndpointRepository endpointRepo = new EndpointRepository();
+        endpointRepo.setNotifier(mockEpListenerNotifier);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+        exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
+        exportManager.add(rsa);
+        c.verify();
+    }
+
+    @Test
+    public void testExportExistingNoExportedInterfaces() throws Exception {
+        IMocksControl c = EasyMock.createControl();
+        RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+        final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class);
+        String exportedInterfaces = "";
+        final ServiceReference sref = createUserService(c, exportedInterfaces);
+        c.replay();
+
+        EndpointRepository endpointRepo = new EndpointRepository();
+        endpointRepo.setNotifier(mockEpListenerNotifier);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+        exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
+        exportManager.add(rsa);
+        c.verify();
+    }
+
     private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa,
                                        final EndpointListener listener,
                                        final ServiceReference sref, EndpointDescription epd) {
         ExportRegistration exportRegistration = createExportRegistration(c, epd);
         EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject()))
             .andReturn(Collections.singletonList(exportRegistration)).once();
-        listener.endpointAdded(epd, null); 
+        listener.endpointAdded(epd, null);
         EasyMock.expectLastCall().once();
     }
 
@@ -141,13 +180,19 @@ public class TopologyManagerExportTest {
     }
 
     private ServiceReference createUserService(IMocksControl c) {
+        return createUserService(c, "*");
+    }
+
+    private ServiceReference createUserService(IMocksControl c, Object exportedInterfaces) {
         final ServiceReference sref = c.createMock(ServiceReference.class);
         EasyMock.expect(sref.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES)))
-            .andReturn("*").anyTimes();
+            .andReturn(exportedInterfaces).anyTimes();
         Bundle srefBundle = c.createMock(Bundle.class);
-        EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce();
+        if(!"".equals(exportedInterfaces)) {
+            EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce();
+            EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce();
+        }
         EasyMock.expect(sref.getProperty("objectClass")).andReturn("org.My").anyTimes();
-        EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce();
         return sref;
     }
 }


[3/3] aries-rsa git commit: [ARIES-1714] fastbin long running (future) calls sometimes fail

Posted by ju...@apache.org.
[ARIES-1714] fastbin long running (future) calls sometimes fail

set the pool evict size to 0 for long running async calls

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1009ba59
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1009ba59
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1009ba59

Branch: refs/heads/master
Commit: 1009ba59f7a77a6309f95de993c7b85c6e6129a9
Parents: 33b1e4d
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 16:28:57 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:05:07 2017 +0200

----------------------------------------------------------------------
 .../aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1009ba59/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
index ba0a12b..191a896 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
@@ -323,7 +323,13 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched {
     protected class InvokerTransportPool extends TransportPool {
 
         public InvokerTransportPool(String uri, DispatchQueue queue) {
-            super(uri, queue, TransportPool.DEFAULT_POOL_SIZE, timeout << 1);
+            /*
+             * the evict time needs to be 0. Otherwise the client will
+             * evict transport objects which breaks the connection for
+             * long running async calls.
+             * Since there is limit of 2 transports per uri it shouldn't be that many objects
+             */
+            super(uri, queue, TransportPool.DEFAULT_POOL_SIZE, 0);
         }
 
         @Override