You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/09/14 11:10:03 UTC
aries-rsa git commit: [ARIES-1587] Support streams in fastbin
Repository: aries-rsa
Updated Branches:
refs/heads/master 4c8ae19f4 -> 475f01328
[ARIES-1587] Support streams in fastbin
injects serializable proxy streams that read/write remotely
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/475f0132
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/475f0132
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/475f0132
Branch: refs/heads/master
Commit: 475f01328983e3e1b11deb41876ad21eddff333c
Parents: 4c8ae19
Author: Johannes Utzig <j....@seeburger.de>
Authored: Wed Jul 27 10:42:21 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Sep 13 16:40:49 2016 +0200
----------------------------------------------------------------------
provider/fastbin/Readme.md | 6 +
.../aries/rsa/provider/fastbin/Activator.java | 22 +-
.../rsa/provider/fastbin/FastBinProvider.java | 8 +
.../rsa/provider/fastbin/io/ServerInvoker.java | 3 +
.../rsa/provider/fastbin/streams/Chunk.java | 67 ++++++
.../fastbin/streams/InputStreamProxy.java | 158 +++++++++++++
.../fastbin/streams/OutputStreamProxy.java | 152 ++++++++++++
.../fastbin/streams/StreamProvider.java | 75 ++++++
.../fastbin/streams/StreamProviderImpl.java | 113 +++++++++
.../fastbin/tcp/AbstractInvocationStrategy.java | 36 +++
.../fastbin/tcp/BlockingInvocationStrategy.java | 3 +
.../provider/fastbin/tcp/ServerInvokerImpl.java | 26 +++
.../provider/fastbin/StreamInvocationTest.java | 232 +++++++++++++++++++
.../fastbin/streams/InputStreamProxyTest.java | 146 ++++++++++++
.../fastbin/streams/OutputStreamProxyTest.java | 81 +++++++
15 files changed, 1127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/Readme.md
----------------------------------------------------------------------
diff --git a/provider/fastbin/Readme.md b/provider/fastbin/Readme.md
index 2997a1b..bc24240 100644
--- a/provider/fastbin/Readme.md
+++ b/provider/fastbin/Readme.md
@@ -11,6 +11,12 @@ Sync remote calls have a default timeout of 5 minutes. For long running operatio
as the return value of the remote method. The client will receive a proxy of that type that will be resolved async as soon as the server finished computation.
+## Streaming Data
+
+When large amounts of data (e.g. files) need to be transferred remotely, it is not advisable to use large byte arrays, as this allocates a lot of memory. Instead, the fastbin transport allows you to
+use `InputStream` and `OutputStream` as a parameter or return value. When a remote method has such a parameter or return type, the stream is replaced with a proxy implementation that pipes data remotely from/to the original stream.
+
+
## Endpoint Configuration
service.exported.configs: aries.fastbin
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
index b89de14..e93e873 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/Activator.java
@@ -22,6 +22,8 @@ import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.TimeUnit;
+import org.apache.aries.rsa.provider.fastbin.io.ClientInvoker;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
import org.apache.aries.rsa.provider.fastbin.util.UuidGenerator;
import org.apache.aries.rsa.spi.DistributionProvider;
import org.osgi.service.cm.ManagedService;
@@ -29,7 +31,10 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants;
public class Activator extends BaseActivator implements ManagedService {
- private FastBinProvider provider;
+ static Activator INSTANCE;
+ FastBinProvider provider;
+ ClientInvoker client;
+ ServerInvoker server;
@Override
protected void doOpen() throws Exception {
@@ -38,6 +43,7 @@ public class Activator extends BaseActivator implements ManagedService {
@Override
protected void doStart() throws Exception {
+ INSTANCE = this;
String uri = getString("uri", "tcp://0.0.0.0:2543");
String exportedAddress = getString("exportedAddress", null);
if (exportedAddress == null) {
@@ -45,6 +51,8 @@ public class Activator extends BaseActivator implements ManagedService {
}
long timeout = getLong("timeout", TimeUnit.MINUTES.toMillis(5));
provider = new FastBinProvider(uri, exportedAddress, timeout);
+ client = provider.getClient();
+ server = provider.getServer();
Dictionary<String, Object> props = new Hashtable<>();
props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[]{});
props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, provider.getSupportedTypes());
@@ -63,4 +71,16 @@ public class Activator extends BaseActivator implements ManagedService {
}
}
+ /**
+ * Returns the client invoker that {@code doStart()} obtained from the provider.
+ *
+ * @return the value of the {@code client} field
+ */
+ public ClientInvoker getClient() {
+ return client;
+ }
+
+ /**
+ * Returns the server invoker that {@code doStart()} obtained from the provider.
+ *
+ * @return the value of the {@code server} field
+ */
+ public ServerInvoker getServer() {
+ return server;
+ }
+
+ /**
+ * Returns the activator stored in the static {@code INSTANCE} field, which
+ * {@code doStart()} sets to the started activator.
+ *
+ * @return the value of {@code INSTANCE}
+ */
+ public static Activator getInstance() {
+ return INSTANCE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
index 1491d41..4cf3be7 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/FastBinProvider.java
@@ -72,6 +72,14 @@ public class FastBinProvider implements DistributionProvider {
server.stop();
}
+ /**
+ * Returns the client invoker held by this provider.
+ *
+ * @return the value of the {@code client} field
+ */
+ public ClientInvoker getClient() {
+ return client;
+ }
+
+ /**
+ * Returns the server invoker held by this provider.
+ *
+ * @return the value of the {@code server} field
+ */
+ public ServerInvoker getServer() {
+ return server;
+ }
+
@Override
public String[] getSupportedTypes() {
return new String[] {FASTBIN_CONFIG_TYPE};
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
index dd3d83b..cc88aaa 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/io/ServerInvoker.java
@@ -18,6 +18,8 @@
*/
package org.apache.aries.rsa.provider.fastbin.io;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+
public interface ServerInvoker extends Service {
String getConnectAddress();
@@ -26,6 +28,7 @@ public interface ServerInvoker extends Service {
void unregisterService(String id);
+ /**
+ * Returns the {@link StreamProvider} of this invoker.
+ *
+ * @return the stream provider used to expose local streams to remote callers
+ */
+ StreamProvider getStreamProvider();
public interface ServiceFactory {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
new file mode 100644
index 0000000..92ea426
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/Chunk.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.Serializable;
+
+/**
+ * Represents a chunk of data streamed between client and server.
+ * <p>
+ * A chunk comes with a sequence number to verify the correct order of packets
+ * and a flag that marks the final chunk of a stream.
+ */
+public class Chunk implements Serializable {
+
+ /** field <code>serialVersionUID</code> */
+ private static final long serialVersionUID = -2809449169706358272L;
+ /** sequence number of this chunk within its stream */
+ private int chunkNumber;
+ /** payload bytes; the array is stored by reference, not copied */
+ private byte[] data;
+ /** <code>true</code> if this is the final chunk of the stream */
+ private boolean last;
+
+ /**
+ * Creates a chunk that is not marked as the last one.
+ *
+ * @param data the payload bytes
+ * @param chunkNumber the sequence number of this chunk
+ */
+ public Chunk(byte[] data, int chunkNumber) {
+ this(data,chunkNumber,false);
+ }
+
+ /**
+ * Creates a chunk.
+ *
+ * @param data the payload bytes (kept by reference)
+ * @param chunkNumber the sequence number of this chunk
+ * @param last whether this is the final chunk of the stream
+ */
+ public Chunk(byte[] data, int chunkNumber, boolean last) {
+ this.data = data;
+ this.chunkNumber = chunkNumber;
+ this.last = last;
+ }
+
+
+ /**
+ * @return the payload array itself, not a copy
+ */
+ public byte[] getData() {
+ return data;
+ }
+
+ /**
+ * @return the sequence number of this chunk
+ */
+ public int getChunkNumber() {
+ return chunkNumber;
+ }
+
+ /**
+ * @param last whether this is the final chunk of the stream
+ */
+ public void setLast(boolean last) {
+ this.last = last;
+ }
+
+ /**
+ * @return <code>true</code> if this is the final chunk of the stream
+ */
+ public boolean isLast() {
+ return last;
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
new file mode 100644
index 0000000..f9d9e2f
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxy.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+
+import org.apache.aries.rsa.provider.fastbin.Activator;
+
+public class InputStreamProxy extends InputStream implements Serializable {
+
+ /** field <code>serialVersionUID</code> */
+ private static final long serialVersionUID = 4741860068546150748L;
+ private int streamID;
+ private String address;
+
+ private transient StreamProvider streamProvider;
+ private transient byte[] buffer;
+ private transient int position;
+ private transient int expectedChunkNumber = 0;
+ private transient boolean reachedEnd = false;
+
+ public InputStreamProxy(int streamID, String address) {
+ this.streamID = streamID;
+ this.address = address;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try{
+ return readInternal();
+ }
+ catch (IOException e) {
+ // clean up on the server side
+ closeSilent();
+ throw e;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ try{
+ return super.read(b, off, len);
+ }
+ catch (IOException e) {
+ // clean up on the server side
+ closeSilent();
+ throw e;
+ }
+ }
+
+ /**
+ * @see java.io.InputStream#read()
+ */
+ public int readInternal() throws IOException {
+ if(buffer == null || position==buffer.length)
+ fillBuffer();
+
+ if(position==buffer.length) {
+ //still no data.
+ if(reachedEnd)
+ return -1;
+ //try again
+ return read();
+ }
+ return buffer[position++];
+ }
+
+ private void fillBuffer() throws IOException {
+ if(reachedEnd) {
+ return;
+ }
+ position = 0;
+ Chunk chunk = streamProvider.read(streamID);
+ if(expectedChunkNumber!=chunk.getChunkNumber())
+ throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+expectedChunkNumber);
+ expectedChunkNumber++;
+ buffer = chunk.getData();
+ reachedEnd = chunk.isLast();
+ }
+
+ public int readInternal(byte[] b, int off, int len) throws IOException {
+ if(len==0)
+ return 0;
+ int available = available();
+ if(available <= 0) {
+ if(reachedEnd)
+ return -1;
+ fillBuffer();
+ return read(b, off, len);
+ }
+ int processed = 0;
+ int ready = Math.min(available, len);
+ System.arraycopy(buffer, position, b, off, ready);
+ processed += ready;
+ position += ready;
+ // delegate to the next chunk
+ if (processed == len) {
+ return processed;
+ }
+ int alsoRead = Math.max(0, read(b, off + processed, len - processed));
+ return processed + alsoRead;
+ }
+
+ @Override
+ public int available() throws IOException {
+ if(buffer == null)
+ return 0;
+ return buffer.length-position;
+ }
+
+ @Override
+ public void close() throws IOException {
+ streamProvider.close(streamID);
+ }
+
+ private void closeSilent() {
+ try{
+ close();
+ } catch (Exception e) {
+ //NOOP
+ }
+ }
+
+ private void readObject(ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ stream.defaultReadObject();
+ InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader());
+ streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler);
+ }
+
+ protected void setStreamProvider(StreamProvider streamProvider) {
+ this.streamProvider = streamProvider;
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
new file mode 100644
index 0000000..849b47d
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.aries.rsa.provider.fastbin.Activator;
+
+/**
+ * Serializable {@link OutputStream} that collects written bytes in a local buffer and sends
+ * them as {@link Chunk}s to a {@link StreamProvider}.
+ * <p>
+ * Only the stream id and the provider address are serialized. After deserialization,
+ * {@code readObject} creates a {@link StreamProvider} proxy for that address through the
+ * client invoker of the {@link Activator}.
+ */
+public class OutputStreamProxy extends OutputStream implements Serializable {
+
+    /** field <code>serialVersionUID</code> */
+    private static final long serialVersionUID = -6008791618074159841L;
+    private int streamID;
+    private String address;
+    private transient StreamProvider streamProvider;
+    /** number of buffered bytes that were not sent yet */
+    private transient int position;
+    private transient byte[] buffer;
+    /** sequence number of the chunk sent last; starts at -1 so the first chunk is number 0 */
+    private transient AtomicInteger chunkCounter;
+
+    /**
+     * @param streamID id under which the real stream is registered at the stream provider
+     * @param address address used to look up the stream provider after deserialization
+     */
+    public OutputStreamProxy(int streamID, String address) {
+        this.streamID = streamID;
+        this.address = address;
+        init();
+    }
+
+
+    /** Creates the transient state; also called after deserialization. */
+    private final void init() {
+        buffer = new byte[StreamProviderImpl.CHUNK_SIZE];
+        chunkCounter = new AtomicInteger(-1);
+    }
+
+
+    /**
+     * Sends all buffered bytes and then asks the stream provider to close the stream.
+     */
+    @Override
+    public void close() throws IOException {
+        flush();
+        streamProvider.close(streamID);
+    }
+
+    /**
+     * Best-effort close of the remote stream after a failed write or flush.
+     * <p>
+     * Deliberately does not go through {@link #close()}: that would flush first, and a flush
+     * that keeps failing would call back into this method and recurse without bound.
+     */
+    private void closeSilent() {
+        try {
+            streamProvider.close(streamID);
+        } catch (Exception e) {
+            //NOOP
+        }
+    }
+
+    /**
+     * Restores the serialized fields, creates the {@link StreamProvider} proxy for the address
+     * and re-creates the transient buffer and chunk counter.
+     * NOTE(review): relies on Activator.getInstance() being non-null, i.e. the activator was started.
+     */
+    private void readObject(ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        stream.defaultReadObject();
+        InvocationHandler handler = Activator.getInstance().getClient().getProxy(address, StreamProvider.STREAM_PROVIDER_SERVICE_NAME, getClass().getClassLoader());
+        streamProvider = (StreamProvider)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{StreamProvider.class}, handler);
+        init();
+    }
+
+    /**
+     * Sets the stream provider directly instead of having it created by deserialization.
+     */
+    protected void setStreamProvider(StreamProvider streamProvider) {
+        this.streamProvider = streamProvider;
+    }
+
+
+    /**
+     * Buffers a single byte, sending the buffer first if it is full.
+     *
+     * @throws IOException if sending fails; the remote stream is closed first
+     */
+    @Override
+    public void write(int b) throws IOException {
+        try {
+            writeInternal(b);
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+
+    }
+
+    /**
+     * Buffers <code>len</code> bytes of <code>b</code> starting at <code>off</code>, sending a chunk
+     * whenever the buffer runs full.
+     *
+     * @throws IOException if sending fails; the remote stream is closed first
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        try {
+            writeInternal(b, off, len);
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /** Implementation of {@link #write(int)} without the clean up on failure. */
+    public void writeInternal(int b) throws IOException {
+        if (position == buffer.length) {
+            flushInternal();
+        }
+        buffer[position++] = (byte)b;
+
+    }
+
+    /** Implementation of {@link #write(byte[], int, int)} without the clean up on failure. */
+    public void writeInternal(byte[] b, int off, int len) throws IOException {
+        if (len <= 0) {
+            return;
+        }
+        int processed = 0;
+        while (processed < len) {
+            int available = buffer.length - position;
+            int chunkLength = Math.min(len - processed, available);
+            // continue behind the bytes copied so far; reading from 'off' again would duplicate data
+            System.arraycopy(b, off + processed, buffer, position, chunkLength);
+            position += chunkLength;
+            processed += chunkLength;
+            if (processed < len) {
+                //there is more to go, but now the buffer is full -> flush it
+                flushInternal();
+            }
+        }
+    }
+
+    /**
+     * Sends the buffered bytes, if any, as one chunk.
+     *
+     * @throws IOException if sending fails; the remote stream is closed first
+     */
+    @Override
+    public void flush() throws IOException {
+        try {
+            flushInternal();
+        } catch (IOException e) {
+            closeSilent();
+            throw e;
+        }
+    }
+
+    /** Implementation of {@link #flush()} without the clean up on failure. */
+    public void flushInternal() throws IOException {
+        if (position == 0) {
+            return;
+        }
+        byte[] toSend = buffer;
+        if (position < buffer.length) {
+            // partially filled buffer: send only the used part
+            toSend = new byte[position];
+            System.arraycopy(buffer, 0, toSend, 0, position);
+        }
+        Chunk chunk = new Chunk(toSend, chunkCounter.incrementAndGet());
+        streamProvider.write(streamID, chunk);
+        position = 0;
+    }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
new file mode 100644
index 0000000..4aed70c
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+
+/**
+ * StreamProvider is a well-known service that gets auto registered in the {@link ServerInvoker}
+ * to enable Input/OutputStreams in remote calls.
+ * <p>
+ * Local streams are registered under an id; remote callers then read, write and close them by that id.
+ */
+public interface StreamProvider {
+
+ /** name under which the stream provider is registered as a service */
+ public static final String STREAM_PROVIDER_SERVICE_NAME = "stream-provider";
+
+ /**
+ * Closes the specified stream and makes it inaccessible from remote.
+ * @param streamID the id returned when the stream was registered
+ * @throws IOException if closing the stream fails
+ */
+ void close(int streamID) throws IOException;
+
+ /**
+ * Reads the next chunk from the specified input stream.
+ * @param streamID the id returned when the stream was registered
+ * @return the next chunk of data
+ * @throws IOException if the stream cannot be read
+ */
+ Chunk read(int streamID) throws IOException;
+
+ /**
+ * Writes the next chunk of data to the specified output stream.
+ * @param streamID the id returned when the stream was registered
+ * @param chunk the data to write together with its sequence number
+ * @throws IOException if the chunk cannot be written
+ */
+ void write(int streamID, Chunk chunk) throws IOException;
+
+ /**
+ * Registers a new (local) input stream that will be made available for remote calls.
+ * @param in the stream to expose
+ * @return the stream id
+ */
+ int registerStream(InputStream in);
+
+ /**
+ * Registers a new (local) output stream that will be made available for remote calls.
+ * @param out the stream to expose
+ * @return the stream id
+ */
+ int registerStream(OutputStream out);
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
new file mode 100644
index 0000000..c23370f
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/streams/StreamProviderImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link StreamProvider} that keeps the registered streams in memory, keyed by a generated id,
+ * together with a per-stream counter of the chunks transferred so far.
+ */
+public class StreamProviderImpl implements StreamProvider {
+
+    private final ConcurrentHashMap<Integer, Closeable> streams = new ConcurrentHashMap<>();
+    /** per stream: number of the chunk transferred last; -1 before the first chunk */
+    private final ConcurrentHashMap<Integer, AtomicInteger> chunks = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+    protected static final int CHUNK_SIZE = 4096*16; //64k
+    private static final byte[] EMPTY = new byte[0];
+
+    /** per-thread read buffer of {@link #CHUNK_SIZE} bytes */
+    ThreadLocal<byte[]> buffer = new ThreadLocal<byte[]>(){
+        @Override
+        protected byte[] initialValue() {
+            return new byte[CHUNK_SIZE];
+        }
+    };
+
+    @Override
+    public int registerStream(InputStream in) {
+        int streamID = counter.incrementAndGet();
+        streams.put(streamID, in);
+        chunks.put(streamID, new AtomicInteger(-1));
+        return streamID;
+    }
+
+
+    @Override
+    public int registerStream(OutputStream out) {
+        int streamID = counter.incrementAndGet();
+        streams.put(streamID, out);
+        chunks.put(streamID, new AtomicInteger(-1));
+        return streamID;
+    }
+
+    /**
+     * Removes the stream from the registry and closes it. Unknown ids are ignored.
+     */
+    @Override
+    public void close(int streamID) throws IOException {
+        Closeable stream = streams.remove(streamID);
+        chunks.remove(streamID);
+        if (stream != null) {
+            stream.close();
+        }
+    }
+
+    /**
+     * Reads up to {@link #CHUNK_SIZE} bytes from the registered input stream.
+     * When the stream is exhausted it is closed and an empty chunk marked as last is returned.
+     *
+     * @throws IOException if no input stream is registered under the id or reading fails
+     */
+    @Override
+    public Chunk read(int streamID) throws IOException {
+        InputStream inputStream = getStream(streamID, InputStream.class);
+        AtomicInteger chunkNumber = getChunkCounter(streamID);
+        byte[] result = buffer.get();
+        int read = inputStream.read(result);
+        if (read < 0) {
+            close(streamID); //we are finished, best clean it up right away
+            return new Chunk(EMPTY, chunkNumber.incrementAndGet(), true);
+        }
+        if (read != result.length) {
+            byte[] tmp = new byte[read];
+            System.arraycopy(result, 0, tmp, 0, read);
+            result = tmp;
+        }
+        // NOTE(review): a completely filled chunk hands out the thread-local buffer itself;
+        // presumably the chunk is serialized before this thread reads again - confirm
+        return new Chunk(result, chunkNumber.incrementAndGet());
+    }
+
+    /**
+     * Writes the chunk to the registered output stream after verifying its sequence number.
+     *
+     * @throws IOException if no output stream is registered under the id, the chunk arrives
+     *         out of order or writing fails
+     */
+    @Override
+    public void write(int streamID, Chunk chunk) throws IOException {
+        OutputStream out = getStream(streamID, OutputStream.class);
+        int nextChunkNumber = getChunkCounter(streamID).incrementAndGet();
+        if (chunk.getChunkNumber() != nextChunkNumber) {
+            throw new IOException("Stream corrupted. Received Chunk "+chunk.getChunkNumber()+" but expected "+nextChunkNumber);
+        }
+        out.write(chunk.getData());
+    }
+
+    /**
+     * Looks up the stream registered under the id and checks that it has the requested type.
+     * The type is passed explicitly because a plain generic cast is erased at runtime and
+     * would only fail later, at the caller, with a ClassCastException.
+     *
+     * @throws IOException if no stream of that type is registered under the id
+     */
+    private <T extends Closeable> T getStream(int id, Class<T> type) throws IOException {
+        Closeable closeable = streams.get(id);
+        // isInstance is false for null, so this also covers unknown ids
+        if (!type.isInstance(closeable)) {
+            throw new IOException("No Stream with id " + id + " available");
+        }
+        return type.cast(closeable);
+    }
+
+    /**
+     * Returns the chunk counter of the stream.
+     *
+     * @throws IOException if the counter is gone, e.g. because the stream was closed concurrently
+     */
+    private AtomicInteger getChunkCounter(int id) throws IOException {
+        AtomicInteger chunkCounter = chunks.get(id);
+        if (chunkCounter == null) {
+            throw new IOException("No Stream with id " + id + " available");
+        }
+        return chunkCounter;
+    }
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
index 75f06a9..2e9937a 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
@@ -18,10 +18,15 @@
*/
package org.apache.aries.rsa.provider.fastbin.tcp;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.aries.rsa.provider.fastbin.Activator;
import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.streams.InputStreamProxy;
+import org.apache.aries.rsa.provider.fastbin.streams.OutputStreamProxy;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.osgi.framework.ServiceException;
@@ -35,10 +40,41 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy
@Override
public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception {
+ replaceStreamParameters(method, args);
encodeRequest(serializationStrategy, loader, method, args, requestStream);
return createResponse(serializationStrategy, loader,method, args);
}
+ /**
+ * Replaces, in place, every argument whose declared parameter type is a stream type
+ * (see {@link #isStream(Class)}) with the result of {@link #replaceStream(Object)}.
+ *
+ * @param method the method about to be invoked; supplies the declared parameter types
+ * @param args the call arguments, modified in place; may be <code>null</code>
+ */
+ protected void replaceStreamParameters(Method method, Object[] args) {
+ Class< ? >[] types = method.getParameterTypes();
+ if(args==null)
+ return;
+ for (int i = 0; i < args.length; i++) {
+ if(isStream(types[i])) {
+ args[i] = replaceStream(args[i]);
+ }
+ }
+ }
+
+ /**
+ * Registers a stream with the stream provider of the activator's server and returns a
+ * serializable proxy carrying the stream id and the server's connect address.
+ *
+ * @param value the value to replace
+ * @return an {@link InputStreamProxy} or {@link OutputStreamProxy} for stream values;
+ * any other value (including <code>null</code>) is returned unchanged
+ */
+ protected Object replaceStream(Object value) {
+ if (value instanceof InputStream) {
+ InputStream in = (InputStream)value;
+ int streamID = Activator.getInstance().getServer().getStreamProvider().registerStream(in);
+ value = new InputStreamProxy(streamID, Activator.getInstance().getServer().getConnectAddress());
+ }
+ else if (value instanceof OutputStream) {
+ OutputStream out = (OutputStream)value;
+ int streamID = Activator.getInstance().getServer().getStreamProvider().registerStream(out);
+ value = new OutputStreamProxy(streamID, Activator.getInstance().getServer().getConnectAddress());
+ }
+ return value;
+ }
+
+ /**
+ * Tells whether a declared type is exactly {@link InputStream} or {@link OutputStream}.
+ * Subclasses of those types do not match.
+ *
+ * @param clazz the declared parameter or return type
+ * @return <code>true</code> for the two stream base classes only
+ */
+ protected boolean isStream(Class<?> clazz) {
+ return clazz==InputStream.class || clazz==OutputStream.class;
+ }
+
+
/**
* encodes the request to the stream
* @param serializationStrategy
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
index 5190157..753b447 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
@@ -100,6 +100,9 @@ public class BlockingInvocationStrategy extends AbstractInvocationStrategy {
final Object[] args = new Object[types.length];
serializationStrategy.decodeRequest(loader, types, requestStream, args);
value = method.invoke(target, args);
+ if(isStream(method.getReturnType())) {
+ value = replaceStream(value);
+ }
} catch (Throwable t) {
if (t instanceof InvocationTargetException) {
error = t.getCause();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index c365a56..d56d64c 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -38,6 +38,8 @@ import org.apache.aries.rsa.provider.fastbin.io.Transport;
import org.apache.aries.rsa.provider.fastbin.io.TransportAcceptListener;
import org.apache.aries.rsa.provider.fastbin.io.TransportListener;
import org.apache.aries.rsa.provider.fastbin.io.TransportServer;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProviderImpl;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.BufferEditor;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
@@ -67,6 +69,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
private final Map<String, SerializationStrategy> serializationStrategies;
protected final TransportServer server;
protected final Map<UTF8Buffer, ServiceFactoryHolder> holders = new HashMap<UTF8Buffer, ServiceFactoryHolder>();
+ private StreamProvider streamProvider;
static class MethodData {
@@ -165,6 +168,11 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
return this.server.getConnectAddress();
}
+ /**
+  * Returns the stream provider held by this invoker.
+  *
+  * @return the provider assigned in {@code registerStreamProvider()}. NOTE(review): the field
+  *         declaration has no initializer, so this is presumably {@code null} until
+  *         {@code start(Runnable)} has been called - confirm against the constructor.
+  */
+ @Override
+ public StreamProvider getStreamProvider() {
+     return this.streamProvider;
+ }
+
public void registerService(final String id, final ServiceFactory service, final ClassLoader classLoader) {
queue().execute(new Runnable() {
public void run() {
@@ -186,9 +194,27 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
}
public void start(Runnable onComplete) throws Exception {
+ // Queue registration of the built-in stream provider service before the transport server is started.
+ registerStreamProvider();
this.server.start(onComplete);
}
+ private void registerStreamProvider() {
+ streamProvider = new StreamProviderImpl();
+ registerService(StreamProvider.STREAM_PROVIDER_SERVICE_NAME, new ServerInvoker.ServiceFactory() {
+
+ @Override
+ public Object get() {
+ return streamProvider;
+ }
+
+ @Override
+ public void unget(){
+ // nothing to do
+ }
+ }, getClass().getClassLoader());
+
+ }
+
/**
 * Stops this invoker by delegating to the one-argument {@code stop} overload with {@code null}.
 * NOTE(review): the argument is presumably an optional completion callback, mirroring
 * {@code start(Runnable)} - confirm against the overload.
 */
public void stop() {
stop(null);
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
new file mode 100644
index 0000000..241eda0
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/StreamInvocationTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+import org.apache.aries.rsa.provider.fastbin.streams.StreamProvider;
+import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class StreamInvocationTest {
+
+ private ServerInvokerImpl server;
+ private ClientInvokerImpl client;
+ private TestService testService;
+
+
+ @Before
+ public void setup() throws Exception
+ {
+ DispatchQueue queue = Dispatch.createQueue();
+ HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>();
+ server = new ServerInvokerImpl("tcp://localhost:0", queue, map);
+ server.start();
+
+ client = new ClientInvokerImpl(queue, map);
+ client.start();
+// server.stop();
+ server.registerService("service-id", new ServerInvoker.ServiceFactory()
+ {
+ public Object get()
+ {
+ return new TestServiceImpl();
+ }
+
+
+ public void unget()
+ {}
+ }, TestServiceImpl.class.getClassLoader());
+
+ InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader());
+ testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler);
+ Activator.INSTANCE = new Activator();
+ Activator.INSTANCE.client = client;
+ Activator.INSTANCE.server = server;
+ }
+
+
+ /**
+  * Stops the invokers created by {@link #setup()}.
+  * <p>
+  * JUnit 4 still runs {@code @After} methods when a {@code @Before} method failed, so either
+  * field may be {@code null} here. The client is stopped in a {@code finally} block so that a
+  * failure while stopping the server cannot leave the client running.
+  */
+ @After
+ public void tearDown() {
+     try {
+         if (server != null) {
+             server.stop();
+         }
+     } finally {
+         if (client != null) {
+             client.stop();
+         }
+     }
+ }
+
+
+ /**
+  * Passes a small in-memory stream to the proxied service and expects its text to come back.
+  */
+ @Test
+ public void testToString() throws IOException {
+     final InputStream payload = new ByteArrayInputStream("Test".getBytes());
+     final String answer = testService.toString(payload);
+     assertEquals("Test", answer);
+ }
+
+ /**
+  * Passes a stream of one million 'a' bytes to the proxied service, prints the elapsed time and
+  * checks that the returned string has the same length and consists only of 'a'.
+  * The run is capped at five seconds by the JUnit timeout.
+  */
+ @Test(timeout=5000)
+ public void testToStringLarge() throws IOException {
+     final int size = 1000000;
+     final InputStream payload = fillStream('a', size);
+     final long startedAt = System.currentTimeMillis();
+     final String answer = testService.toString(payload); //roughly 1 MB of data
+     final long elapsed = System.currentTimeMillis() - startedAt;
+     System.out.println("Transfered 1MB of data in "+elapsed+"ms");
+     assertEquals(size, answer.length());
+     for (char ch : answer.toCharArray()) {
+         assertEquals('a', ch);
+     }
+ }
+
+
+ /**
+  * Asks the proxied service for a stream over a short string and reads it back as one line.
+  */
+ @Test
+ public void testToStream() throws IOException {
+     final InputStream remote = testService.toStream("Test");
+     final BufferedReader lines = new BufferedReader(new InputStreamReader(remote));
+     assertEquals("Test", lines.readLine());
+ }
+
+ /**
+  * Asks the proxied service for a stream over one million 'a' characters, reads it back as a
+  * single line, prints the elapsed time and checks length and content.
+  * The run is capped at five seconds by the JUnit timeout.
+  */
+ @Test(timeout=5000)
+ public void testToStreamLarge() throws IOException {
+     final int size = 1000000;
+     final String payload = fillBuffer('a', size);
+     final long startedAt = System.currentTimeMillis();
+     final InputStream remote = testService.toStream(payload); //roughly 1 MB of data
+     final String line = new BufferedReader(new InputStreamReader(remote)).readLine();
+     System.out.println("Transfered 1MB of data in "+(System.currentTimeMillis()-startedAt)+"ms");
+     assertEquals(size, line.length());
+     for (char ch : line.toCharArray()) {
+         assertEquals('a', ch);
+     }
+ }
+
+ /**
+  * Hands a local output stream to the proxied service and checks that the text the service
+  * writes ends up in it.
+  * <p>
+  * The service implementation in this test writes from a separate thread, so the data arrives
+  * some time after the call returns. Instead of sleeping for a fixed interval, the test polls
+  * the (synchronized) {@link ByteArrayOutputStream} until the expected number of bytes is
+  * there or five seconds have passed.
+  */
+ @Test
+ public void testIntoStream() throws IOException, InterruptedException {
+     final byte[] expected = "Test".getBytes();
+     ByteArrayOutputStream result = new ByteArrayOutputStream();
+     testService.intoStream(result, "Test");
+
+     final long deadline = System.currentTimeMillis() + 5000;
+     while (result.size() < expected.length && System.currentTimeMillis() < deadline) {
+         Thread.sleep(10);
+     }
+     assertEquals("Test", new String(result.toByteArray()));
+ }
+
+ /**
+  * Sends a stream to the proxied service, which answers with a {@link Future}, and compares the
+  * eventual result with an MD5 digest computed locally over the same bytes.
+  */
+ @Test
+ public void testFutureAndStream() throws IOException, InterruptedException, ExecutionException, NoSuchAlgorithmException {
+     final byte[] payload = "This is a test".getBytes();
+     final byte[] expectedDigest = MessageDigest.getInstance("MD5").digest(payload);
+     final Future<byte[]> pending = testService.digest(new ByteArrayInputStream(payload));
+     assertArrayEquals(expectedDigest, pending.get());
+ }
+
+ /**
+  * Service interface that this test exposes through the server invoker and calls through a
+  * client-side proxy; every method takes or returns a stream.
+  */
+ public interface TestService {
+ /** Reads text from {@code in}; the implementation in this test returns the first line only. */
+ String toString(InputStream in) throws IOException;
+
+ /** Returns a stream over the bytes of {@code s}. */
+ InputStream toStream(String s) throws IOException;
+
+ /** Writes the bytes of {@code string} into {@code out}; the implementation in this test does so on another thread. */
+ void intoStream(OutputStream out, String string) throws IOException;
+
+ /** Returns a future for a digest of everything readable from {@code in}; the implementation in this test uses MD5. */
+ Future<byte[]> digest(InputStream in) throws IOException;
+ }
+
+ /**
+  * Implementation of {@link TestService} that {@code setup()} registers with the server invoker.
+  */
+ public class TestServiceImpl implements TestService {
+     /**
+      * Returns the first line of {@code in}, decoded with the platform default charset, and
+      * closes the stream. For an empty stream {@code readLine()} yields {@code null}, which the
+      * builder renders as the text {@code "null"}.
+      */
+     @Override
+     public String toString(InputStream in) throws IOException {
+         StringBuilder b = new StringBuilder();
+         try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
+             b.append(r.readLine());
+         }
+         return b.toString();
+     }
+
+     /** Returns an in-memory stream over the platform-default encoding of {@code s}. */
+     @Override
+     public InputStream toStream(String s) throws IOException {
+         return new ByteArrayInputStream(s.getBytes());
+     }
+
+     /**
+      * Starts a new thread that writes {@code string} to {@code out} and then closes it; the
+      * call itself returns immediately. The stream is managed by try-with-resources so that it
+      * is closed even when the write fails.
+      */
+     @Override
+     public void intoStream(final OutputStream out, String string) throws IOException {
+         new Thread(() -> {
+             try (OutputStream target = out) {
+                 target.write(string.getBytes());
+             } catch (Exception e) {
+                 e.printStackTrace();
+             }
+         }).start();
+     }
+
+     /**
+      * Computes the MD5 digest of everything readable from {@code in} on an asynchronous task.
+      * The digest is updated byte by byte, so the stream content is never held in memory as a
+      * whole. A failure completes the future exceptionally instead of being printed and turned
+      * into a {@code null} result.
+      */
+     @Override
+     public Future<byte[]> digest(InputStream in) throws IOException {
+         return CompletableFuture.supplyAsync(() -> {
+             try {
+                 MessageDigest md5 = MessageDigest.getInstance("MD5");
+                 int next;
+                 while ((next = in.read()) != -1) {
+                     md5.update((byte) next);
+                 }
+                 return md5.digest();
+             } catch (IOException | NoSuchAlgorithmException e) {
+                 throw new IllegalStateException("Could not compute the MD5 digest of the stream", e);
+             }
+         });
+     }
+ }
+
+ protected InputStream fillStream(char c, int repetitions) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (int i=0; i<repetitions; i++){
+ out.write(c);
+ }
+ return new ByteArrayInputStream(out.toByteArray());
+ }
+
+ protected String fillBuffer(char c, int repetitions) {
+ StringBuilder b = new StringBuilder(repetitions);
+ for (int i = 0; i < repetitions; i++) {
+ b.append(c);
+ }
+ return b.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
new file mode 100644
index 0000000..0abcdee
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/InputStreamProxyTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class InputStreamProxyTest {
+
+ private StreamProvider streamProvider;
+
+ /** Creates a new {@link StreamProviderImpl} before every test. */
+ @Before
+ public void setUp() throws Exception {
+     this.streamProvider = new StreamProviderImpl();
+ }
+
+ /**
+  * Registers a ten-byte stream, reads it through the proxy one byte at a time and expects
+  * end-of-stream afterwards.
+  */
+ @Test
+ public void testReadFully() throws IOException {
+     final int total = 10;
+     final int streamId = streamProvider.registerStream(fillStream('c', total));
+     @SuppressWarnings("resource")
+     InputStreamProxy proxy = new InputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+     int consumed = 0;
+     while (consumed < total) {
+         assertEquals('c', proxy.read());
+         consumed++;
+     }
+     assertEquals(-1, proxy.read());
+ }
+
+ /**
+  * Same as the plain read-fully case, but with ten bytes more than
+  * {@code StreamProviderImpl.CHUNK_SIZE} in the source stream.
+  */
+ @Test
+ public void testReadFullyExceedsChunkSize() throws IOException {
+     final int total = StreamProviderImpl.CHUNK_SIZE+10;
+     final OwnInputStream source = fillStream('c', total);
+     final int streamId = streamProvider.registerStream(source);
+     @SuppressWarnings("resource")
+     InputStreamProxy proxy = new InputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+     for (int remaining = total; remaining > 0; remaining--) {
+         assertEquals('c', proxy.read());
+     }
+     assertEquals(-1, proxy.read());
+ }
+
+ /**
+  * Reads a source of exactly twice {@code StreamProviderImpl.CHUNK_SIZE} bytes through the proxy
+  * one byte at a time and expects end-of-stream afterwards.
+  */
+ @Test
+ public void testReadFullyExceedsChunkSize2() throws IOException {
+     final int total = StreamProviderImpl.CHUNK_SIZE*2;
+     final OwnInputStream source = fillStream('c', total);
+     final int streamId = streamProvider.registerStream(source);
+     @SuppressWarnings("resource")
+     InputStreamProxy proxy = new InputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+     int position = 0;
+     do {
+         assertEquals('c', proxy.read());
+         position++;
+     } while (position < total);
+     assertEquals(-1, proxy.read());
+ }
+
+ /**
+  * Mixes single-byte and array reads on a one-million-byte source.
+  * <p>
+  * Every array read asserts the number of bytes reported by {@code read(byte[])}, and the bulk
+  * read verifies the content of the array rather than only its length, so a short or corrupted
+  * read cannot pass unnoticed.
+  */
+ @Test
+ public void testReadArray() throws IOException {
+     final int total = 1000000;
+     OwnInputStream in = fillStream('c', total);
+     int id = streamProvider.registerStream(in);
+     @SuppressWarnings("resource")
+     InputStreamProxy fixture = new InputStreamProxy(id, "");
+     fixture.setStreamProvider(streamProvider);
+
+     // the first read is expected to leave the rest of one chunk available
+     assertEquals('c', fixture.read());
+     assertEquals(StreamProviderImpl.CHUNK_SIZE-1, fixture.available());
+     for (int i = 0; i < 4; i++) {
+         assertEquals('c', fixture.read());
+     }
+
+     byte[] target = new byte[5];
+     assertEquals(target.length, fixture.read(target));
+     assertEquals("ccccc", new String(target));
+
+     // ten bytes are consumed so far; the remainder must arrive in a single array read
+     target = new byte[total - 10];
+     assertEquals(target.length, fixture.read(target));
+     for (byte b : target) {
+         assertEquals('c', b);
+     }
+     assertEquals(-1, fixture.read(target));
+ }
+
+ /**
+  * Closes the proxy after one read and checks that the registered source stream was closed and
+  * that a further read through the provider fails with an {@link IOException}.
+  */
+ @Test
+ public void testClose() throws IOException {
+     final OwnInputStream source = fillStream('c', 10);
+     final int streamId = streamProvider.registerStream(source);
+     final InputStreamProxy proxy = new InputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+     assertEquals('c', proxy.read());
+     proxy.close();
+     assertTrue(source.isClosed);
+     try {
+         streamProvider.read(streamId);
+         fail("must have been closed already");
+     } catch (IOException expected) {
+         // expected: reading through the provider after close has to fail
+     }
+ }
+
+ private OwnInputStream fillStream(char c, int repetitions) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(repetitions);
+ for (int i = 0; i < repetitions; i++) {
+ out.write(c);
+ }
+ return new OwnInputStream(new ByteArrayInputStream(out.toByteArray()));
+ }
+
+ /**
+  * Pass-through stream that remembers whether {@link #close()} has completed.
+  */
+ private static class OwnInputStream extends FilterInputStream {
+
+     /** Set after the wrapped stream was closed without an exception. */
+     boolean isClosed;
+
+     protected OwnInputStream(InputStream in) {
+         super(in);
+     }
+
+     /** Closes the wrapped stream first and only then records the close. */
+     @Override
+     public void close() throws IOException {
+         super.close();
+         this.isClosed = true;
+     }
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/475f0132/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
new file mode 100644
index 0000000..57a8fd5
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/streams/OutputStreamProxyTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.streams;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class OutputStreamProxyTest {
+
+ private StreamProvider streamProvider;
+
+ /** Creates a new {@link StreamProviderImpl} before every test. */
+ @Before
+ public void setUp() throws Exception {
+     this.streamProvider = new StreamProviderImpl();
+ }
+
+ /**
+  * Writes ten single bytes through the proxy and checks that they reach the registered target
+  * stream only once the proxy is closed.
+  */
+ @Test
+ public void testWriteFully() throws IOException {
+     final int count = 10;
+     final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+     final int streamId = streamProvider.registerStream(sink);
+     final OutputStreamProxy proxy = new OutputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+     int written = 0;
+     while (written < count) {
+         proxy.write('x');
+         written++;
+     }
+     // the ten bytes have not been forwarded to the target at this point
+     assertEquals(0, sink.size());
+     proxy.close();
+     assertEquals(10, sink.size());
+     assertEquals("xxxxxxxxxx", new String(sink.toByteArray()));
+ }
+
+ /**
+  * Writes ten single bytes followed by one array of twice {@code StreamProviderImpl.CHUNK_SIZE}
+  * bytes through the proxy, closes it and checks that the target received all of them intact.
+  */
+ @Test
+ public void testWriteMixed() throws IOException {
+     final int bulkSize = StreamProviderImpl.CHUNK_SIZE*2;
+     final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+     final int streamId = streamProvider.registerStream(sink);
+     final OutputStreamProxy proxy = new OutputStreamProxy(streamId, "");
+     proxy.setStreamProvider(streamProvider);
+
+     int singles = 10;
+     while (singles-- > 0) {
+         proxy.write('x');
+     }
+
+     final ByteArrayOutputStream bulk = new ByteArrayOutputStream();
+     int pending = bulkSize;
+     while (pending-- > 0) {
+         bulk.write('x');
+     }
+     proxy.write(bulk.toByteArray());
+     proxy.close();
+
+     assertEquals(10+bulkSize, sink.size());
+     for (byte received : sink.toByteArray()) {
+         assertEquals('x', received);
+     }
+ }
+
+
+}
+
+
+