You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/23 13:47:22 UTC

[GitHub] sijie closed pull request #2815: Add http output buffer configuration to proxy

sijie closed pull request #2815: Add http output buffer configuration to proxy
URL: https://github.com/apache/pulsar/pull/2815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request
on merge, the diff is displayed below for the sake of provenance:

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 68f0e8dd1d..dbac95aa23 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -134,3 +134,10 @@ tlsRequireTrustedClientCertOnConnect=false
 
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
+
+# HTTP output buffer size, in bytes. This is the amount of response data that is buffered for
+# HTTP requests before it is flushed to the channel. A larger buffer may give higher HTTP throughput,
+# but it may take longer for the client to see data.
+# If using HTTP streaming via the reverse proxy, set this to the minimum value, 1,
+# so that clients see the data as soon as possible.
+httpOutputBufferSize=32768
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 18a09a6d32..b84bf3c507 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -28,6 +28,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import com.google.common.collect.Sets;
@@ -133,6 +134,14 @@
     // Http redirects to redirect to non-pulsar services
     private Set<HttpReverseProxyConfig> httpReverseProxyConfigs = Sets.newHashSet();
 
+    // HTTP output buffer size, in bytes. This is the amount of response data that is buffered for
+    // HTTP requests before it is flushed to the channel. A larger buffer may give higher HTTP
+    // throughput, but it may take longer for the client to see data.
+    // If using HTTP streaming via the reverse proxy, set this to the minimum value, 1,
+    // so that clients see the data as soon as possible.
+    @FieldContext(minValue = 1)
+    private int httpOutputBufferSize = 32*1024;
+
     private Properties properties = new Properties();
 
     public boolean forwardAuthorizationCredentials() {
@@ -448,6 +457,14 @@ public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedCli
         this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
     }
 
+    public int getHttpOutputBufferSize() {
+        return httpOutputBufferSize; // current setting; the field is initialised to 32*1024
+    }
+
+    public void setHttpOutputBufferSize(int httpOutputBufferSize) {
+        this.httpOutputBufferSize = httpOutputBufferSize; // stored as-is; minValue=1 on the field is presumably enforced elsewhere - confirm
+    }
+
     public Set<HttpReverseProxyConfig> getHttpReverseProxyConfigs() {
         return httpReverseProxyConfigs; // hands out the backing set itself, not a copy
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index c5618bb448..cf4469bfcd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -43,6 +43,8 @@
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.Slf4jRequestLog;
@@ -87,7 +89,10 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
 
         List<ServerConnector> connectors = Lists.newArrayList();
 
-        ServerConnector connector = new ServerConnector(server, 1, 1);
+        HttpConfiguration http_config = new HttpConfiguration();
+        http_config.setOutputBufferSize(config.getHttpOutputBufferSize());
+
+        ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config));
         connector.setPort(externalServicePort);
         connectors.add(connector);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index d08619da03..d5efb2362e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -18,10 +18,18 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
 import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BooleanSupplier;
 
+import javax.servlet.AsyncContext;
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -32,9 +40,13 @@
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.logging.LoggingFeature;
 
@@ -81,6 +93,46 @@ public void handle(String target, Request baseRequest,
         };
     }
 
+    private static ServletContextHandler newStreamingHandler(LinkedBlockingQueue<String> dataQueue) {
+        // Async servlet: echoes every queued string to the response, flushing each one, until "DONE".
+        HttpServlet streamingServlet = new HttpServlet() {
+                @Override
+                protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                        throws ServletException, IOException {
+                    final AsyncContext asyncCtx = req.startAsync();
+                    resp.setContentType("text/plain;charset=utf-8");
+                    resp.setStatus(HttpServletResponse.SC_OK);
+
+                    asyncCtx.start(() -> {
+                            log.info("Doing async processing");
+                            try {
+                                String chunk = dataQueue.take();
+                                while (!chunk.equals("DONE")) {
+                                    asyncCtx.getResponse().getWriter().print(chunk);
+                                    asyncCtx.getResponse().getWriter().flush();
+                                    chunk = dataQueue.take();
+                                }
+                                asyncCtx.complete();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                log.error("Async handler interrupted");
+                                asyncCtx.complete();
+                            } catch (Exception e) {
+                                log.error("Unexpected error in async handler", e);
+                                asyncCtx.complete();
+                            }
+                        });
+                }
+            };
+
+        ServletHolder asyncHolder = new ServletHolder(streamingServlet);
+        asyncHolder.setAsyncSupported(true);
+        ServletContextHandler rootContext = new ServletContextHandler();
+        rootContext.setContextPath("/");
+        rootContext.addServlet(asyncHolder, "/");
+        return rootContext;
+    }
+
     @Override
     @AfterClass
     protected void cleanup() throws Exception {
@@ -296,4 +348,83 @@ public void testPathEndsInSlash() throws Exception {
 
     }
 
+    @Test
+    public void testStreaming() throws Exception {
+        // With httpOutputBufferSize=1, each chunk must reach the client before the response completes.
+        LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
+        Server streamingServer = new Server(0);
+        streamingServer.setHandler(newStreamingHandler(dataQueue));
+        streamingServer.start();
+        HttpClient httpClient = new HttpClient();
+        // Resources are closed in reverse order, and always, even when setup or an assertion fails.
+        try (AutoCloseable stopBackend = streamingServer::stop; AutoCloseable stopClient = httpClient::stop) {
+            httpClient.start();
+
+            Properties props = new Properties();
+            props.setProperty("httpOutputBufferSize", "1");
+            props.setProperty("httpReverseProxy.foobar.path", "/stream");
+            props.setProperty("httpReverseProxy.foobar.proxyTo", streamingServer.getURI().toString());
+            props.setProperty("servicePort", "0");
+            props.setProperty("webServicePort", "0");
+
+            ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
+            AuthenticationService authService = new AuthenticationService(
+                    PulsarConfigurationLoader.convertFrom(proxyConfig));
+            WebServer webServer = new WebServer(proxyConfig, authService);
+            ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
+                    new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
+            webServer.start();
+
+            try (AutoCloseable stopProxy = webServer::stop) {
+                LinkedBlockingQueue<Byte> responses = new LinkedBlockingQueue<>();
+                CompletableFuture<Result> promise = new CompletableFuture<>();
+                httpClient.newRequest(webServer.getServiceUri()).path("/stream")
+                    .onResponseContent((response, content) -> {
+                            try {
+                                while (content.hasRemaining()) {
+                                    responses.put(content.get());
+                                }
+                            } catch (Exception e) {
+                                // Fail once and stop reading, rather than retrying for every byte.
+                                log.error("Error reading response", e);
+                                promise.completeExceptionally(e);
+                            }
+                        })
+                    .send((result) -> {
+                            log.info("Response complete");
+                            promise.complete(result);
+                        });
+
+                dataQueue.put("Some data");
+                assertEventuallyTrue(() -> responses.size() == "Some data".length());
+                Assert.assertEquals("Some data", drainToString(responses));
+                Assert.assertFalse(promise.isDone());
+
+                dataQueue.put("More data");
+                assertEventuallyTrue(() -> responses.size() == "More data".length());
+                Assert.assertEquals("More data", drainToString(responses));
+                Assert.assertFalse(promise.isDone());
+
+                dataQueue.put("DONE");
+                assertEventuallyTrue(() -> promise.isDone());
+                Assert.assertTrue(promise.get().isSucceeded());
+            }
+        }
+    }
+
+    static String drainToString(Queue<Byte> queue) throws Exception {
+        byte[] bytes = new byte[queue.size()];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = queue.poll();
+        }
+        return new String(bytes, UTF_8);
+    }
+
+    static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
+        // Poll every 100 ms for at most 30 rounds (about 3 seconds), then assert the final state.
+        for (int remaining = 30; remaining > 0 && !predicate.getAsBoolean(); remaining--) {
+            Thread.sleep(100);
+        }
+        Assert.assertTrue(predicate.getAsBoolean());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services