You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/09/14 11:10:03 UTC

aries-rsa git commit: [ARIES-1587] Support streams in fastbin

Repository: aries-rsa
Updated Branches:
  refs/heads/master 4c8ae19f4 -> 475f01328


[ARIES-1587] Support streams in fastbin

injects serializable proxy streams that read/write remotely

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/475f0132
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/475f0132
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/475f0132

Branch: refs/heads/master
Commit: 475f01328983e3e1b11deb41876ad21eddff333c
Parents: 4c8ae19
Author: Johannes Utzig <j....@seeburger.de>
Authored: Wed Jul 27 10:42:21 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Sep 13 16:40:49 2016 +0200

----------------------------------------------------------------------
 provider/fastbin/Readme.md                      |   6 +
 .../aries/rsa/provider/fastbin/Activator.java   |  22 +-
 .../rsa/provider/fastbin/FastBinProvider.java   |   8 +
 .../rsa/provider/fastbin/io/ServerInvoker.java  |   3 +
 .../rsa/provider/fastbin/streams/Chunk.java     |  67 ++++++
 .../fastbin/streams/InputStreamProxy.java       | 158 +++++++++++++
 .../fastbin/streams/OutputStreamProxy.java      | 152 ++++++++++++
 .../fastbin/streams/StreamProvider.java         |  75 ++++++
 .../fastbin/streams/StreamProviderImpl.java     | 113 +++++++++
 .../fastbin/tcp/AbstractInvocationStrategy.java |  36 +++
 .../fastbin/tcp/BlockingInvocationStrategy.java |   3 +
 .../provider/fastbin/tcp/ServerInvokerImpl.java |  26 +++
 .../provider/fastbin/StreamInvocationTest.java  | 232 +++++++++++++++++++
 .../fastbin/streams/InputStreamProxyTest.java   | 146 ++++++++++++
 .../fastbin/streams/OutputStreamProxyTest.java  |  81 +++++++
 15 files changed, 1127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/Readme.md
----------------------------------------------------------------------
diff --git a/provider/fastbin/Readme.md b/provider/fastbin/Readme.md
index 2997a1b..bc24240 100644
--- a/provider/fastbin/Readme.md
+++ b/provider/fastbin/Readme.md
@@ -11,6 +11,12 @@ Sync remote calls have a default timeout of 5 minutes. For long running operatio
 as the return value of the remote method. The client will receive a proxy of that type that will be resolved async as soon as the server finished computation.
 
 
+## Streaming Data
+
+When large amounts of data (e.g. files) need to be transferred remotely, it is not advisable to use large byte arrays, as this will allocate a lot of memory. Instead, the fastbin transport allows you to
+use `InputStream` and `OutputStream` as a parameter or return type. When a remote method contains such a parameter or return type, the stream is replaced with a proxy implementation that pipes data remotely from/to the original stream.
+
+
 ## Endpoint Configuration
 
 service.exported.configs: aries.fastbin

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
index b89de14..e93e873 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
@@ -22,6 +22,8 @@ import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.aries.rsa.provider.fastbin.io.ClientInvoker;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
 import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
 import org.apache.aries.rsa.spi.DistributionProvider;
 import org.osgi.service.cm.ManagedService;
@@ -29,7 +31,10 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
 public class Activator extends BaseActivator implements ManagedService {
 
-    private FastBinProvider provider;
+    static Activator INSTANCE;
+    FastBinProvider provider;
+    ClientInvoker client;
+    ServerInvoker server;
 
     @Override
     protected void doOpen() throws Exception {
@@ -38,6 +43,7 @@ public class Activator extends BaseActivator implements ManagedService {
 
     @Override
     protected void doStart() throws Exception {
+        INSTANCE = this;
         String uri = getString("uri", "tcp://0.0.0.0:2543");
         String exportedAddress = getString("exportedAddress", null);
         if (exportedAddress == null) {
@@ -45,6 +51,8 @@ public class Activator extends BaseActivator implements ManagedService {
         }
         long timeout = getLong("timeout", TimeUnit.MINUTES.toMillis(5));
         provider = new FastBinProvider(uri, exportedAddress, timeout);
+        client = provider.getClient();
+        server = provider.getServer();
         Dictionary<String, Object> props = new Hashtable<>();
         props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[]{});
         props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, provider.getSupportedTypes());
@@ -63,4 +71,16 @@ public class Activator extends BaseActivator implements ManagedService {
         }
     }
 
+    /**
+     * Returns the client invoker that {@code doStart()} took from the provider.
+     *
+     * @return the current value of the {@code client} field
+     */
+    public ClientInvoker getClient() {
+        return client;
+    }
+
+    /**
+     * Returns the server invoker that {@code doStart()} took from the provider.
+     *
+     * @return the current value of the {@code server} field
+     */
+    public ServerInvoker getServer() {
+        return server;
+    }
+
+    /**
+     * Returns the activator that {@code doStart()} last stored in the static
+     * {@code INSTANCE} field.
+     * <p>
+     * NOTE(review): the field has no initializer, so this presumably returns
+     * {@code null} until {@code doStart()} has run; the stream proxies
+     * dereference the result without a null check.
+     *
+     * @return the recorded activator
+     */
+    public static Activator getInstance() {
+        return INSTANCE;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
index 1491d41..4cf3be7 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
@@ -72,6 +72,14 @@ public class FastBinProvider implements DistributionProvider {
         server.stop();
     }
 
+    /**
+     * Returns the {@code client} field of this provider;
+     * {@code Activator.doStart()} reads it right after constructing the provider.
+     *
+     * @return the client invoker held by this provider
+     */
+    public ClientInvoker getClient() {
+        return client;
+    }
+
+    /**
+     * Returns the {@code server} field of this provider;
+     * {@code Activator.doStart()} reads it right after constructing the provider.
+     *
+     * @return the server invoker held by this provider
+     */
+    public ServerInvoker getServer() {
+        return server;
+    }
+
     @Override
     public String[] getSupportedTypes() {
         return new String[] {FASTBIN_CONFIG_TYPE};

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
index dd3d83b..cc88aaa 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.aries.rsa.provider.fastbin.io;
 
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+
 public interface ServerInvoker extends Service {
 
     String getConnectAddress();
@@ -26,6 +28,7 @@ public interface ServerInvoker extends Service {
 
     void unregisterService(String id);
 
+    /**
+     * Returns the stream provider of this server. The invocation strategy
+     * registers local streams with it before handing a proxy to the remote side.
+     *
+     * @return the stream provider
+     */
+    StreamProvider getStreamProvider();
 
     public interface ServiceFactory {
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
new file mode 100644
index 0000000..92ea426
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.Serializable;
+
+/**
+ * A numbered piece of stream data exchanged between client and server.
+ * <p>
+ * The sequence number lets the receiver verify that chunks arrive in the
+ * expected order; the {@code last} flag tells a reader that no further
+ * chunk follows.
+ */
+public class Chunk implements Serializable {
+
+    /** field <code>serialVersionUID</code> */
+    private static final long serialVersionUID = -2809449169706358272L;
+    private int chunkNumber;
+    private byte[] data;
+    private boolean last;
+
+    /**
+     * Creates a chunk that is not flagged as the last one.
+     *
+     * @param data the payload; the array is stored as is, not copied
+     * @param chunkNumber the sequence number of this chunk
+     */
+    public Chunk(byte[] data, int chunkNumber) {
+        this.data = data;
+        this.chunkNumber = chunkNumber;
+        this.last = false;
+    }
+
+    /**
+     * Creates a chunk with an explicit end-of-stream flag.
+     *
+     * @param data the payload; the array is stored as is, not copied
+     * @param chunkNumber the sequence number of this chunk
+     * @param last {@code true} if no further chunk follows
+     */
+    public Chunk(byte[] data, int chunkNumber, boolean last) {
+        this.last = last;
+        this.chunkNumber = chunkNumber;
+        this.data = data;
+    }
+
+    /** @return the sequence number of this chunk */
+    public int getChunkNumber() {
+        return chunkNumber;
+    }
+
+    /** @return the payload array held by this chunk */
+    public byte[] getData() {
+        return data;
+    }
+
+    /** @return {@code true} if this chunk is flagged as the last one */
+    public boolean isLast() {
+        return last;
+    }
+
+    /** @param last the new value of the end-of-stream flag */
+    public void setLast(boolean last) {
+        this.last = last;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
new file mode 100644
index 0000000..f9d9e2f
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+
+import org.apache.aries.rsa.provider.fastbin.Activator;
+
+public class InputStreamProxy extends InputStream implements Serializable {
+
+    /** field <code>serialVersionUID</code> */
+    private static final long serialVersionUID = 4741860068546150748L;
+    private int streamID;
+    private String address;
+
+    private transient StreamProvider streamProvider;
+    private transient byte[] buffer;
+    private transient int position;
+    private transient int expectedChunkNumber = 0;
+    private transient boolean reachedEnd = false;
+
+    public InputStreamProxy(int streamID, String address) {
+        this.streamID = streamID;
+        this.address = address;
+    }
+
+    @Override
+    public int read() throws IOException {
+        try{
+            return readInternal();
+        }
+        catch (IOException e) {
+            // clean up on the server side
+            closeSilent();
+            throw e;
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        try{
+            return super.read(b, off, len);
+        }
+        catch (IOException e) {
+            // clean up on the server side
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /**
+     * @see java.io.InputStream#read()
+     */
+    public int readInternal() throws IOException {
+        if(buffer == null || position==buffer.length)
+            fillBuffer();
+
+        if(position==buffer.length) {
+            //still no data.
+            if(reachedEnd)
+                return -1;
+            //try again
+            return read();
+        }
+        return buffer[position++];
+    }
+
+    private void fillBuffer() throws IOException {
+        if(reachedEnd) {
+            return;
+        }
+        position = 0;
+        Chunk chunk = streamProvider.read(streamID);
+        if(expectedChunkNumber!=chunk.getChunkNumber())
+            throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+expectedChunkNumber);
+        expectedChunkNumber++;
+        buffer = chunk.getData();
+        reachedEnd = chunk.isLast();
+    }
+
+    public int readInternal(byte[] b, int off, int len) throws IOException {
+        if(len==0)
+            return 0;
+        int available = available();
+        if(available <= 0) {
+            if(reachedEnd)
+                return -1;
+            fillBuffer();
+            return read(b, off, len);
+        }
+        int processed = 0;
+        int ready = Math.min(available, len);
+        System.arraycopy(buffer, position, b, off, ready);
+        processed += ready;
+        position += ready;
+        // delegate to the next chunk
+        if (processed == len) {
+            return processed;
+        }
+        int alsoRead = Math.max(0, read(b, off + processed, len - processed));
+        return processed + alsoRead;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if(buffer == null)
+            return 0;
+        return buffer.length-position;
+    }
+
+    @Override
+    public void close() throws IOException {
+        streamProvider.close(streamID);
+    }
+
+    private void closeSilent() {
+        try{
+            close();
+        } catch (Exception e) {
+            //NOOP
+        }
+    }
+
+    private void readObject(ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        stream.defaultReadObject();
+        InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader());
+        streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler);
+    }
+
+    protected void setStreamProvider(StreamProvider streamProvider) {
+        this.streamProvider = streamProvider;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
new file mode 100644
index 0000000..849b47d
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.aries.rsa.provider.fastbin.Activator;
+
+/**
+ * Serializable stand-in for an {@link OutputStream} that is registered with a
+ * remote {@link StreamProvider}.
+ * <p>
+ * Written bytes are collected in a buffer of {@code StreamProviderImpl.CHUNK_SIZE}
+ * bytes and sent as numbered {@link Chunk}s whenever the buffer is full or
+ * {@link #flush()} is called. Any {@link IOException} while writing closes the
+ * remote stream.
+ */
+public class OutputStreamProxy extends OutputStream implements Serializable {
+
+    /** field <code>serialVersionUID</code> */
+    private static final long serialVersionUID = -6008791618074159841L;
+    private int streamID;
+    private String address;
+    private transient StreamProvider streamProvider;
+    private transient int position;
+    private transient byte[] buffer;
+    private transient AtomicInteger chunkCounter;
+
+    /**
+     * @param streamID id under which the real stream was registered
+     * @param address address used to reach the {@link StreamProvider} after deserialization
+     */
+    public OutputStreamProxy(int streamID, String address) {
+        this.streamID = streamID;
+        this.address = address;
+        init();
+    }
+
+
+    /** Creates the transient buffer and chunk counter; also run after deserialization. */
+    private final void init() {
+        buffer = new byte[StreamProviderImpl.CHUNK_SIZE];
+        chunkCounter = new AtomicInteger(-1);
+    }
+
+
+    /** Sends any buffered data and then closes the remote stream. */
+    @Override
+    public void close() throws IOException {
+        flush();
+        streamProvider.close(streamID);
+    }
+
+    /**
+     * Best-effort close used on error paths. Pending data is dropped rather
+     * than flushed: going through {@link #close()} would flush again, fail
+     * again and re-enter this method without end.
+     */
+    private void closeSilent() {
+        position = 0;
+        try {
+            streamProvider.close(streamID);
+        } catch (Exception ignored) {
+            // deliberately ignored: the caller rethrows the exception that triggered the close
+        }
+    }
+
+    /**
+     * Restores the serialized fields, creates the {@link StreamProvider} proxy
+     * for the stored address and re-creates the transient state.
+     */
+    private void readObject(ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        stream.defaultReadObject();
+        InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader());
+        streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler);
+        init();
+    }
+
+    /** Sets the provider directly instead of through deserialization. */
+    protected void setStreamProvider(StreamProvider streamProvider) {
+        this.streamProvider = streamProvider;
+    }
+
+
+    /** Buffers one byte; on failure the remote stream is closed before rethrowing. */
+    @Override
+    public void write(int b) throws IOException {
+        try {
+            writeInternal(b);
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /** Buffers a byte range; on failure the remote stream is closed before rethrowing. */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        try {
+            writeInternal(b, off, len);
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /**
+     * Appends one byte to the buffer, sending the buffer first if it is full.
+     *
+     * @param b the byte to write (only the low eight bits are used)
+     */
+    public void writeInternal(int b) throws IOException {
+        if (position == buffer.length) {
+            flushInternal();
+        }
+        buffer[position++] = (byte)b;
+    }
+
+    /**
+     * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the
+     * buffer, sending a chunk each time the buffer fills up.
+     *
+     * @param b source array
+     * @param off index of the first byte to write
+     * @param len number of bytes to write; values {@code <= 0} are a no-op
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit into {@code b}
+     */
+    public void writeInternal(byte[] b, int off, int len) throws IOException {
+        if (len <= 0) {
+            return;
+        }
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        if (off < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        int processed = 0;
+        while (processed < len) {
+            int available = buffer.length - position;
+            int chunkLength = Math.min(len - processed, available);
+            // continue where the previous round stopped, not at 'off' again
+            System.arraycopy(b, off + processed, buffer, position, chunkLength);
+            position += chunkLength;
+            processed += chunkLength;
+            if (processed < len) {
+                //there is more to go, but now the buffer is full -> flush it
+                flushInternal();
+            }
+        }
+    }
+
+    /** Sends the buffered data; on failure the remote stream is closed before rethrowing. */
+    @Override
+    public void flush() throws IOException {
+        try {
+            flushInternal();
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /**
+     * Sends the buffered bytes as the next numbered chunk; does nothing if
+     * the buffer is empty.
+     */
+    public void flushInternal() throws IOException {
+        if (position == 0) {
+            return;
+        }
+        byte[] toSend = buffer;
+        if (position < buffer.length) {
+            // only ship the filled part of the buffer
+            toSend = new byte[position];
+            System.arraycopy(buffer, 0, toSend, 0, position);
+        }
+        Chunk chunk = new Chunk(toSend, chunkCounter.incrementAndGet());
+        streamProvider.write(streamID, chunk);
+        position = 0;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
new file mode 100644
index 0000000..4aed70c
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+
+/**
+ * StreamProvider is a well-known service that gets auto registered in the {@link ServerInvoker}
+ * to enable Input/OutputStreams in remote calls
+ */
+public interface StreamProvider {
+
+    /** Service name the stream proxies pass to the client invoker to reach this service. */
+    String STREAM_PROVIDER_SERVICE_NAME = "stream-provider";
+
+    /**
+     * Closes the given stream; afterwards it can no longer be reached remotely.
+     *
+     * @param streamID id returned when the stream was registered
+     * @throws IOException if closing fails
+     */
+    void close(int streamID) throws IOException;
+
+    /**
+     * Fetches the next chunk of the given input stream.
+     *
+     * @param streamID id returned when the stream was registered
+     * @return the next chunk of data
+     * @throws IOException if reading fails
+     */
+    Chunk read(int streamID) throws IOException;
+
+    /**
+     * Hands the next chunk of data to the given output stream.
+     *
+     * @param streamID id returned when the stream was registered
+     * @param chunk the data to write
+     * @throws IOException if writing fails
+     */
+    void write(int streamID, Chunk chunk) throws IOException;
+
+    /**
+     * Makes a local input stream available for remote calls.
+     *
+     * @param in the stream to expose
+     * @return the stream id
+     */
+    int registerStream(InputStream in);
+
+    /**
+     * Makes a local output stream available for remote calls.
+     *
+     * @param out the stream to expose
+     * @return the stream id
+     */
+    int registerStream(OutputStream out);
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
new file mode 100644
index 0000000..c23370f
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Keeps the locally registered streams by id and serves chunk-wise remote
+ * reads and writes on them, numbering (reads) or verifying (writes) the
+ * chunks per stream.
+ */
+public class StreamProviderImpl implements StreamProvider {
+
+    private final ConcurrentHashMap<Integer, Closeable> streams = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, AtomicInteger> chunks = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+    protected static final int CHUNK_SIZE = 4096*16; //64k
+    private static final byte[] EMPTY = new byte[0];
+
+    /** Per-thread scratch buffer for reads, so a chunk-sized array is not allocated per call. */
+    ThreadLocal<byte[]> buffer = new ThreadLocal<byte[]>(){
+        @Override
+        protected byte[] initialValue() {
+            return new byte[CHUNK_SIZE];
+        }
+    };
+
+    @Override
+    public int registerStream(InputStream in) {
+        return register(in);
+    }
+
+
+    @Override
+    public int registerStream(OutputStream out) {
+        return register(out);
+    }
+
+    /** Stores the stream under a fresh id together with a chunk counter starting at -1. */
+    private int register(Closeable stream) {
+        int streamID = counter.incrementAndGet();
+        streams.put(streamID, stream);
+        chunks.put(streamID, new AtomicInteger(-1));
+        return streamID;
+    }
+
+    /**
+     * Forgets the stream and closes it. Unknown ids are ignored, so closing
+     * twice is harmless.
+     */
+    @Override
+    public void close(int streamID) throws IOException {
+        Closeable stream = streams.remove(streamID);
+        chunks.remove(streamID);
+        if (stream != null) {
+            stream.close();
+        }
+    }
+
+    /**
+     * Reads up to {@code CHUNK_SIZE} bytes from the registered input stream.
+     * At end of stream the stream is closed and an empty chunk flagged as
+     * last is returned.
+     *
+     * @throws IOException if no input stream is registered under the id or reading fails
+     */
+    @Override
+    public Chunk read(int streamID) throws IOException {
+        InputStream inputStream = getStream(streamID, InputStream.class);
+        AtomicInteger chunkNumber = getChunkCounter(streamID);
+        byte[] result = buffer.get();
+        int read = inputStream.read(result);
+        if (read < 0) {
+            close(streamID); //we are finished, best clean it up right away
+            return new Chunk(EMPTY, chunkNumber.incrementAndGet(), true);
+        }
+        if (read != result.length) {
+            byte[] tmp = new byte[read];
+            System.arraycopy(result, 0, tmp, 0, read);
+            result = tmp;
+        }
+        // NOTE(review): a completely filled chunk hands out the thread-local array itself;
+        // this presumably relies on the chunk being serialized before this thread reads again - confirm
+        return new Chunk(result, chunkNumber.incrementAndGet());
+    }
+
+    /**
+     * Writes the chunk to the registered output stream after checking that
+     * it carries the next expected sequence number.
+     *
+     * @throws IOException if no output stream is registered under the id, the
+     *         chunk is out of order, or writing fails
+     */
+    @Override
+    public void write(int streamID, Chunk chunk) throws IOException {
+        OutputStream out = getStream(streamID, OutputStream.class);
+        int nextChunkNumber = getChunkCounter(streamID).incrementAndGet();
+        if (chunk.getChunkNumber() != nextChunkNumber) {
+            throw new IOException("Stream corrupted. Received Chunk " + chunk.getChunkNumber() + " but expected " + nextChunkNumber);
+        }
+        out.write(chunk.getData());
+    }
+
+    /**
+     * Looks up the stream with the given id and checks its kind.
+     * The type token is needed because a generic cast is erased and could
+     * never raise a catchable {@link ClassCastException} in here.
+     *
+     * @throws IOException if the id is unknown or the stream is not of the requested type
+     */
+    private <T extends Closeable> T getStream(int id, Class<T> type) throws IOException {
+        Closeable closeable = streams.get(id);
+        // isInstance is false for null, so this also covers unknown ids
+        if (!type.isInstance(closeable)) {
+            throw new IOException("No Stream with id " + id + " available");
+        }
+        return type.cast(closeable);
+    }
+
+    /** Returns the chunk counter of the stream, failing if it was closed in the meantime. */
+    private AtomicInteger getChunkCounter(int id) throws IOException {
+        AtomicInteger chunkNumber = chunks.get(id);
+        if (chunkNumber == null) {
+            throw new IOException("No Stream with id " + id + " available");
+        }
+        return chunkNumber;
+    }
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
index 75f06a9..2e9937a 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
@@ -18,10 +18,15 @@
  */
 package org.apache.aries.rsa.provider.fastbin.tcp;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.aries.rsa.provider.fastbin.Activator;
 import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.streams.InputStreamProxy;
+import org.apache.aries.rsa.provider.fastbin.streams.OutputStreamProxy;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
 import org.osgi.framework.ServiceException;
@@ -35,10 +40,41 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy
 
     @Override
     public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception {
+        // stream arguments are swapped for serializable proxies in the caller's args array before encoding
+        replaceStreamParameters(method, args);
         encodeRequest(serializationStrategy, loader, method, args, requestStream);
         return createResponse(serializationStrategy, loader,method, args);
     }
 
+    protected void replaceStreamParameters(Method method, Object[] args) {
+        Class< ? >[] types = method.getParameterTypes();
+        if(args==null)
+            return;
+        for (int i = 0; i < args.length; i++) {
+            if(isStream(types[i])) {
+                args[i] = replaceStream(args[i]);
+            }
+        }
+    }
+
+    /**
+     * Registers a stream with the local stream provider and returns a
+     * serializable proxy carrying the stream id and this server's connect
+     * address. Values that are not streams (including {@code null}) are
+     * returned unchanged.
+     *
+     * @param value the argument or return value to inspect
+     * @return a stream proxy, or {@code value} itself
+     */
+    protected Object replaceStream(Object value) {
+        if (value instanceof InputStream) {
+            int id = Activator.getInstance().getServer().getStreamProvider().registerStream((InputStream)value);
+            return new InputStreamProxy(id, Activator.getInstance().getServer().getConnectAddress());
+        }
+        if (value instanceof OutputStream) {
+            int id = Activator.getInstance().getServer().getStreamProvider().registerStream((OutputStream)value);
+            return new OutputStreamProxy(id, Activator.getInstance().getServer().getConnectAddress());
+        }
+        return value;
+    }
+
+    /**
+     * Tells whether a declared type is exactly {@link InputStream} or
+     * {@link OutputStream}; subclasses of either do not count.
+     *
+     * @param clazz the declared parameter or return type
+     * @return {@code true} for the two stream base types only
+     */
+    protected boolean isStream(Class<?> clazz) {
+        return InputStream.class.equals(clazz) || OutputStream.class.equals(clazz);
+    }
+
+
     /**
      * encodes the request to the stream
      * @param serializationStrategy

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
index 5190157..753b447 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
@@ -100,6 +100,9 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy {
                 final Object[] args = new Object[types.length];
                 serializationStrategy.decodeRequest(loader, types, requestStream, args);
                 value = method.invoke(target, args);
+                if(isStream(method.getReturnType())) {
+                    value = replaceStream(value);
+                }
             } catch (Throwable t) {
                 if (t instanceof InvocationTargetException) {
                     error = t.getCause();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index c365a56..d56d64c 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -38,6 +38,8 @@ import org.apache.aries.rsa.provider.fastbin.io.Transport;
 import org.apache.aries.rsa.provider.fastbin.io.TransportAcceptListener;
 import org.apache.aries.rsa.provider.fastbin.io.TransportListener;
 import org.apache.aries.rsa.provider.fastbin.io.TransportServer;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProviderImpl;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.BufferEditor;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
@@ -67,6 +69,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
     private final Map<String, SerializationStrategy> serializationStrategies;
     protected final TransportServer server;
     protected final Map<UTF8Buffer, ServiceFactoryHolder> holders = new HashMap<UTF8Buffer, ServiceFactoryHolder>();
+    private StreamProvider streamProvider;
 
     static class MethodData {
 
@@ -165,6 +168,11 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
         return this.server.getConnectAddress();
     }
 
+    /**
+     * Hands out the stream provider held by this invoker. The field is assigned
+     * by {@code registerStreamProvider()}, which {@code start(Runnable)} calls;
+     * presumably it is still {@code null} before the first start - TODO confirm
+     * that nothing else initialises it.
+     *
+     * @return the current stream provider
+     */
+    @Override
+    public StreamProvider getStreamProvider() {
+        return this.streamProvider;
+    }
+
     public void registerService(final String id, final ServiceFactory service, final ClassLoader classLoader) {
         queue().execute(new Runnable() {
             public void run() {
@@ -186,9 +194,27 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
     }
 
+    /**
+     * Registers the stream provider service and then starts the transport server.
+     *
+     * @param onComplete passed through unchanged to the transport server's start call
+     * @throws Exception declared by this method; the body adds no handling of its own
+     */
     public void start(Runnable onComplete) throws Exception {
+        registerStreamProvider();
         this.server.start(onComplete);
     }
 
+    private void registerStreamProvider() {
+        streamProvider = new StreamProviderImpl();
+        registerService(StreamProvider.STREAM_PROVIDER_SERVICE_NAME, new ServerInvoker.ServiceFactory() {
+
+            @Override
+            public Object get() {
+                return streamProvider;
+            }
+
+            @Override
+            public void unget(){
+                // nothing to do
+            }
+        }, getClass().getClassLoader());
+
+    }
+
+    /**
+     * Convenience overload: delegates to the single-argument {@code stop}
+     * variant, passing {@code null} as its argument.
+     */
     public void stop() {
         stop(null);
     }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
new file mode 100644
index 0000000..241eda0
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Round-trip tests that pass streams as arguments and return values through a
+ * {@link ServerInvokerImpl} / {@link ClientInvokerImpl} pair connected over
+ * {@code tcp://localhost}.
+ */
+public class StreamInvocationTest {
+
+    /** Upper bound, in milliseconds, for waiting on data written by a background thread. */
+    private static final long ASYNC_WRITE_TIMEOUT_MS = 5000;
+
+    private ServerInvokerImpl server;
+    private ClientInvokerImpl client;
+    private TestService testService;
+
+
+    /**
+     * Starts server and client on a shared dispatch queue, registers
+     * {@link TestServiceImpl} under {@code "service-id"} and builds a client
+     * side proxy for it.
+     */
+    @Before
+    public void setup() throws Exception
+    {
+        DispatchQueue queue = Dispatch.createQueue();
+        HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>();
+        server = new ServerInvokerImpl("tcp://localhost:0", queue, map);
+        server.start();
+
+        client = new ClientInvokerImpl(queue, map);
+        client.start();
+        server.registerService("service-id", new ServerInvoker.ServiceFactory()
+        {
+            public Object get()
+            {
+                return new TestServiceImpl();
+            }
+
+
+            public void unget()
+            {}
+        }, TestServiceImpl.class.getClassLoader());
+
+        InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader());
+        testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler);
+        // The Activator singleton is populated by hand; presumably the stream
+        // replacement code looks the invokers up through it - verify against Activator.
+        Activator.INSTANCE = new Activator();
+        Activator.INSTANCE.client = client;
+        Activator.INSTANCE.server = server;
+    }
+
+
+    /** Stops both invokers started in {@link #setup()}. */
+    @After
+    public void tearDown()
+    {
+        server.stop();
+        client.stop();
+    }
+
+
+    /** A small input stream argument is readable on the service side. */
+    @Test
+    public void testToString() throws IOException {
+        assertEquals("Test",testService.toString(new ByteArrayInputStream("Test".getBytes())));
+
+    }
+
+    /** Same as {@link #testToString()} with one million bytes of payload. */
+    @Test(timeout=5000)
+    public void testToStringLarge() throws IOException {
+        InputStream in = fillStream('a', 1000000);
+        long time = System.currentTimeMillis();
+        String result = testService.toString(in); //roughly 1 MB of data
+        System.out.println("Transfered 1MB of data in "+(System.currentTimeMillis()-time)+"ms");
+        assertEquals(1000000, result.length());
+        for(int i=0;i<result.length();i++) {
+            assertEquals('a',result.charAt(i));
+        }
+
+    }
+
+
+    /** A stream returned by the service is readable on the caller side. */
+    @Test
+    public void testToStream() throws IOException {
+        assertEquals("Test",new BufferedReader(new InputStreamReader(testService.toStream("Test"))).readLine());
+
+    }
+
+    /** Same as {@link #testToStream()} with one million characters of payload. */
+    @Test(timeout=5000)
+    public void testToStreamLarge() throws IOException {
+        String string = fillBuffer('a', 1000000);
+        long time = System.currentTimeMillis();
+        InputStream stream = testService.toStream(string); //roughly 1 MB of data
+        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+        String result = reader.readLine();
+        System.out.println("Transfered 1MB of data in "+(System.currentTimeMillis()-time)+"ms");
+        assertEquals(1000000, result.length());
+        for(int i=0;i<result.length();i++) {
+            assertEquals('a',result.charAt(i));
+        }
+
+    }
+
+    /**
+     * An output stream argument receives what the service writes into it.
+     * The service writes from its own thread, so the test polls with a
+     * deadline instead of sleeping for a fixed time and hoping it was enough.
+     */
+    @Test
+    public void testIntoStream() throws IOException, InterruptedException {
+        ByteArrayOutputStream result = new ByteArrayOutputStream();
+        testService.intoStream(result, "Test");
+        long deadline = System.currentTimeMillis() + ASYNC_WRITE_TIMEOUT_MS;
+        while (result.size() < "Test".length() && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10);
+        }
+        assertEquals("Test",new String(result.toByteArray()));
+
+    }
+
+    /** A stream argument can be consumed asynchronously by a service that returns a future. */
+    @Test
+    public void testFutureAndStream() throws IOException, InterruptedException, ExecutionException, NoSuchAlgorithmException {
+        String testString = "This is a test";
+        MessageDigest digester = MessageDigest.getInstance("MD5");
+        byte[] digest = digester.digest(testString.getBytes());
+        Future<byte[]> future = testService.digest(new ByteArrayInputStream(testString.getBytes()));
+        assertArrayEquals(digest,future.get());
+
+    }
+
+    /** Remote interface exercised by the tests; every method involves a stream. */
+    public interface TestService {
+        String toString(InputStream in) throws IOException;
+
+        InputStream toStream(String s) throws IOException;
+
+        void intoStream(OutputStream out, String string) throws IOException;
+
+        Future<byte[]> digest(InputStream in) throws IOException;
+    }
+
+    /** Service side implementation of {@link TestService}. */
+    public class TestServiceImpl implements TestService {
+        /** Returns the first line read from {@code in}; the reader (and with it the stream) is closed afterwards. */
+        @Override
+        public String toString(InputStream in) throws IOException {
+            StringBuilder b = new StringBuilder();
+            try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
+                b.append(r.readLine());
+            }
+            return b.toString();
+        }
+
+        /** Wraps the bytes of {@code s} in an in-memory stream. */
+        @Override
+        public InputStream toStream(String s) throws IOException {
+            return new ByteArrayInputStream(s.getBytes());
+        }
+
+        /** Writes {@code string} to {@code out} and closes it, on a newly started thread. */
+        @Override
+        public void intoStream(final OutputStream out, String string) throws IOException {
+            new Thread(() -> {
+                try{
+                    out.write(string.getBytes());
+                    out.close();
+                } catch(Exception e) {
+                    e.printStackTrace();
+                }
+            }).start();
+        }
+
+        /**
+         * Reads {@code in} to its end asynchronously and completes with the MD5 of the bytes read.
+         * On any exception the stack trace is printed and the future completes with {@code null}.
+         */
+        @Override
+        public Future<byte[]> digest(InputStream in) throws IOException {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    MessageDigest digest = MessageDigest.getInstance("MD5");
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    int i;
+                    while((i = in.read()) != -1) {
+                        out.write(i);
+                    }
+                    byte[] md5 = digest.digest(out.toByteArray());
+                    return md5;
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+                return null;
+            });
+        }
+    }
+
+    /** Builds an in-memory stream holding {@code repetitions} copies of {@code c}. */
+    protected InputStream fillStream(char c, int repetitions) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        for (int i=0; i<repetitions; i++){
+            out.write(c);
+        }
+        return new ByteArrayInputStream(out.toByteArray());
+    }
+
+    /** Builds a string of {@code repetitions} copies of {@code c}. */
+    protected String fillBuffer(char c, int repetitions) {
+        StringBuilder b = new StringBuilder(repetitions);
+        for (int i = 0; i < repetitions; i++) {
+            b.append(c);
+        }
+        return b.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
new file mode 100644
index 0000000..0abcdee
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link InputStreamProxy} against a {@link StreamProviderImpl} that is
+ * injected directly through {@code setStreamProvider}.
+ */
+public class InputStreamProxyTest {
+
+    private StreamProvider streamProvider;
+
+    @Before
+    public void setUp() throws Exception {
+        streamProvider = new StreamProviderImpl();
+    }
+
+    /** Payload smaller than one chunk. */
+    @Test
+    public void testReadFully() throws IOException {
+        assertReadsFully(10);
+    }
+
+    /** Payload slightly larger than one chunk. */
+    @Test
+    public void testReadFullyExceedsChunkSize() throws IOException {
+        assertReadsFully(StreamProviderImpl.CHUNK_SIZE+10);
+    }
+
+    /** Payload of exactly two chunks. */
+    @Test
+    public void testReadFullyExceedsChunkSize2() throws IOException {
+        assertReadsFully(StreamProviderImpl.CHUNK_SIZE*2);
+    }
+
+    /** Mixes single-byte and array reads and checks the end-of-stream marker for array reads. */
+    @Test
+    public void testReadArray() throws IOException {
+        OwnInputStream in = fillStream('c',1000000);
+        int id = streamProvider.registerStream(in);
+        @SuppressWarnings("resource")
+        InputStreamProxy fixture = new InputStreamProxy(id, "");
+        fixture.setStreamProvider(streamProvider);
+        assertEquals('c',fixture.read());
+        assertEquals(StreamProviderImpl.CHUNK_SIZE-1, fixture.available());
+        assertEquals('c',fixture.read());
+        assertEquals('c',fixture.read());
+        assertEquals('c',fixture.read());
+        assertEquals('c',fixture.read());
+        byte[] target = new byte[5];
+        // check the byte count as well, so a short read cannot go unnoticed
+        assertEquals(target.length,fixture.read(target));
+        assertEquals("ccccc",new String(target));
+
+        target = new byte[1000000-10];
+        assertEquals(target.length,fixture.read(target));
+        assertEquals(1000000-10,new String(target).length());
+        assertEquals(-1, fixture.read(target));
+    }
+
+    /** Closing the proxy closes the registered stream; a later read through the provider fails. */
+    @Test
+    public void testClose() throws IOException {
+        OwnInputStream in = fillStream('c',10);
+        int id = streamProvider.registerStream(in);
+        InputStreamProxy fixture = new InputStreamProxy(id, "");
+        fixture.setStreamProvider(streamProvider);
+        assertEquals('c',fixture.read());
+        fixture.close();
+        assertTrue(in.isClosed);
+        try {
+            streamProvider.read(id);
+            fail("must have been closed already");
+        } catch (IOException expected) {
+            // expected: the stream is no longer readable after close
+        }
+    }
+
+    /**
+     * Registers a stream of {@code charSize} times {@code 'c'} and reads it
+     * byte by byte through a proxy, expecting {@code -1} right after the last byte.
+     */
+    private void assertReadsFully(int charSize) throws IOException {
+        OwnInputStream in = fillStream('c',charSize);
+        int id = streamProvider.registerStream(in);
+        @SuppressWarnings("resource")
+        InputStreamProxy fixture = new InputStreamProxy(id, "");
+        fixture.setStreamProvider(streamProvider);
+        for (int i = 0; i < charSize; i++) {
+            assertEquals('c',fixture.read());
+        }
+        assertEquals(-1, fixture.read());
+    }
+
+    /** Builds a close-tracking stream holding {@code repetitions} copies of {@code c}. */
+    private OwnInputStream fillStream(char c, int repetitions) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream(repetitions);
+        for (int i = 0; i < repetitions; i++) {
+            out.write(c);
+        }
+        return new OwnInputStream(new ByteArrayInputStream(out.toByteArray()));
+    }
+
+    /** Input stream wrapper that remembers whether {@link #close()} was called. */
+    private static class OwnInputStream extends FilterInputStream {
+
+        boolean isClosed;
+
+        protected OwnInputStream(InputStream in) {
+            super(in);
+        }
+
+        @Override
+        public void close() throws IOException {
+            super.close();
+            isClosed = true;
+        }
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
new file mode 100644
index 0000000..57a8fd5
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link OutputStreamProxy} against a {@link StreamProviderImpl} that is
+ * injected directly through {@code setStreamProvider}.
+ */
+public class OutputStreamProxyTest {
+
+    private StreamProvider streamProvider;
+
+    @Before
+    public void setUp() throws Exception {
+        streamProvider = new StreamProviderImpl();
+    }
+
+    /** Ten single-byte writes: nothing is in the target before close, all ten bytes after it. */
+    @Test
+    public void testWriteFully() throws IOException {
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        OutputStreamProxy fixture = openProxy(sink);
+        int remaining = 10;
+        while (remaining > 0) {
+            fixture.write('x');
+            remaining--;
+        }
+        assertEquals(0, sink.size());
+        fixture.close();
+        assertEquals(10, sink.size());
+        assertEquals("xxxxxxxxxx", new String(sink.toByteArray()));
+    }
+
+    /** Ten single-byte writes followed by one array write of two chunk sizes. */
+    @Test
+    public void testWriteMixed() throws IOException {
+        final int bulkSize = StreamProviderImpl.CHUNK_SIZE*2;
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        OutputStreamProxy fixture = openProxy(sink);
+        for (int n = 0; n < 10; n++) {
+            fixture.write('x');
+        }
+
+        byte[] bulk = new byte[bulkSize];
+        for (int n = 0; n < bulk.length; n++) {
+            bulk[n] = 'x';
+        }
+        fixture.write(bulk);
+        fixture.close();
+        assertEquals(10+bulkSize, sink.size());
+        for (byte written : sink.toByteArray()) {
+            assertEquals('x',written);
+        }
+    }
+
+    /** Registers {@code sink} with the provider and returns a proxy wired to that provider. */
+    private OutputStreamProxy openProxy(ByteArrayOutputStream sink) throws IOException {
+        int id = streamProvider.registerStream(sink);
+        OutputStreamProxy proxy = new OutputStreamProxy(id, "");
+        proxy.setStreamProvider(streamProvider);
+        return proxy;
+    }
+
+}
+
+
+