You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by mi...@apache.org on 2016/03/04 21:36:45 UTC

[02/39] olingo-odata4 git commit: [OLINGO-832] Added additional methods for Java NIO Channels

[OLINGO-832] Added additional methods for Java NIO Channels


Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/885ec27e
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/885ec27e
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/885ec27e

Branch: refs/heads/OLINGO-856_ODataHandlerInAPI
Commit: 885ec27ef2411697c9e8b82e12c77e54e4c2710d
Parents: f4ad889
Author: Michael Bolz <mi...@sap.com>
Authored: Fri Jan 8 09:45:09 2016 +0100
Committer: Michael Bolz <mi...@sap.com>
Committed: Fri Jan 22 13:14:06 2016 +0100

----------------------------------------------------------------------
 .../apache/olingo/server/api/ODataResponse.java |  13 ++
 .../server/api/serializer/SerializerResult.java |   5 +
 .../server/core/ODataHttpHandlerImpl.java       |   8 +-
 .../serializer/ChannelSerializerResult.java     | 201 +++++++++++++++++++
 .../core/serializer/SerializerResultImpl.java   |  13 ++
 .../core/serializer/StreamSerializerResult.java |  14 ++
 .../json/ODataJsonStreamSerializer.java         |   5 +-
 .../serializer/utils/CircleStreamBuffer.java    |  20 ++
 .../processor/TechnicalEntityProcessor.java     |  11 +-
 9 files changed, 285 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
----------------------------------------------------------------------
diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
index 3b63af7..a4dc7e0 100644
--- a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
+++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java
@@ -19,6 +19,7 @@
 package org.apache.olingo.server.api;
 
 import java.io.InputStream;
+import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +33,7 @@ public class ODataResponse {
   private int statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
   private final HttpHeaders headers = new HttpHeaders();
   private InputStream content;
+  private ReadableByteChannel channel;
 
   /**
    * Sets the status code.
@@ -132,4 +134,15 @@ public class ODataResponse {
     return content;
   }
 
+  public void setChannel(final ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  public ReadableByteChannel getChannel() {
+    return channel;
+  }
+
+  public boolean isChannelAvailable() {
+    return channel != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
index edf3ac8..fe23502 100644
--- a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
+++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java
@@ -19,6 +19,7 @@
 package org.apache.olingo.server.api.serializer;
 
 import java.io.InputStream;
+import java.nio.channels.ReadableByteChannel;
 
 /**
  * Result type for {@link ODataSerializer} methods
@@ -29,4 +30,8 @@ public interface SerializerResult {
    * @return serialized content
    */
   InputStream getContent();
+
+  ReadableByteChannel getChannel();
+
+  boolean isNioSupported();
 }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
index 1624943..59d7972 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java
@@ -149,7 +149,7 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
       }
     }
 
-    if (odResponse.getContent() != null) {
+    if (odResponse.getContent() != null || odResponse.isChannelAvailable()) {
       copyContent(odResponse, response);
     }
   }
@@ -160,7 +160,11 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
     try {
       ByteBuffer inBuffer = ByteBuffer.allocate(COPY_BUFFER_SIZE);
       output = Channels.newChannel(servletResponse.getOutputStream());
-      input = Channels.newChannel(odataResponse.getContent());
+      if(odataResponse.isChannelAvailable()) {
+        input = odataResponse.getChannel();
+      } else {
+        input = Channels.newChannel(odataResponse.getContent());
+      }
       while (input.read(inBuffer) > 0) {
         inBuffer.flip();
         output.write(inBuffer);

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
new file mode 100644
index 0000000..1d4c32f
--- /dev/null
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.server.core.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.Charset;
+
+import org.apache.olingo.commons.api.data.Entity;
+import org.apache.olingo.commons.api.data.EntityStreamCollection;
+import org.apache.olingo.commons.api.edm.EdmEntityType;
+import org.apache.olingo.server.api.ServiceMetadata;
+import org.apache.olingo.server.api.serializer.EntitySerializerOptions;
+import org.apache.olingo.server.api.serializer.SerializerException;
+import org.apache.olingo.server.api.serializer.SerializerResult;
+import org.apache.olingo.server.core.serializer.json.ODataJsonStreamSerializer;
+import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class ChannelSerializerResult implements SerializerResult {
+  private ReadableByteChannel channel;
+
+  private static class StreamChannel implements ReadableByteChannel {
+    private static final Charset DEFAULT = Charset.forName("UTF-8");
+    private ByteBuffer head;
+    private ByteBuffer tail;
+    private ODataJsonStreamSerializer jsonSerializer;
+    private EntityStreamCollection coll;
+    private ServiceMetadata metadata;
+    private EdmEntityType entityType;
+    private EntitySerializerOptions options;
+
+    public StreamChannel(EntityStreamCollection coll, EdmEntityType entityType, String head,
+        ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata,
+        EntitySerializerOptions options, String tail) {
+      this.coll = coll;
+      this.entityType = entityType;
+      this.head = ByteBuffer.wrap(head.getBytes(DEFAULT));
+      this.jsonSerializer = jsonSerializer;
+      this.metadata = metadata;
+      this.options = options;
+      this.tail = ByteBuffer.wrap(tail.getBytes(DEFAULT));
+    }
+
+    @Override
+    public int read(ByteBuffer dest) throws IOException {
+      ByteBuffer buffer = getCurrentBuffer();
+      if (buffer != null && buffer.hasRemaining()) {
+        int r = buffer.remaining();
+        if(r <= dest.remaining()) {
+          dest.put(buffer);
+        } else {
+          byte[] buf = new byte[dest.remaining()];
+          buffer.get(buf);
+          dest.put(buf);
+        }
+        return r;
+      }
+      return -1;
+    }
+
+    ByteBuffer currentBuffer;
+
+    private ByteBuffer getCurrentBuffer() {
+      if(currentBuffer == null) {
+        currentBuffer = head;
+      } if(!currentBuffer.hasRemaining()) {
+        if (coll.hasNext()) {
+          try {
+            // FIXME: mibo_160108: Inefficient buffer handling, replace
+            currentBuffer = serEntity(coll.nextEntity());
+            if(coll.hasNext()) {
+              ByteBuffer b = ByteBuffer.allocate(currentBuffer.position() + 1);
+              currentBuffer.flip();
+              b.put(currentBuffer).put(",".getBytes(DEFAULT));
+              currentBuffer = b;
+            }
+            currentBuffer.flip();
+          } catch (SerializerException e) {
+            return getCurrentBuffer();
+          }
+        } else if(tail.hasRemaining()) {
+          currentBuffer = tail;
+        } else {
+          return null;
+        }
+      }
+      return currentBuffer;
+    }
+
+    private ByteBuffer serEntity(Entity entity) throws SerializerException {
+      try {
+        CircleStreamBuffer buffer = new CircleStreamBuffer();
+        OutputStream outputStream = buffer.getOutputStream();
+        JsonGenerator json = new JsonFactory().createGenerator(outputStream);
+        jsonSerializer.writeEntity(metadata, entityType, entity, null,
+            options == null ? null : options.getExpand(),
+            options == null ? null : options.getSelect(),
+            options != null && options.getWriteOnlyReferences(),
+            json);
+
+        json.close();
+        outputStream.close();
+        return buffer.getBuffer();
+      } catch (final IOException e) {
+        return ByteBuffer.wrap(("ERROR" + e.getMessage()).getBytes());
+      }
+    }
+
+
+    @Override
+    public boolean isOpen() {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  @Override
+  public InputStream getContent() {
+    return Channels.newInputStream(this.channel);
+  }
+
+  @Override
+  public ReadableByteChannel getChannel() {
+    return this.channel;
+  }
+
+  @Override
+  public boolean isNioSupported() {
+    return true;
+  }
+
+  private ChannelSerializerResult(ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  public static SerializerResultBuilder with(EntityStreamCollection coll, EdmEntityType entityType,
+      ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
+    return new SerializerResultBuilder(coll, entityType, jsonSerializer, metadata, options);
+  }
+
+  public static class SerializerResultBuilder {
+    private ODataJsonStreamSerializer jsonSerializer;
+    private EntityStreamCollection coll;
+    private ServiceMetadata metadata;
+    private EdmEntityType entityType;
+    private EntitySerializerOptions options;
+    private String head;
+    private String tail;
+
+    public SerializerResultBuilder(EntityStreamCollection coll, EdmEntityType entityType,
+        ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
+      this.coll = coll;
+      this.entityType = entityType;
+      this.jsonSerializer = jsonSerializer;
+      this.metadata = metadata;
+      this.options = options;
+    }
+
+    public SerializerResultBuilder addHead(String head) {
+      this.head = head;
+      return this;
+    }
+
+    public SerializerResultBuilder addTail(String tail) {
+      this.tail = tail;
+      return this;
+    }
+
+    public SerializerResult build() {
+      ReadableByteChannel input = new StreamChannel(coll, entityType, head, jsonSerializer, metadata, options, tail);
+      return new ChannelSerializerResult(input);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
index 53dca19..5a5364a 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java
@@ -19,6 +19,9 @@
 package org.apache.olingo.server.core.serializer;
 
 import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 
 import org.apache.olingo.server.api.serializer.SerializerResult;
 
@@ -30,6 +33,16 @@ public class SerializerResultImpl implements SerializerResult {
     return content;
   }
 
+  @Override
+  public ReadableByteChannel getChannel() {
+    return Channels.newChannel(getContent());
+  }
+
+  @Override
+  public boolean isNioSupported() {
+    return false;
+  }
+
   public static SerializerResultBuilder with() {
     return new SerializerResultBuilder();
   }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
index d45c594..e4c8051 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java
@@ -34,6 +34,10 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
 
 public class StreamSerializerResult implements SerializerResult {
   private InputStream content;
@@ -121,6 +125,16 @@ public class StreamSerializerResult implements SerializerResult {
     return content;
   }
 
+  @Override
+  public ReadableByteChannel getChannel() {
+    return Channels.newChannel(getContent());
+  }
+
+  @Override
+  public boolean isNioSupported() {
+    return true;
+  }
+
   private StreamSerializerResult(InputStream content) {
     this.content = content;
   }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
index 08a30c6..110d416 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java
@@ -58,6 +58,7 @@ import org.apache.olingo.server.api.uri.queryoption.ExpandItem;
 import org.apache.olingo.server.api.uri.queryoption.ExpandOption;
 import org.apache.olingo.server.api.uri.queryoption.SelectOption;
 import org.apache.olingo.server.core.serializer.AbstractODataSerializer;
+import org.apache.olingo.server.core.serializer.ChannelSerializerResult;
 import org.apache.olingo.server.core.serializer.SerializerResultImpl;
 import org.apache.olingo.server.core.serializer.StreamSerializerResult;
 import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
@@ -134,7 +135,9 @@ public class ODataJsonStreamSerializer extends ODataJsonSerializer {
         opt.expand(options.getExpand()).select(options
             .getSelect()).writeOnlyReferences(options.getWriteOnlyReferences());
       }
-      return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build())
+//      return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build())
+//          .addHead(head).addTail(tail).build();
+      return ChannelSerializerResult.with(coll, entityType, this, metadata, opt.build())
           .addHead(head).addTail(tail).build();
     } catch (final IOException e) {
       cachedException =

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
----------------------------------------------------------------------
diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
index 20d9ca5..b7ba2f2 100644
--- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
+++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -193,6 +194,25 @@ public class CircleStreamBuffer {
     return readBuffer.get();
   }
 
+  public ByteBuffer getBuffer() throws IOException {
+    if (readClosed) {
+      throw new IOException("Tried to read from closed stream.");
+    }
+    writeMode = false;
+
+    // FIXME: mibo_160108: This is not efficient and only for test/poc reasons
+    int reqSize = 0;
+    for (ByteBuffer byteBuffer : bufferQueue) {
+      reqSize += byteBuffer.position();
+    }
+    ByteBuffer tmp = ByteBuffer.allocateDirect(reqSize);
+    for (ByteBuffer byteBuffer : bufferQueue) {
+      byteBuffer.flip();
+      tmp.put(byteBuffer);
+    }
+    return tmp;
+  }
+
   // #############################################
   // #
   // # Writing parts

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/885ec27e/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
index 6644f1e..f8fa7c8 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java
@@ -536,8 +536,11 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
         serializeEntityStreamCollectionFixed(request,
             entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType,
             expand, select, countOption, id);
-    response.setContent(serializerResult.getContent());
-
+    if(serializerResult.isNioSupported()) {
+      response.setChannel(serializerResult.getChannel());
+    } else {
+      response.setContent(serializerResult.getContent());
+    }
     response.setStatusCode(HttpStatusCode.OK.getStatusCode());
     response.setHeader(HttpHeader.CONTENT_TYPE, requestedContentType.toContentTypeString());
     if (pageSize != null) {
@@ -631,6 +634,9 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
 
       @Override
       public Entity nextEntity() {
+        try {
+          TimeUnit.MILLISECONDS.sleep(1000);
+        } catch (InterruptedException e) { }
         return test.next();
       }
     };
@@ -647,6 +653,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
             .build());
   }
 
+
   private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection
       entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType,
       final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,