You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rd...@apache.org on 2008/11/25 23:08:15 UTC
svn commit: r720623 - in
/james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt:
DiscardProtocol.java TestDiscardProtocol.java
Author: rdonkin
Date: Tue Nov 25 14:08:14 2008
New Revision: 720623
URL: http://svn.apache.org/viewvc?rev=720623&view=rev
Log:
Fixture for testing the tester. Not altogether convinced that the tests are reliable - multi-threaded tests are always tricky :-/
Added:
james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/DiscardProtocol.java
james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/TestDiscardProtocol.java
Added: james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/DiscardProtocol.java
URL: http://svn.apache.org/viewvc/james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/DiscardProtocol.java?rev=720623&view=auto
==============================================================================
--- james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/DiscardProtocol.java (added)
+++ james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/DiscardProtocol.java Tue Nov 25 14:08:14 2008
@@ -0,0 +1,307 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mpt;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Simple <a href='http://tools.ietf.org/html/rfc863'>RFC 863</a> implementation.
+ */
+public class DiscardProtocol {
+
+ private static final Charset ASCII = Charset.forName("US-ASCII");
+
+ private static final int SOCKET_CONNECTION_WAIT_MILLIS = 30;
+
+ private static final int IDLE_TIMEOUT = 120000;
+
+ private static final Log LOG = LogFactory.getLog(DiscardProtocol.class);
+
+ /** Serve on this port */
+ private final int port;
+
+ /**
+ * Queues requests for recordings.
+ * Also, used as lock.
+ */
+ private final Queue queue;
+
+ private final Collection runningServers;
+
+ /**
+ * Server socket when started, null otherwise.
+ * Null indicates to the socket serving thread that the server is stopped.
+ */
+ private volatile ServerSocketChannel socket;
+
+
+ public DiscardProtocol(final int port) {
+ super();
+ this.port = port;
+ queue = new LinkedList();
+ runningServers = new LinkedList();
+ }
+
+ /**
+ * Starts serving.
+ * @throws IOException when the server socket cannot be opened, bound or configured
+ * @throws IllegalStateException when already started
+ */
+ public void start() throws IOException {
+ synchronized (queue)
+ {
+ if (socket == null) {
+ socket = ServerSocketChannel.open();
+ socket.socket().bind(new InetSocketAddress(port));
+ // non-blocking, so the monitor thread can poll accept() and notice when the server is stopped
+ socket.configureBlocking(false);
+
+ final Thread socketMonitorThread = new Thread(new SocketMonitor());
+ socketMonitorThread.start();
+
+ } else {
+ throw new IllegalStateException("Already started");
+ }
+ }
+ }
+
+
+ public Record recordNext() {
+ synchronized (queue)
+ {
+ Server server = new Server();
+ queue.add(server);
+ return server;
+ }
+ }
+
+ private void abort() {
+ synchronized (queue)
+ {
+ stop();
+ for (Iterator it=queue.iterator();it.hasNext();) {
+ final Server server = (Server) it.next();
+ server.abort();
+ }
+ queue.clear();
+ }
+ }
+
+ /**
+ * Stops serving.
+ * Closes the server socket if it is open and flags every running server as aborted.
+ */
+ public void stop() {
+ synchronized (queue)
+ {
+ try {
+ if (socket != null) {
+ if (socket.isOpen()) {
+ socket.close();
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to close socket", e);
+ }
+ socket = null;
+ for (Iterator it = runningServers.iterator(); it.hasNext();) {
+ final Server server = (Server) it.next();
+ server.abort();
+ }
+ }
+ }
+
+ private final class SocketMonitor implements Runnable {
+ public void run() {
+ try
+ {
+ long lastConnection = System.currentTimeMillis();
+ while(socket != null) {
+ final SocketChannel socketChannel = socket.accept();
+ if (socketChannel == null) {
+ if (System.currentTimeMillis() - lastConnection > IDLE_TIMEOUT) {
+ throw new Exception ("Idle timeout");
+ }
+ Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
+ } else {
+ synchronized(queue) {
+ Server nextServer = (Server) queue.poll();
+ if (nextServer == null) {
+ nextServer = new Server();
+ }
+ nextServer.setSocketChannel(socketChannel);
+
+ final Thread channelThread = new Thread(nextServer);
+ channelThread.start();
+ runningServers.add(nextServer);
+ lastConnection = System.currentTimeMillis();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("Cannot accept connection", e);
+ abort();
+ }
+ }
+ }
+
+ public interface Record {
+ /** Blocks until completion of conversation */
+ public String complete() throws Exception;
+ }
+
+ /**
+ * Basic server.
+ */
+ private final static class Server implements Runnable, Record {
+
+ private static final int COMPLETION_TIMEOUT = 60000;
+
+ private static final int COMPLETION_PAUSE = 1000;
+
+ private static final int INITIAL_BUFFER_CAPACITY = 2048;
+
+ private final ByteBuffer buffer;
+ /**
+ * Safe for concurrent access by multiple threads.
+ */
+ private final StringBuffer out;
+
+ /**
+ * Initialised by setter
+ */
+ private SocketChannel socketChannel;
+
+ private volatile boolean aborted;
+ private volatile boolean complete;
+
+ public Server() {
+ complete = false;
+ out = new StringBuffer(INITIAL_BUFFER_CAPACITY);
+ buffer = ByteBuffer.allocate(INITIAL_BUFFER_CAPACITY);
+ aborted = false;
+ socketChannel = null;
+ }
+
+
+ public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ public void setSocketChannel(SocketChannel socketChannel) {
+ this.socketChannel = socketChannel;
+ }
+
+ public void run() {
+ try
+ {
+ if (socketChannel == null)
+ {
+ LOG.fatal("Socket channel must be set before instance is run.");
+ }
+ else
+ {
+ try {
+ while(!socketChannel.finishConnect()) {
+ Thread.sleep(SOCKET_CONNECTION_WAIT_MILLIS);
+ }
+
+ int read = 0;
+ while(!aborted && socketChannel.isOpen() && read >= 0) {
+ read = socketChannel.read(buffer);
+ if (!buffer.hasRemaining()) {
+ decant();
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.fatal("Socket communication failed", e);
+ aborted = true;
+
+ // Tidy up
+ } finally {
+ try {
+ socketChannel.close();
+ } catch (Exception e) {
+ LOG.debug("Ignoring failure to close socket.", e);
+ }
+ }
+ }
+ } finally {
+ synchronized (this)
+ {
+ // Ensure completion is flagged
+ complete = true;
+ // Signal to any waiting threads
+ notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Decodes the bytes held in the buffer as US-ASCII, appends them to the output and clears the buffer.
+ *
+ */
+ private void decant() {
+ buffer.flip();
+ final CharBuffer decoded = ASCII.decode(buffer);
+ out.append(decoded);
+ buffer.clear();
+ }
+
+
+ public void abort() {
+ aborted = true;
+ }
+
+ /**
+ * Blocks until the connection is complete (closed); throws if already aborted or if the wait times out.
+ */
+ public synchronized String complete() throws Exception {
+ if (aborted) {
+ throw new Exception("Aborted");
+ }
+ final long startTime = System.currentTimeMillis();
+ boolean isTimedOut = false;
+ while (!complete && !isTimedOut) {
+ wait(COMPLETION_PAUSE);
+ isTimedOut = (System.currentTimeMillis() - startTime) > COMPLETION_TIMEOUT;
+ }
+ if (isTimedOut && !complete) {
+ throw new Exception("Timed out wait for be notified that read is complete");
+ }
+ decant();
+ return out.toString();
+ }
+ }
+}
Added: james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/TestDiscardProtocol.java
URL: http://svn.apache.org/viewvc/james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/TestDiscardProtocol.java?rev=720623&view=auto
==============================================================================
--- james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/TestDiscardProtocol.java (added)
+++ james/protocol-tester/trunk/main/src/test/java/org/apache/james/mpt/TestDiscardProtocol.java Tue Nov 25 14:08:14 2008
@@ -0,0 +1,99 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mpt;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+
+import junit.framework.TestCase;
+
+public class TestDiscardProtocol extends TestCase {
+
+ private final class InputLater implements Runnable {
+ private Exception e;
+
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ input();
+ } catch (Exception e) {
+ this.e = e;
+ }
+ }
+
+ public void assertExecutedSuccessfully() throws Exception {
+ if (e != null) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ private static final String INPUT = "One, two, three - Testing";
+
+ private static final int PORT = 10001;
+
+ private DiscardProtocol protocol;
+ private Socket socket;
+
+ private DiscardProtocol.Record record;
+
+ //@Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ protocol = new DiscardProtocol(PORT);
+ protocol.start();
+ socket = SocketFactory.getDefault().createSocket("127.0.0.1", PORT);
+ record = protocol.recordNext();
+ }
+
+ //@Override
+ protected void tearDown() throws Exception {
+ protocol.stop();
+ super.tearDown();
+ }
+
+ public void testRecord() throws Exception {
+ assertTrue(socket.isConnected());
+ input();
+ String output = record.complete();
+ assertEquals(INPUT, output);
+ }
+
+ private void input() throws IOException {
+ Writer out = new OutputStreamWriter(socket.getOutputStream());
+ out.append(INPUT);
+ out.close();
+ socket.close();
+ }
+
+ public void testComplete() throws Exception {
+ InputLater inputLater = new InputLater();
+ Thread thread = new Thread(inputLater);
+ thread.start();
+ String output = record.complete();
+ assertEquals(INPUT, output);
+ inputLater.assertExecutedSuccessfully();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org