You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by mi...@apache.org on 2016/03/04 21:36:45 UTC
[02/39] olingo-odata4 git commit: [OLINGO-832] Added additional
methods for Java NIO Channels
[OLINGO-832] Added additional methods for Java NIO Channels
Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/885ec27e
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/885ec27e
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/885ec27e
Branch: refs/heads/OLINGO-856_ODataHandlerInAPI
Commit: 885ec27ef2411697c9e8b82e12c77e54e4c2710d
Parents: f4ad889
Author: Michael Bolz <mi...@sap.com>
Authored: Fri Jan 8 09:45:09 2016 +0100
Committer: Michael Bolz <mi...@sap.com>
Committed: Fri Jan 22 13:14:06 2016 +0100
----------------------------------------------------------------------
.../apache/olingo/server/api/ODataResponse.java | 13 ++
.../server/api/serializer/SerializerResult.java | 5 +
.../server/core/ODataHttpHandlerImpl.java | 8 +-
.../serializer/ChannelSerializerResult.java | 201 +++++++++++++++++++
.../core/serializer/SerializerResultImpl.java | 13 ++
.../core/serializer/StreamSerializerResult.java | 14 ++
.../json/ODataJsonStreamSerializer.java | 5 +-
.../serializer/utils/CircleStreamBuffer.java | 20 ++
.../processor/TechnicalEntityProcessor.java | 11 +-
9 files changed, 285 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
----------------------------------------------------------------------
diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
index 3b63af7..a4dc7e0 100644
--- a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
+++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
@@ -19,6 +19,7 @@
package org.apache.olingo.server.api;
import java.io.InputStream;
+import java.nio.channels.ReadableByteChannel;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@ public class ODataResponse {
private int statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
private final HttpHeaders headers = new HttpHeaders();
private InputStream content;
+ private ReadableByteChannel channel;
/**
* Sets the status code.
@@ -132,4 +134,15 @@ public class ODataResponse {
return content;
}
+ /**
+  * Sets the channel from which the response content can be read.
+  * @param channel channel providing the response content
+  */
+ public void setChannel(final ReadableByteChannel channel) {
+ this.channel = channel;
+ }
+
+ /**
+  * Gets the channel previously stored with {@code setChannel}.
+  * @return the stored channel, or <code>null</code> if none has been set
+  */
+ public ReadableByteChannel getChannel() {
+ return channel;
+ }
+
+ /**
+  * Checks whether a channel has been set on this response.
+  * @return <code>true</code> if the stored channel is not <code>null</code>
+  */
+ public boolean isChannelAvailable() {
+ return channel != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
index edf3ac8..fe23502 100644
--- a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
+++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
@@ -19,6 +19,7 @@
package org.apache.olingo.server.api.serializer;
import java.io.InputStream;
+import java.nio.channels.ReadableByteChannel;
/**
* Result type for {@link ODataSerializer} methods
@@ -29,4 +30,8 @@ public interface SerializerResult {
* @return serialized content
*/
InputStream getContent();
+
+ /**
+  * Returns the serialized content as a channel.
+  * @return channel from which the serialized content can be read
+  */
+ ReadableByteChannel getChannel();
+
+ /**
+  * Tells a caller which accessor to use for the content.
+  * NOTE(review): the only usage visible in this commit hands the result of
+  * {@link #getChannel()} to the response when this returns <code>true</code> and the
+  * result of {@link #getContent()} otherwise - confirm this is the intended contract.
+  * @return <code>true</code> if the implementation advertises channel (NIO) support
+  */
+ boolean isNioSupported();
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
index 1624943..59d7972 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
@@ -149,7 +149,7 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
}
}
- if (odResponse.getContent() != null) {
+ if (odResponse.getContent() != null || odResponse.isChannelAvailable()) {
copyContent(odResponse, response);
}
}
@@ -160,7 +160,11 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
try {
ByteBuffer inBuffer = ByteBuffer.allocate(COPY_BUFFER_SIZE);
output = Channels.newChannel(servletResponse.getOutputStream());
- input = Channels.newChannel(odataResponse.getContent());
+ if(odataResponse.isChannelAvailable()) {
+ input = odataResponse.getChannel();
+ } else {
+ input = Channels.newChannel(odataResponse.getContent());
+ }
while (input.read(inBuffer) > 0) {
inBuffer.flip();
output.write(inBuffer);
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
new file mode 100644
index 0000000..1d4c32f
--- /dev/null
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.server.core.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.Charset;
+
+import org.apache.olingo.commons.api.data.Entity;
+import org.apache.olingo.commons.api.data.EntityStreamCollection;
+import org.apache.olingo.commons.api.edm.EdmEntityType;
+import org.apache.olingo.server.api.ServiceMetadata;
+import org.apache.olingo.server.api.serializer.EntitySerializerOptions;
+import org.apache.olingo.server.api.serializer.SerializerException;
+import org.apache.olingo.server.api.serializer.SerializerResult;
+import org.apache.olingo.server.core.serializer.json.ODataJsonStreamSerializer;
+import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * Serializer result whose content is produced lazily through a {@link ReadableByteChannel}.
+ * The channel emits a fixed head, then every entity of an {@link EntityStreamCollection}
+ * serialized on demand and separated by commas, and finally a fixed tail.
+ */
+public class ChannelSerializerResult implements SerializerResult {
+  private final ReadableByteChannel channel;
+
+  /**
+   * Channel that serializes one entity at a time while it is being read.
+   * It performs no synchronization of its own.
+   */
+  private static class StreamChannel implements ReadableByteChannel {
+    private static final Charset DEFAULT = Charset.forName("UTF-8");
+    private static final byte[] SEPARATOR = ",".getBytes(DEFAULT);
+
+    private final ByteBuffer head;
+    private final ByteBuffer tail;
+    private final ODataJsonStreamSerializer jsonSerializer;
+    private final EntityStreamCollection coll;
+    private final ServiceMetadata metadata;
+    private final EdmEntityType entityType;
+    private final EntitySerializerOptions options;
+    /** Buffer currently being drained; <code>null</code> until the first read. */
+    private ByteBuffer currentBuffer;
+    /** Becomes <code>false</code> once {@link #close()} has been called. */
+    private volatile boolean open = true;
+
+    /**
+     * @param coll entities to serialize, consumed while the channel is read
+     * @param entityType type passed to the serializer for every entity
+     * @param head text emitted before the first entity; <code>null</code> is treated as empty
+     * @param jsonSerializer serializer used to write each entity
+     * @param metadata service metadata passed to the serializer
+     * @param options expand/select/reference options; may be <code>null</code>
+     * @param tail text emitted after the last entity; <code>null</code> is treated as empty
+     */
+    public StreamChannel(EntityStreamCollection coll, EdmEntityType entityType, String head,
+        ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata,
+        EntitySerializerOptions options, String tail) {
+      this.coll = coll;
+      this.entityType = entityType;
+      this.head = toBuffer(head);
+      this.jsonSerializer = jsonSerializer;
+      this.metadata = metadata;
+      this.options = options;
+      this.tail = toBuffer(tail);
+    }
+
+    /** Encodes the text as UTF-8; a <code>null</code> text yields an empty buffer. */
+    private static ByteBuffer toBuffer(String text) {
+      return text == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(text.getBytes(DEFAULT));
+    }
+
+    /**
+     * Transfers as many bytes as fit from the current part (head, entity or tail) into
+     * <code>dest</code>.
+     *
+     * @param dest buffer to fill
+     * @return number of bytes actually transferred (0 if <code>dest</code> is full),
+     * or -1 once head, all entities and tail have been delivered
+     * @throws IOException if the channel has been closed
+     */
+    @Override
+    public int read(ByteBuffer dest) throws IOException {
+      if (!open) {
+        throw new java.nio.channels.ClosedChannelException();
+      }
+      ByteBuffer source = getCurrentBuffer();
+      if (source == null) {
+        return -1;
+      }
+      // Report what was really copied, not what the source still holds.
+      int count = Math.min(source.remaining(), dest.remaining());
+      if (count == source.remaining()) {
+        dest.put(source);
+      } else {
+        // dest is smaller than the pending data: copy a window and advance the source by hand.
+        ByteBuffer window = source.duplicate();
+        window.limit(window.position() + count);
+        dest.put(window);
+        source.position(source.position() + count);
+      }
+      return count;
+    }
+
+    /**
+     * Finds the next buffer that still has bytes to deliver, serializing further
+     * entities as needed.
+     *
+     * @return a buffer with remaining bytes, or <code>null</code> when everything was delivered
+     */
+    private ByteBuffer getCurrentBuffer() {
+      if (currentBuffer == null) {
+        currentBuffer = head;
+      }
+      // Loop (instead of recursing) so that empty parts and failed entities are skipped
+      // without ending the stream early or growing the call stack.
+      while (!currentBuffer.hasRemaining()) {
+        if (coll.hasNext()) {
+          try {
+            currentBuffer = nextEntityBuffer();
+          } catch (SerializerException ignored) {
+            // NOTE(review): as before, an entity that fails to serialize is skipped silently;
+            // if it was the last one, the separator already written stays in the output.
+          }
+        } else if (tail.hasRemaining()) {
+          currentBuffer = tail;
+        } else {
+          return null;
+        }
+      }
+      return currentBuffer;
+    }
+
+    /**
+     * Serializes the next entity and appends a comma if more entities follow.
+     *
+     * @return buffer ready for reading
+     * @throws SerializerException if the entity could not be serialized
+     */
+    private ByteBuffer nextEntityBuffer() throws SerializerException {
+      ByteBuffer serialized = serEntity(coll.nextEntity());
+      if (!coll.hasNext()) {
+        return serialized;
+      }
+      // FIXME: mibo_160108: Inefficient buffer handling, replace
+      ByteBuffer joined = ByteBuffer.allocate(serialized.remaining() + SEPARATOR.length);
+      joined.put(serialized).put(SEPARATOR);
+      joined.flip();
+      return joined;
+    }
+
+    /**
+     * Serializes a single entity to JSON.
+     *
+     * @param entity entity to write
+     * @return buffer ready for reading; on an I/O failure it holds the text
+     * <code>ERROR</code> followed by the exception message instead
+     * @throws SerializerException if the serializer rejects the entity
+     */
+    private ByteBuffer serEntity(Entity entity) throws SerializerException {
+      try {
+        CircleStreamBuffer buffer = new CircleStreamBuffer();
+        OutputStream outputStream = buffer.getOutputStream();
+        JsonGenerator json = new JsonFactory().createGenerator(outputStream);
+        jsonSerializer.writeEntity(metadata, entityType, entity, null,
+            options == null ? null : options.getExpand(),
+            options == null ? null : options.getSelect(),
+            options != null && options.getWriteOnlyReferences(),
+            json);
+
+        json.close();
+        outputStream.close();
+        // getBuffer() returns the data with the position at its end, so flip it for reading.
+        ByteBuffer serialized = buffer.getBuffer();
+        serialized.flip();
+        return serialized;
+      } catch (final IOException e) {
+        // wrap() already yields a readable buffer; use the same charset as head and tail.
+        return ByteBuffer.wrap(("ERROR" + e.getMessage()).getBytes(DEFAULT));
+      }
+    }
+
+    @Override
+    public boolean isOpen() {
+      return open;
+    }
+
+    /** Marks the channel as closed; subsequent reads fail. */
+    @Override
+    public void close() throws IOException {
+      open = false;
+    }
+  }
+
+  /**
+   * @return a new stream view reading from the channel of this result
+   */
+  @Override
+  public InputStream getContent() {
+    return Channels.newInputStream(this.channel);
+  }
+
+  /**
+   * @return the channel producing the serialized content
+   */
+  @Override
+  public ReadableByteChannel getChannel() {
+    return this.channel;
+  }
+
+  /**
+   * @return always <code>true</code>
+   */
+  @Override
+  public boolean isNioSupported() {
+    return true;
+  }
+
+  private ChannelSerializerResult(ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  /**
+   * Starts building a result for the given entity stream.
+   *
+   * @param coll entities to serialize
+   * @param entityType type of the entities
+   * @param jsonSerializer serializer used for each entity
+   * @param metadata service metadata
+   * @param options serializer options; may be <code>null</code>
+   * @return builder on which head and tail can be set
+   */
+  public static SerializerResultBuilder with(EntityStreamCollection coll, EdmEntityType entityType,
+      ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
+    return new SerializerResultBuilder(coll, entityType, jsonSerializer, metadata, options);
+  }
+
+  /** Builder collecting the head and tail text that surround the streamed entities. */
+  public static class SerializerResultBuilder {
+    private final ODataJsonStreamSerializer jsonSerializer;
+    private final EntityStreamCollection coll;
+    private final ServiceMetadata metadata;
+    private final EdmEntityType entityType;
+    private final EntitySerializerOptions options;
+    private String head;
+    private String tail;
+
+    public SerializerResultBuilder(EntityStreamCollection coll, EdmEntityType entityType,
+        ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
+      this.coll = coll;
+      this.entityType = entityType;
+      this.jsonSerializer = jsonSerializer;
+      this.metadata = metadata;
+      this.options = options;
+    }
+
+    /**
+     * @param head text emitted before the first entity
+     * @return this builder
+     */
+    public SerializerResultBuilder addHead(String head) {
+      this.head = head;
+      return this;
+    }
+
+    /**
+     * @param tail text emitted after the last entity
+     * @return this builder
+     */
+    public SerializerResultBuilder addTail(String tail) {
+      this.tail = tail;
+      return this;
+    }
+
+    /**
+     * @return result backed by a channel over head, entities and tail
+     */
+    public SerializerResult build() {
+      ReadableByteChannel input = new StreamChannel(coll, entityType, head, jsonSerializer, metadata, options, tail);
+      return new ChannelSerializerResult(input);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
index 53dca19..5a5364a 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
@@ -19,6 +19,9 @@
package org.apache.olingo.server.core.serializer;
import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
import org.apache.olingo.server.api.serializer.SerializerResult;
@@ -30,6 +33,16 @@ public class SerializerResultImpl implements SerializerResult {
return content;
}
+ /**
+  * Wraps the content stream of this result in a channel.
+  * @return channel reading from the stream returned by {@link #getContent()}
+  */
+ @Override
+ public ReadableByteChannel getChannel() {
+   final InputStream stream = getContent();
+   return Channels.newChannel(stream);
+ }
+
+ /**
+  * @return always <code>false</code> for this implementation
+  */
+ @Override
+ public boolean isNioSupported() {
+ return false;
+ }
+
public static SerializerResultBuilder with() {
return new SerializerResultBuilder();
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
index d45c594..e4c8051 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
@@ -34,6 +34,10 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
public class StreamSerializerResult implements SerializerResult {
private InputStream content;
@@ -121,6 +125,16 @@ public class StreamSerializerResult implements SerializerResult {
return content;
}
+ /**
+  * Exposes the content stream of this result through a channel.
+  * @return channel reading from the stream returned by {@link #getContent()}
+  */
+ @Override
+ public ReadableByteChannel getChannel() {
+   final InputStream source = getContent();
+   return Channels.newChannel(source);
+ }
+
+ /**
+  * @return always <code>true</code> for this implementation
+  */
+ @Override
+ public boolean isNioSupported() {
+ return true;
+ }
+
private StreamSerializerResult(InputStream content) {
this.content = content;
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
index 08a30c6..110d416 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
@@ -58,6 +58,7 @@ import org.apache.olingo.server.api.uri.queryoption.ExpandItem;
import org.apache.olingo.server.api.uri.queryoption.ExpandOption;
import org.apache.olingo.server.api.uri.queryoption.SelectOption;
import org.apache.olingo.server.core.serializer.AbstractODataSerializer;
+import org.apache.olingo.server.core.serializer.ChannelSerializerResult;
import org.apache.olingo.server.core.serializer.SerializerResultImpl;
import org.apache.olingo.server.core.serializer.StreamSerializerResult;
import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
@@ -134,7 +135,9 @@ public class ODataJsonStreamSerializer extends ODataJsonSerializer {
opt.expand(options.getExpand()).select(options
.getSelect()).writeOnlyReferences(options.getWriteOnlyReferences());
}
- return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build())
+// return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build())
+// .addHead(head).addTail(tail).build();
+ return ChannelSerializerResult.with(coll, entityType, this, metadata, opt.build())
.addHead(head).addTail(tail).build();
} catch (final IOException e) {
cachedException =
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
index 20d9ca5..b7ba2f2 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -193,6 +194,25 @@ public class CircleStreamBuffer {
return readBuffer.get();
}
+ /**
+  * Concatenates the bytes before the current position of every queued buffer into one
+  * newly allocated direct buffer and switches this instance out of write mode.
+  * The returned buffer is not flipped: its position is at the end of the copied data,
+  * so callers have to flip it before reading.
+  * The queued buffers are read through duplicates and are left unchanged, so calling
+  * this method again yields the same content.
+  *
+  * @return buffer holding the concatenated content
+  * @throws IOException if the read side has already been closed
+  */
+ public ByteBuffer getBuffer() throws IOException {
+   if (readClosed) {
+     throw new IOException("Tried to read from closed stream.");
+   }
+   writeMode = false;
+
+   // FIXME: mibo_160108: This is not efficient and only for test/poc reasons
+   int reqSize = 0;
+   for (ByteBuffer byteBuffer : bufferQueue) {
+     reqSize += byteBuffer.position();
+   }
+   ByteBuffer tmp = ByteBuffer.allocateDirect(reqSize);
+   for (ByteBuffer byteBuffer : bufferQueue) {
+     // Flip a duplicate: flipping the queued buffer itself would reset its position and
+     // limit and corrupt it for any later reader of the queue.
+     ByteBuffer view = byteBuffer.duplicate();
+     view.flip();
+     tmp.put(view);
+   }
+   return tmp;
+ }
+
// #############################################
// #
// # Writing parts
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
index 6644f1e..f8fa7c8 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
@@ -536,8 +536,11 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
serializeEntityStreamCollectionFixed(request,
entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType,
expand, select, countOption, id);
- response.setContent(serializerResult.getContent());
-
+ if(serializerResult.isNioSupported()) {
+ response.setChannel(serializerResult.getChannel());
+ } else {
+ response.setContent(serializerResult.getContent());
+ }
response.setStatusCode(HttpStatusCode.OK.getStatusCode());
response.setHeader(HttpHeader.CONTENT_TYPE, requestedContentType.toContentTypeString());
if (pageSize != null) {
@@ -631,6 +634,9 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
@Override
public Entity nextEntity() {
+ // Delays every entity by one second before handing it out.
+ // NOTE(review): presumably this simulates a slow data source for the streaming PoC - confirm.
+ try {
+   TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+   // Restore the interrupt flag so that callers further up can still react to it.
+   Thread.currentThread().interrupt();
+ }
return test.next();
}
};
@@ -647,6 +653,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
.build());
}
+
private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection
entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType,
final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,