You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/02/23 09:03:40 UTC

[04/10] camel git commit: CAMEL-10883: read delayed payloads

CAMEL-10883: read delayed payloads


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72bcb075
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72bcb075
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72bcb075

Branch: refs/heads/master
Commit: 72bcb07522bac6404cb7ee90d1332f3760a2eb62
Parents: 4c043a8
Author: rohan <ro...@fronde.com>
Authored: Tue Feb 21 15:11:22 2017 +1300
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Feb 23 10:00:24 2017 +0100

----------------------------------------------------------------------
 .../undertow/DefaultUndertowHttpBinding.java    | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/72bcb075/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
index adbe2dd..5aa774a 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHttpBinding.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
 import javax.activation.FileDataSource;
 
@@ -387,27 +389,41 @@ public class DefaultUndertowHttpBinding implements UndertowHttpBinding {
             if (res == -1) {
                 return out.toByteArray();
             } else if (res == 0) {
+                BlockingQueue<Integer> ping = new ArrayBlockingQueue<Integer>(1);
                 source.getReadSetter().set(new ChannelListener<StreamSourceChannel>() {
                     @Override
                     public void handleEvent(StreamSourceChannel channel) {
                         for (;;) {
                             try {
                                 int res = channel.read(buffer);
-                                if (res == -1 || res == 0) {
-                                    out.toByteArray();
-                                    return;
-                                } else {
-                                    buffer.flip();
-                                    out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.arrayOffset() + buffer.limit());
-                                    buffer.clear();
+                                switch (res) {
+                                    case -1:
+                                        ping.put(res);
+                                        return;
+                                    case 0:
+                                        // await next chunk
+                                        source.getReadSetter().set(this);
+                                        source.resumeReads();
+                                        return;
+                                    default:
+                                        buffer.flip();
+                                        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.arrayOffset() + buffer.limit());
+                                        buffer.clear();
                                 }
-                            } catch (IOException e) {
+                            } catch (IOException | InterruptedException e) {
                                 LOG.error("Exception reading from channel {}", e);
                             }
                         }
                     }
                 });
                 source.resumeReads();
+                try {
+                    // wait for the listener to complete
+                    ping.take();
+                } catch (InterruptedException e) {
+                    LOG.error("Exception reading from channel {}", e);
+                }
+                return out.toByteArray();
             } else {
                 buffer.flip();
                 out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.arrayOffset() + buffer.limit());