You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ju...@apache.org on 2017/04/11 11:09:08 UTC
[1/3] aries-rsa git commit: fastbin should throw a service exception
for unknown methods
Repository: aries-rsa
Updated Branches:
refs/heads/master aaafdf780 -> 1009ba59f
fastbin should throw a service exception for unknown methods
When the client tries to call a method that the implementation does not
have (e.g. because of incompatible class files), a ServiceException should be thrown.
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/d779ff10
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/d779ff10
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/d779ff10
Branch: refs/heads/master
Commit: d779ff10e12ec39e47996f294c6c658f2a4cd391
Parents: aaafdf7
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 10:02:37 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:04:49 2017 +0200
----------------------------------------------------------------------
.../provider/fastbin/tcp/ServerInvokerImpl.java | 143 +++++++++----------
.../provider/fastbin/TransportFailureTest.java | 2 +-
2 files changed, 72 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index 1dd58f9..d971140 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -246,80 +246,22 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
final ServiceFactoryHolder holder = holders.get(service);
Runnable task = null;
if(holder==null) {
- LOGGER.warn("The requested service {"+service+"} is not available");
- task = new Runnable() {
- public void run() {
-
- final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- try {
- baos.writeInt(0); // make space for the size field.
- baos.writeVarLong(correlation);
- } catch (IOException e) { // should not happen
- LOGGER.error("Failed to write to buffer",e);
- throw new RuntimeException(e);
- }
-
- // Lets decode the remaining args on the target's executor
- // to take cpu load off the
- BlockingInvocationStrategy strategy = new BlockingInvocationStrategy();
- strategy.service(ObjectSerializationStrategy.INSTANCE, getClass().getClassLoader(), null, new ServiceException("The requested service {"+service+"} is not available"), bais, baos, new Runnable() {
-
- public void run() {
- final Buffer command = baos.toBuffer();
-
- // Update the size field.
- BufferEditor editor = command.buffer().bigEndianEditor();
- editor.writeInt(command.length);
-
- queue().execute(new Runnable() {
- public void run() {
- transport.offer(command);
- }
- });
- }
- });
- }
- };
+ String message = "The requested service {"+service+"} is not available";
+ LOGGER.warn(message);
+ task = new SendTask(bais, correlation, transport, message);
}
final Object svc = holder==null ? null : holder.factory.get();
- if(holder!=null)
- {
- final MethodData methodData = holder.getMethodData(encoded_method);
-
-
- task = new Runnable() {
- public void run() {
-
- final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- try {
- baos.writeInt(0); // make space for the size field.
- baos.writeVarLong(correlation);
- } catch (IOException e) { // should not happen
- LOGGER.error("Failed to write to buffer",e);
- throw new RuntimeException(e);
- }
-
- // Lets decode the remaining args on the target's executor
- // to take cpu load off the
- methodData.invocationStrategy.service(methodData.serializationStrategy, holder.loader, methodData.method, svc, bais, baos, new Runnable() {
- public void run() {
- holder.factory.unget();
- final Buffer command = baos.toBuffer();
-
- // Update the size field.
- BufferEditor editor = command.buffer().bigEndianEditor();
- editor.writeInt(command.length);
-
- queue().execute(new Runnable() {
- public void run() {
- transport.offer(command);
- }
- });
- }
- });
- }
- };
-
+ if(holder!=null) {
+ try {
+ final MethodData methodData = holder.getMethodData(encoded_method);
+ task = new SendTask(svc, bais, holder, correlation, methodData, transport);
+ }
+ catch (ReflectiveOperationException reflectionEx) {
+ final String methodName = encoded_method.utf8().toString();
+ String message = "The requested method {"+methodName+"} is not available";
+ LOGGER.warn(message);
+ task = new SendTask(bais, correlation, transport, message);
+ }
}
Executor executor;
@@ -378,4 +320,61 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
}
}
+ private final class SendTask implements Runnable {
+ private Object svc;
+ private DataByteArrayInputStream bais;
+ private ServiceFactoryHolder holder;
+ private long correlation;
+ private MethodData methodData;
+ private Transport transport;
+
+
+ private SendTask(Object svc, DataByteArrayInputStream bais, ServiceFactoryHolder holder, long correlation, MethodData methodData, Transport transport) {
+ this.svc = svc;
+ this.bais = bais;
+ this.holder = holder;
+ this.correlation = correlation;
+ this.methodData = methodData;
+ this.transport = transport;
+ }
+
+ private SendTask(DataByteArrayInputStream bais, long correlation, Transport transport, String errorMessage) {
+ this(new ServiceException(errorMessage), bais, null, correlation, new MethodData(new BlockingInvocationStrategy(), ObjectSerializationStrategy.INSTANCE, null),transport);
+ }
+
+ public void run() {
+
+ final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ try {
+ baos.writeInt(0); // make space for the size field.
+ baos.writeVarLong(correlation);
+ } catch (IOException e) { // should not happen
+ LOGGER.error("Failed to write to buffer",e);
+ throw new RuntimeException(e);
+ }
+
+ // Lets decode the remaining args on the target's executor
+ // to take cpu load off the
+
+ ClassLoader loader = holder==null ? getClass().getClassLoader() : holder.loader;
+ methodData.invocationStrategy.service(methodData.serializationStrategy, loader, methodData.method, svc, bais, baos, new Runnable() {
+ public void run() {
+ if(holder!=null)
+ holder.factory.unget();
+ final Buffer command = baos.toBuffer();
+
+ // Update the size field.
+ BufferEditor editor = command.buffer().bigEndianEditor();
+ editor.writeInt(command.length);
+
+ queue().execute(new Runnable() {
+ public void run() {
+ transport.offer(command);
+ }
+ });
+ }
+ });
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
index 3682d45..4b175fd 100644
--- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java
@@ -105,7 +105,7 @@ public class TransportFailureTest {
Thread.sleep(SLEEP_TIME);
// Big introspection call to access the transport channel and close it, simulating
// a disconnect on the client side.
- ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "val$transport"), "channel")).close();
+ ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "transport"), "channel")).close();
} catch (Throwable e) {
e.printStackTrace();
}
[2/3] aries-rsa git commit: [ARIES-1713] TopologyManagerExport fails
with multiple interfaces
Posted by ju...@apache.org.
[ARIES-1713] TopologyManagerExport fails with multiple interfaces
If the exported interfaces property is not a simple String but a String+ value
(for example a list of Strings), shouldExport failed with a ClassCastException.
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/33b1e4d2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/33b1e4d2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/33b1e4d2
Branch: refs/heads/master
Commit: 33b1e4d282f1af5135768d013e780009d00d405c
Parents: d779ff1
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 10:03:45 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:05:00 2017 +0200
----------------------------------------------------------------------
.../exporter/TopologyManagerExport.java | 16 ++---
.../exporter/TopologyManagerExportTest.java | 61 +++++++++++++++++---
2 files changed, 62 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/33b1e4d2/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
index 6200a29..0be28a4 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.aries.rsa.spi.ExportPolicy;
+import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
@@ -86,7 +87,7 @@ public class TopologyManagerExport implements ServiceListener {
export(serviceRef);
}
};
-
+
public void remove(RemoteServiceAdmin rsa) {
rsaSet.remove(rsa);
endpointRepo.removeRemoteServiceAdmin(rsa);
@@ -122,17 +123,18 @@ public class TopologyManagerExport implements ServiceListener {
// already handled by this remoteServiceAdmin
LOG.debug("already handled by this remoteServiceAdmin -> skipping");
} else {
-
+
exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
}
}
}
private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
- String exported = (String)sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
- String addExported = (String)addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
- String effectiveExported = addExported != null ? addExported : exported;
- return (effectiveExported != null) && !effectiveExported.isEmpty();
+ List<String> exported= StringPlus.normalize(sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES));
+ List<String> addExported = StringPlus.normalize(addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES));
+ int length = exported == null ? 0 : exported.size();
+ length += addExported == null ? 0 : addExported.size();
+ return length>0;
}
private Object getSymbolicName(Bundle bundle) {
@@ -140,7 +142,7 @@ public class TopologyManagerExport implements ServiceListener {
}
private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
- final RemoteServiceAdmin remoteServiceAdmin,
+ final RemoteServiceAdmin remoteServiceAdmin,
Map<String, ?> addProps) {
// abort if the service was unregistered by the time we got here
// (we check again at the end, but this optimization saves unnecessary heavy processing)
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/33b1e4d2/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
index 4c9d28f..2307566 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExportTest.java
@@ -20,8 +20,10 @@ package org.apache.aries.rsa.topologymanager.exporter;
import static org.easymock.EasyMock.expectLastCall;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -57,7 +59,7 @@ public class TopologyManagerExportTest {
final ServiceReference sref = createUserService(c);
EndpointDescription epd = createEndpoint();
expectServiceExported(c, rsa, notifier, sref, epd);
-
+
c.replay();
EndpointRepository endpointRepo = new EndpointRepository();
endpointRepo.setNotifier(notifier);
@@ -67,19 +69,19 @@ public class TopologyManagerExportTest {
exportManager.add(rsa);
exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
c.verify();
-
+
c.reset();
notifier.endpointRemoved(epd, null);
expectLastCall().once();
c.replay();
exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref));
c.verify();
-
+
c.reset();
c.replay();
exportManager.serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, sref));
c.verify();
-
+
c.reset();
c.replay();
exportManager.remove(rsa);
@@ -104,13 +106,50 @@ public class TopologyManagerExportTest {
c.verify();
}
+ @Test
+ public void testExportExistingMultipleInterfaces() throws Exception {
+ IMocksControl c = EasyMock.createControl();
+ RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+ final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class);
+ List<String> exportedInterfaces = Arrays.asList("a.b.C","foo.Bar");
+ final ServiceReference sref = createUserService(c, exportedInterfaces);
+ expectServiceExported(c, rsa, mockEpListenerNotifier, sref, createEndpoint());
+ c.replay();
+
+ EndpointRepository endpointRepo = new EndpointRepository();
+ endpointRepo.setNotifier(mockEpListenerNotifier);
+ ExportPolicy policy = new DefaultExportPolicy();
+ TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+ exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
+ exportManager.add(rsa);
+ c.verify();
+ }
+
+ @Test
+ public void testExportExistingNoExportedInterfaces() throws Exception {
+ IMocksControl c = EasyMock.createControl();
+ RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+ final EndpointListenerNotifier mockEpListenerNotifier = c.createMock(EndpointListenerNotifier.class);
+ String exportedInterfaces = "";
+ final ServiceReference sref = createUserService(c, exportedInterfaces);
+ c.replay();
+
+ EndpointRepository endpointRepo = new EndpointRepository();
+ endpointRepo.setNotifier(mockEpListenerNotifier);
+ ExportPolicy policy = new DefaultExportPolicy();
+ TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+ exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
+ exportManager.add(rsa);
+ c.verify();
+ }
+
private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa,
final EndpointListener listener,
final ServiceReference sref, EndpointDescription epd) {
ExportRegistration exportRegistration = createExportRegistration(c, epd);
EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject()))
.andReturn(Collections.singletonList(exportRegistration)).once();
- listener.endpointAdded(epd, null);
+ listener.endpointAdded(epd, null);
EasyMock.expectLastCall().once();
}
@@ -141,13 +180,19 @@ public class TopologyManagerExportTest {
}
private ServiceReference createUserService(IMocksControl c) {
+ return createUserService(c, "*");
+ }
+
+ private ServiceReference createUserService(IMocksControl c, Object exportedInterfaces) {
final ServiceReference sref = c.createMock(ServiceReference.class);
EasyMock.expect(sref.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES)))
- .andReturn("*").anyTimes();
+ .andReturn(exportedInterfaces).anyTimes();
Bundle srefBundle = c.createMock(Bundle.class);
- EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce();
+ if(!"".equals(exportedInterfaces)) {
+ EasyMock.expect(sref.getBundle()).andReturn(srefBundle).atLeastOnce();
+ EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce();
+ }
EasyMock.expect(sref.getProperty("objectClass")).andReturn("org.My").anyTimes();
- EasyMock.expect(srefBundle.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce();
return sref;
}
}
[3/3] aries-rsa git commit: [ARIES-1714] fastbin long running
(future) calls sometimes fail
Posted by ju...@apache.org.
[ARIES-1714] fastbin long running (future) calls sometimes fail
Set the pool evict time to 0 so that transports used by long-running async calls are not evicted.
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1009ba59
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1009ba59
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1009ba59
Branch: refs/heads/master
Commit: 1009ba59f7a77a6309f95de993c7b85c6e6129a9
Parents: 33b1e4d
Author: Johannes Utzig <ju...@apache.org>
Authored: Tue Feb 21 16:28:57 2017 +0100
Committer: Johannes Utzig <ju...@apache.org>
Committed: Tue Apr 11 13:05:07 2017 +0200
----------------------------------------------------------------------
.../aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1009ba59/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
index ba0a12b..191a896 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
@@ -323,7 +323,13 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched {
protected class InvokerTransportPool extends TransportPool {
public InvokerTransportPool(String uri, DispatchQueue queue) {
- super(uri, queue, TransportPool.DEFAULT_POOL_SIZE, timeout << 1);
+ /*
+ * the evict time needs to be 0. Otherwise the client will
+ * evict transport objects which breaks the connection for
+ * long running async calls.
+ * Since there is a limit of 2 transports per URI, it shouldn't be that many objects.
+ */
+ super(uri, queue, TransportPool.DEFAULT_POOL_SIZE, 0);
}
@Override