You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2007/11/30 21:20:27 UTC
svn commit: r599946 - in /activemq/camel/trunk: ./
components/camel-jhc/src/main/java/org/apache/camel/component/jhc/
components/camel-jhc/src/test/java/org/apache/camel/component/jhc/
Author: gnodet
Date: Fri Nov 30 12:20:26 2007
New Revision: 599946
URL: http://svn.apache.org/viewvc?rev=599946&view=rev
Log:
CAMEL-245: JHC component should be able to create more than one consumer on the same port
Added:
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncHttpRequestHandler.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncResponseHandler.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngine.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngineFactory.java
Modified:
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncBufferingHttpServiceHandler.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/BufferingHttpServiceHandler.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/DefaultListeningIOReactor.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcConsumer.java
activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcProducer.java
activemq/camel/trunk/components/camel-jhc/src/test/java/org/apache/camel/component/jhc/HttpTest.java
activemq/camel/trunk/pom.xml
Modified: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncBufferingHttpServiceHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncBufferingHttpServiceHandler.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncBufferingHttpServiceHandler.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncBufferingHttpServiceHandler.java Fri Nov 30 12:20:26 2007
@@ -34,14 +34,9 @@
* Time: 6:57:34 PM
* To change this template use File | Settings | File Templates.
*/
-public abstract class AsyncBufferingHttpServiceHandler extends BufferingHttpServiceHandler {
-
- public interface AsyncHandler {
-
- void sendResponse(HttpResponse response) throws IOException, HttpException;
-
- }
+public class AsyncBufferingHttpServiceHandler extends BufferingHttpServiceHandler {
+
public AsyncBufferingHttpServiceHandler(final HttpParams params) {
super(createDefaultProcessor(),
new DefaultHttpResponseFactory(),
@@ -78,7 +73,7 @@
final HttpRequest request) throws IOException, HttpException {
HttpContext context = conn.getContext();
- HttpVersion ver = request.getRequestLine().getHttpVersion();
+ ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
// Downgrade protocol version if greater than HTTP/1.1
@@ -86,43 +81,53 @@
}
- context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
- context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
+ context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
try {
this.httpProcessor.process(request, context);
HttpRequestHandler handler = null;
- if (this.handlerResolver != null) {
- String requestURI = request.getRequestLine().getUri();
- handler = this.handlerResolver.lookup(requestURI);
+ if (handlerResolver != null) {
+ String requestURI = request.getRequestLine().getUri();
+ handler = handlerResolver.lookup(requestURI);
}
- if (handler != null) {
- HttpResponse response = this.responseFactory.newHttpResponse(
- ver,
- HttpStatus.SC_OK,
- conn.getContext());
- HttpParamsLinker.link(response, this.params);
- context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
- handler.handle(request, response, context);
- sendResponse(conn, response);
- } else {
- asyncProcessRequest(request, context, new AsyncHandler() {
- public void sendResponse(HttpResponse response) throws IOException, HttpException {
- try {
- AsyncBufferingHttpServiceHandler.this.sendResponse(conn, response);
- } catch (HttpException ex) {
- response = AsyncBufferingHttpServiceHandler.this.responseFactory.newHttpResponse(
- HttpVersion.HTTP_1_0,
- HttpStatus.SC_INTERNAL_SERVER_ERROR,
- conn.getContext());
- HttpParamsLinker.link(response, AsyncBufferingHttpServiceHandler.this.params);
- AsyncBufferingHttpServiceHandler.this.handleException(ex, response);
- AsyncBufferingHttpServiceHandler.this.sendResponse(conn, response);
+ if (handler != null) {
+ if (handler instanceof AsyncHttpRequestHandler) {
+ ((AsyncHttpRequestHandler)handler).handle(request, context, new AsyncResponseHandler() {
+ public void sendResponse(HttpResponse response) throws IOException, HttpException {
+ try {
+ AsyncBufferingHttpServiceHandler.this.sendResponse(conn, response);
+ } catch (HttpException ex) {
+ response = AsyncBufferingHttpServiceHandler.this.responseFactory.newHttpResponse(
+ HttpVersion.HTTP_1_0,
+ HttpStatus.SC_INTERNAL_SERVER_ERROR,
+ conn.getContext());
+ HttpParamsLinker.link(response, AsyncBufferingHttpServiceHandler.this.params);
+ AsyncBufferingHttpServiceHandler.this.handleException(ex, response);
+ AsyncBufferingHttpServiceHandler.this.sendResponse(conn, response);
+ }
}
- }
- });
+ });
+ } else { // just hanlder the request with sync request handler
+ HttpResponse response = this.responseFactory.newHttpResponse(
+ ver,
+ HttpStatus.SC_OK,
+ conn.getContext());
+ HttpParamsLinker.link(response, this.params);
+ context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
+ handler.handle(request, response, context);
+ sendResponse(conn, response);
+ }
+ } else {
+ // add the default handler here
+ HttpResponse response = this.responseFactory.newHttpResponse(
+ ver,
+ HttpStatus.SC_OK,
+ conn.getContext());
+ response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
+
}
} catch (HttpException ex) {
@@ -135,8 +140,6 @@
sendResponse(conn, response);
}
- }
-
- protected abstract void asyncProcessRequest(HttpRequest requet, HttpContext context, AsyncHandler handler);
+ }
}
Added: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncHttpRequestHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncHttpRequestHandler.java?rev=599946&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncHttpRequestHandler.java (added)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncHttpRequestHandler.java Fri Nov 30 12:20:26 2007
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jhc;
+
+import java.io.IOException;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpRequestHandler;
+
+
+public interface AsyncHttpRequestHandler extends HttpRequestHandler {
+
+ void handle(HttpRequest request, HttpContext context, AsyncResponseHandler handler)
+ throws HttpException, IOException;
+
+}
Added: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncResponseHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncResponseHandler.java?rev=599946&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncResponseHandler.java (added)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/AsyncResponseHandler.java Fri Nov 30 12:20:26 2007
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jhc;
+
+import java.io.IOException;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpResponse;
+
+public interface AsyncResponseHandler {
+
+ void sendResponse(HttpResponse response) throws IOException, HttpException;
+
+}
Modified: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/BufferingHttpServiceHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/BufferingHttpServiceHandler.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/BufferingHttpServiceHandler.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/BufferingHttpServiceHandler.java Fri Nov 30 12:20:26 2007
@@ -104,7 +104,7 @@
connState.setRequest(request);
connState.setInputState(ServerConnState.REQUEST_RECEIVED);
- HttpVersion ver = request.getRequestLine().getHttpVersion();
+ ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
// Downgrade protocol version if greater than HTTP/1.1
ver = HttpVersion.HTTP_1_1;
@@ -293,7 +293,7 @@
final HttpRequest request) throws IOException, HttpException {
HttpContext context = conn.getContext();
- HttpVersion ver = request.getRequestLine().getHttpVersion();
+ ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
// Downgrade protocol version if greater than HTTP/1.1
@@ -306,9 +306,9 @@
conn.getContext());
HttpParamsLinker.link(response, this.params);
- context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
- context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
- context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
+ context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
+ context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
try {
Modified: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/DefaultListeningIOReactor.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/DefaultListeningIOReactor.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/DefaultListeningIOReactor.java Fri Nov 30 12:20:26 2007
@@ -33,7 +33,7 @@
import org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor;
import org.apache.http.impl.nio.reactor.ChannelEntry;
-import org.apache.http.nio.params.HttpNIOParams;
+
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
@@ -60,21 +60,15 @@
private volatile boolean closed = false;
private final HttpParams params;
- private final Selector selector;
-
- private IOReactorExceptionHandler exceptionHandler;
+
+
public DefaultListeningIOReactor(
int workerCount,
final ThreadFactory threadFactory,
final HttpParams params) throws IOReactorException {
- super(HttpNIOParams.getSelectInterval(params), workerCount, threadFactory);
- this.params = params;
- try {
- this.selector = Selector.open();
- } catch (IOException ex) {
- throw new IOReactorException("Failure opening selector", ex);
- }
+ super(workerCount, threadFactory, params);
+ this.params = params;
}
public DefaultListeningIOReactor(
@@ -83,37 +77,7 @@
this(workerCount, null, params);
}
- public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- public void execute(final IOEventDispatch eventDispatch)
- throws InterruptedIOException, IOReactorException {
- if (eventDispatch == null) {
- throw new IllegalArgumentException("Event dispatcher may not be null");
- }
- startWorkers(eventDispatch);
- for (;;) {
-
- int readyCount;
- try {
- readyCount = this.selector.select(getSelectTimeout());
- } catch (InterruptedIOException ex) {
- throw ex;
- } catch (IOException ex) {
- throw new IOReactorException("Unexpected selector failure", ex);
- }
-
- if (this.closed) {
- break;
- }
- if (readyCount > 0) {
- processEvents(this.selector.selectedKeys());
- }
- verifyWorkers();
- }
- }
-
+
private void processEvents(final Set selectedKeys)
throws IOReactorException {
for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
@@ -159,15 +123,6 @@
}
}
- protected void prepareSocket(final Socket socket) throws IOException {
- socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
- socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
- int linger = HttpConnectionParams.getLinger(this.params);
- if (linger >= 0) {
- socket.setSoLinger(linger > 0, linger);
- }
- }
-
public SocketAddress listen(
final SocketAddress address) throws IOException {
if (this.closed) {
@@ -181,28 +136,10 @@
return serverChannel.socket().getLocalSocketAddress();
}
- public void shutdown() throws IOException {
- if (this.closed) {
- return;
- }
- this.closed = true;
-
- // Close out all channels
- Set keys = this.selector.keys();
- for (Iterator it = keys.iterator(); it.hasNext(); ) {
- try {
- SelectionKey key = (SelectionKey) it.next();
- Channel channel = key.channel();
- if (channel != null) {
- channel.close();
- }
- } catch (IOException ignore) {
- }
- }
- // Stop dispatching I/O events
- this.selector.close();
- // Stop the workers
- stopWorkers(500);
+ @Override
+ protected void processEvents(int count) throws IOReactorException {
+ processEvents(this.selector.selectedKeys());
+
}
}
Modified: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcConsumer.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcConsumer.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcConsumer.java Fri Nov 30 12:20:26 2007
@@ -36,6 +36,7 @@
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpParamsLinker;
import org.apache.http.protocol.*;
import org.apache.http.util.EncodingUtils;
import org.apache.http.util.concurrent.ThreadFactory;
@@ -62,14 +63,17 @@
public class JhcConsumer extends DefaultConsumer<JhcExchange> {
private static Log LOG = LogFactory.getLog(JhcConsumer.class);
-
- private int nbThreads = 2;
- private ListeningIOReactor ioReactor;
- private ThreadFactory threadFactory;
- private Thread runner;
+ private JhcServerEngine engine;
+ private MyHandler handler;
+
public JhcConsumer(JhcEndpoint endpoint, Processor processor) {
super(endpoint, processor);
+ engine = JhcServerEngineFactory.getJhcServerEngine(endpoint.getParams(),
+ endpoint.getPort(),
+ endpoint.getProtocol());
+ handler = new MyHandler(endpoint.getParams(), endpoint.getPath());
+
}
public JhcEndpoint getEndpoint() {
@@ -77,40 +81,18 @@
}
protected void doStart() throws Exception {
- super.doStart();
- final SocketAddress addr = new InetSocketAddress(getEndpoint().getPort());
- HttpParams params = getEndpoint().getParams();
- ioReactor = new DefaultListeningIOReactor(nbThreads, threadFactory, params);
-
- final IOEventDispatch ioEventDispatch;
- if ("https".equals(getEndpoint().getProtocol())) {
- SSLContext sslContext = null; // TODO
- ioEventDispatch = new SSLServerIOEventDispatch(new MyHandler(params), sslContext, params);
- } else {
- ioEventDispatch = new DefaultServerIOEventDispatch(new MyHandler(params), params);
- }
- runner = new Thread() {
- public void run() {
- try {
- ioReactor.listen(addr);
- ioReactor.execute(ioEventDispatch);
- } catch (InterruptedIOException ex) {
- LOG.info("Interrupted");
- } catch (IOException e) {
- LOG.warn("I/O error: " + e.getMessage());
- }
- LOG.debug("Shutdown");
- }
- };
- runner.start();
+ super.doStart();
+ engine.register(handler.getPath() + "*", handler);
+ if(! engine.isStarted()) {
+ engine.start();
+ }
}
protected void doStop() throws Exception {
- LOG.debug("Stopping");
- ioReactor.shutdown();
- LOG.debug("Waiting runner");
- runner.join();
- LOG.debug("Stopped");
+ engine.unregister(handler.getPath() + "*");
+ if (engine.getReferenceCounter() == 0) {
+ engine.stop();
+ }
super.doStop();
}
@@ -156,11 +138,24 @@
}
- class MyHandler extends AsyncBufferingHttpServiceHandler {
- public MyHandler(HttpParams params) {
- super(params);
+ class MyHandler implements AsyncHttpRequestHandler {
+ private final HttpParams params;
+ private final HttpResponseFactory responseFactory;
+ private final String path;
+
+ public MyHandler(HttpParams params, String path) {
+ this(params, path, new DefaultHttpResponseFactory());
+ }
+ public MyHandler(HttpParams params, String path, HttpResponseFactory responseFactory) {
+ this.params = params;
+ this.path = path;
+ this.responseFactory = responseFactory;
+ }
+
+ public String getPath() {
+ return path;
}
- protected void asyncProcessRequest(final HttpRequest request, final HttpContext context, final AsyncBufferingHttpServiceHandler.AsyncHandler handler) {
+ public void handle(final HttpRequest request, final HttpContext context, final AsyncResponseHandler handler) throws HttpException, IOException {
final Exchange exchange = getEndpoint().createExchange();
exchange.getIn().setHeader("http.uri", request.getRequestLine().getUri());
if (request instanceof HttpEntityEnclosingRequest) {
@@ -170,8 +165,10 @@
public void done(boolean doneSynchronously) {
LOG.debug("handleExchange");
// create the default response to this request
- HttpVersion httpVersion = request.getRequestLine().getHttpVersion();
+ ProtocolVersion httpVersion = (HttpVersion)request.getRequestLine().getProtocolVersion();
+
HttpResponse response = responseFactory.newHttpResponse(httpVersion, HttpStatus.SC_OK, context);
+ HttpParamsLinker.link(response, params);
HttpEntity entity = exchange.getOut().getBody(HttpEntity.class);
response.setEntity(entity);
response.setParams(getEndpoint().getParams());
@@ -182,6 +179,11 @@
}
}
});
+
+ }
+ public void handle(HttpRequest request, HttpResponse response, HttpContext context) throws HttpException, IOException {
+ // requests are handled asynchronously for now, so there is nothing to do here
+
}
}
Modified: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcProducer.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcProducer.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcProducer.java Fri Nov 30 12:20:26 2007
@@ -235,6 +235,11 @@
AsyncCallback callback = (AsyncCallback) e.removeProperty(AsyncCallback.class.getName());
callback.done(false);
}
+
+ public void finalizeContext(HttpContext arg0) {
+ // TODO Auto-generated method stub
+
+ }
}
static class EventLogger implements EventListener {
Added: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngine.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngine.java?rev=599946&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngine.java (added)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngine.java Fri Nov 30 12:20:26 2007
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jhc;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.camel.component.jhc.JhcConsumer.MyHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.impl.nio.DefaultServerIOEventDispatch;
+import org.apache.http.impl.nio.reactor.SSLServerIOEventDispatch;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.nio.reactor.ListeningIOReactor;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpRequestHandler;
+import org.apache.http.protocol.HttpRequestHandlerRegistry;
+import org.apache.http.protocol.HttpRequestHandlerResolver;
+import org.apache.http.util.concurrent.ThreadFactory;
+
+public class JhcServerEngine {
+ private static Log LOG = LogFactory.getLog(JhcServerEngine.class);
+ private final HttpParams params;
+ private int port;
+ private String protocol;
+ private int nbThreads = 2;
+ private ListeningIOReactor ioReactor;
+ private ThreadFactory threadFactory;
+ private Thread runner;
+ private SSLContext sslContext;
+ private AsyncBufferingHttpServiceHandler serviceHandler;
+ private HttpRequestHandlerRegistry handlerRegistry;
+ private boolean isStarted;
+ private int referenceCounter;
+
+ JhcServerEngine(HttpParams params, int port, String protocol) {
+ this.params = params;
+ serviceHandler = new AsyncBufferingHttpServiceHandler(params);
+ handlerRegistry = new HttpRequestHandlerRegistry();
+ serviceHandler.setHandlerResolver(handlerRegistry);
+ this.port = port;
+ this.protocol = protocol;
+ }
+
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getProtocol() {
+ return this.protocol;
+ }
+
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public SSLContext getSslContext() {
+ return this.sslContext;
+ }
+
+ public synchronized void register(String pattern, AsyncHttpRequestHandler handler) {
+ handlerRegistry.register(pattern, handler);
+ referenceCounter ++;
+ }
+
+ public synchronized void unregister(String pattern) {
+ handlerRegistry.unregister(pattern);
+ referenceCounter --;
+ }
+
+ public int getReferenceCounter() {
+ return referenceCounter;
+ }
+
+ public boolean isStarted() {
+ return isStarted;
+ }
+
+ public void start() throws IOReactorException {
+ final SocketAddress addr = new InetSocketAddress(port);
+ ioReactor = new DefaultListeningIOReactor(nbThreads, threadFactory, params);
+
+ final IOEventDispatch ioEventDispatch;
+ if ("https".equals(protocol)) {
+ ioEventDispatch = new SSLServerIOEventDispatch(serviceHandler, sslContext, params);
+ } else {
+ ioEventDispatch = new DefaultServerIOEventDispatch(serviceHandler, params);
+ }
+ runner = new Thread() {
+ public void run() {
+ try {
+ ioReactor.listen(addr);
+ isStarted = true;
+ ioReactor.execute(ioEventDispatch);
+ } catch (InterruptedIOException ex) {
+ LOG.info("Interrupted");
+ } catch (IOException e) {
+ LOG.warn("I/O error: " + e.getMessage());
+ }
+ LOG.debug("Shutdown");
+ }
+ };
+ runner.start();
+ }
+
+ public void stop() throws IOException {
+ LOG.debug("Stopping the jhc ioReactor ");
+ ioReactor.shutdown();
+ LOG.debug("Waiting the runner");
+ try {
+ runner.join();
+ } catch (InterruptedException e) {
+ //do nothing here
+ }
+ isStarted = false;
+ LOG.debug("Runner stopped");
+ }
+}
Added: activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngineFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngineFactory.java?rev=599946&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngineFactory.java (added)
+++ activemq/camel/trunk/components/camel-jhc/src/main/java/org/apache/camel/component/jhc/JhcServerEngineFactory.java Fri Nov 30 12:20:26 2007
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jhc;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.http.params.HttpParams;
+
+public class JhcServerEngineFactory {
+ private static Map<Integer, JhcServerEngine> portMap =
+ new HashMap<Integer, JhcServerEngine>();
+
+ public synchronized static JhcServerEngine getJhcServerEngine(HttpParams params, int port, String protocol) {
+ JhcServerEngine engine = portMap.get(port);
+ // check the engine parameters
+ if (engine == null) {
+ engine = new JhcServerEngine(params, port, protocol.trim());
+ portMap.put(port, engine);
+ } else {
+ if (!engine.getProtocol().equals(protocol.trim())) {
+ throw new IllegalArgumentException("Jhc protocol error, the engine's protocol is " + engine.getProtocol()
+ + " you want is " + protocol);
+ }
+ }
+ return engine;
+ }
+
+}
Modified: activemq/camel/trunk/components/camel-jhc/src/test/java/org/apache/camel/component/jhc/HttpTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jhc/src/test/java/org/apache/camel/component/jhc/HttpTest.java?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jhc/src/test/java/org/apache/camel/component/jhc/HttpTest.java (original)
+++ activemq/camel/trunk/components/camel-jhc/src/test/java/org/apache/camel/component/jhc/HttpTest.java Fri Nov 30 12:20:26 2007
@@ -16,12 +16,6 @@
*/
package org.apache.camel.component.jhc;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
@@ -29,6 +23,12 @@
import java.util.List;
import java.util.Map;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
/**
* @version $Revision: 520220 $
*/
@@ -66,11 +66,16 @@
assertNotNull("Should have a body!", body);
assertTrue("body should contain: " + expectedText, body.contains(expectedText));
}
-
+
public void testUsingURL() throws Exception {
+ invokeUsingURL("http://localhost:8192/test1", "<response> Test1<response/>");
+ invokeUsingURL("http://localhost:8192/test2", "<response> Test2<response/>");
+ }
+
+ public void invokeUsingURL(String url, String expect) throws Exception {
for (int i = 0; i < 10; i++) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- HttpURLConnection con = (HttpURLConnection) new URL("http://localhost:8192/").openConnection();
+ HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
//HttpURLConnection con = (HttpURLConnection) new URL("http://www.google.com/").openConnection();
con.setDoInput(true);
con.setDoOutput(false);
@@ -89,7 +94,8 @@
}
String str = baos.toString();
System.err.println("Response: " + str);
- assertEquals("<response/>", str);
+ assertEquals("request using url" + url + "can't get the expert result",
+ expect, str);
}
}
@@ -97,8 +103,10 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- from("jhc:http://localhost:8192/").setOutBody(constant("<response/>"));
- from("direct:start").to("jhc:http://localhost:8192/pom.xml").to("mock:results");
+ from("jhc:http://localhost:8192/test1").setOutBody(constant("<response> Test1<response/>"));
+ from("direct:start").to("jhc:http://localhost:8192/test1/pom.xml").to("mock:results");
+ from("jhc:http://localhost:8192/test2").setOutBody(constant("<response> Test2<response/>"));
+
}
};
}
Modified: activemq/camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/pom.xml?rev=599946&r1=599945&r2=599946&view=diff
==============================================================================
--- activemq/camel/trunk/pom.xml (original)
+++ activemq/camel/trunk/pom.xml Fri Nov 30 12:20:26 2007
@@ -40,6 +40,7 @@
<spring-version>2.0.6</spring-version>
<geronimo-spec-version>1.1</geronimo-spec-version>
<jetty-version>6.1.5</jetty-version>
+ <httpcore-version>4.0-alpha6</httpcore-version>
<m1-repo-url>scpexe://minotaur.apache.org/www/people.apache.org/repo/m1-snapshot-repository</m1-repo-url>
<openjpa-version>0.9.6-incubating</openjpa-version>
<release-repo-url>scpexe://people.apache.org/www/people.apache.org/repo/m2-ibiblio-rsync-repository
@@ -580,20 +581,20 @@
<!-- optional jakarta http core support -->
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <version>4.0-alpha5</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore-nio</artifactId>
- <version>4.0-alpha5</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore-niossl</artifactId>
- <version>4.0-alpha5</version>
- </dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>${httpcore-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-niossl</artifactId>
+ <version>${httpcore-version}</version>
+ </dependency>
<!-- optional scripting support -->
<dependency>