You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jf...@apache.org on 2012/03/22 22:49:13 UTC

svn commit: r1304085 [9/10] - in /thrift/trunk: ./ aclocal/ compiler/cpp/ compiler/cpp/src/generate/ lib/ lib/d/ lib/d/src/ lib/d/src/thrift/ lib/d/src/thrift/async/ lib/d/src/thrift/codegen/ lib/d/src/thrift/internal/ lib/d/src/thrift/internal/test/ l...

Added: thrift/trunk/lib/d/test/async_test.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/async_test.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/async_test.d (added)
+++ thrift/trunk/lib/d/test/async_test.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless enforced by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module async_test;
+
+import core.atomic;
+import core.sync.condition : Condition;
+import core.sync.mutex : Mutex;
+import core.thread : dur, Thread, ThreadGroup;
+import std.conv : text;
+import std.datetime;
+import std.getopt;
+import std.exception : collectException, enforce;
+import std.parallelism : TaskPool;
+import std.stdio;
+import std.string;
+import std.variant : Variant;
+import thrift.base;
+import thrift.async.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.async.ssl;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.base;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.server.transport.ssl;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.ssl;
+import thrift.util.cancellation;
+
+version (Posix) {
+  import core.stdc.signal;
+  import core.sys.posix.signal;
+
+  // Disable SIGPIPE because SSL server will write to broken socket after
+  // client disconnected (see TSSLSocket docs).
+  shared static this() {
+    signal(SIGPIPE, SIG_IGN);
+  }
+}
+
+interface AsyncTest {
+  string echo(string value);
+  string delayedEcho(string value, long milliseconds);
+
+  void fail(string reason);
+  void delayedFail(string reason, long milliseconds);
+
+  enum methodMeta = [
+    TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
+    TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
+  ];
+  alias .AsyncTestException AsyncTestException;
+}
+
+class AsyncTestException : TException {
+  string reason;
+  mixin TStructHelpers!();
+}
+
+void main(string[] args) {
+  ushort port = 9090;
+  ushort managerCount = 2;
+  ushort serversPerManager = 5;
+  ushort threadsPerServer = 10;
+  uint iterations = 10;
+  bool ssl;
+  bool trace;
+
+  getopt(args,
+    "iterations", &iterations,
+    "managers", &managerCount,
+    "port", &port,
+    "servers-per-manager", &serversPerManager,
+    "ssl", &ssl,
+    "threads-per-server", &threadsPerServer,
+    "trace", &trace,
+  );
+
+  TTransportFactory clientTransportFactory;
+  TSSLContext serverSSLContext;
+  if (ssl) {
+    auto clientSSLContext = new TSSLContext();
+    with (clientSSLContext) {
+      ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+      authenticate = true;
+      loadTrustedCertificates("./trusted-ca-certificate.pem");
+    }
+    clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);
+
+    serverSSLContext = new TSSLContext();
+    with (serverSSLContext) {
+      serverSide = true;
+      loadCertificate("./server-certificate.pem");
+      loadPrivateKey("./server-private-key.pem");
+      ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+    }
+  } else {
+    clientTransportFactory = new TBufferedTransportFactory;
+  }
+
+
+  auto serverCancel = new TCancellationOrigin;
+  scope(exit) {
+    writeln("Triggering server shutdown...");
+    serverCancel.trigger();
+    writeln("done.");
+  }
+
+  auto managers = new TLibeventAsyncManager[managerCount];
+  scope (exit) foreach (ref m; managers) clear(m);
+
+  auto clientsThreads = new ThreadGroup;
+  foreach (managerIndex, ref manager; managers) {
+    manager = new TLibeventAsyncManager;
+    foreach (serverIndex; 0 .. serversPerManager) {
+      auto currentPort = cast(ushort)
+        (port + managerIndex * serversPerManager + serverIndex);
+
+      // Start the server and wait until it is up and running.
+      auto servingMutex = new Mutex;
+      auto servingCondition = new Condition(servingMutex);
+      auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
+      synchronized (servingMutex) {
+        (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
+          serverCancel, handler)).start();
+        servingCondition.wait();
+      }
+
+      // We only run the timing tests for the first server on each async
+      // manager, so that we don't get spurious timing errors becaue of
+      // ordering issues.
+      auto runTimingTests = (serverIndex == 0);
+
+      auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
+        threadsPerServer, iterations, runTimingTests, trace);
+      clientsThreads.add(c);
+      c.start();
+    }
+  }
+  clientsThreads.joinAll();
+}
+
+class AsyncTestHandler : AsyncTest {
+  this(bool trace) {
+    trace_ = trace;
+  }
+
+  override string echo(string value) {
+    if (trace_) writefln(`echo("%s")`, value);
+    return value;
+  }
+
+  override string delayedEcho(string value, long milliseconds) {
+    if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
+    Thread.sleep(dur!"msecs"(milliseconds));
+    if (trace_) writeln("returning.");
+
+    return value;
+  }
+
+  override void fail(string reason) {
+    if (trace_) writefln(`fail("%s")`, reason);
+    auto ate = new AsyncTestException;
+    ate.reason = reason;
+    throw ate;
+  }
+
+  override void delayedFail(string reason, long milliseconds) {
+    if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
+    Thread.sleep(dur!"msecs"(milliseconds));
+    if (trace_) writeln("returning.");
+
+    auto ate = new AsyncTestException;
+    ate.reason = reason;
+    throw ate;
+  }
+
+private:
+  bool trace_;
+  AsyncTestException ate_;
+}
+
+class PreServeNotifyHandler : TServerEventHandler {
+  this(Mutex servingMutex, Condition servingCondition) {
+    servingMutex_ = servingMutex;
+    servingCondition_ = servingCondition;
+  }
+
+  void preServe() {
+    synchronized (servingMutex_) {
+      servingCondition_.notifyAll();
+    }
+  }
+  Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+  void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+  void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+  Mutex servingMutex_;
+  Condition servingCondition_;
+}
+
+class ServerThread(ServerType) : Thread {
+  this(ushort port, TSSLContext sslContext, bool trace,
+    TCancellation cancellation, TServerEventHandler eventHandler
+  ) {
+    port_ = port;
+    sslContext_ = sslContext;
+    trace_ = trace;
+    cancellation_ = cancellation;
+    eventHandler_ = eventHandler;
+
+    super(&run);
+  }
+
+  void run() {
+    TServerSocket serverSocket;
+    if (sslContext_) {
+      serverSocket = new TSSLServerSocket(port_, sslContext_);
+    } else {
+      serverSocket = new TServerSocket(port_);
+    }
+    auto transportFactory = new TBufferedTransportFactory;
+    auto protocolFactory = new TBinaryProtocolFactory!();
+    auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));
+
+    auto server = new ServerType(processor, serverSocket, transportFactory,
+      protocolFactory);
+    server.eventHandler = eventHandler_;
+
+    writefln("Starting server on port %s...", port_);
+    server.serve(cancellation_);
+    writefln("Server thread on port %s done.", port_);
+  }
+
+private:
+  ushort port_;
+  bool trace_;
+  TCancellation cancellation_;
+  TSSLContext sslContext_;
+  TServerEventHandler eventHandler_;
+}
+
+class ClientsThread : Thread {
+  this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
+    ushort threads, uint iterations, bool runTimingTests, bool trace
+  ) {
+    manager_ = manager;
+    port_ = port;
+    transportFactory_ = tf;
+    threads_ = threads;
+    iterations_ = iterations;
+    runTimingTests_ = runTimingTests;
+    trace_ = trace;
+    super(&run);
+  }
+
+  void run() {
+    auto transport = new TAsyncSocket(manager_, "localhost", port_);
+
+    {
+      auto client = new TAsyncClient!AsyncTest(
+        transport,
+        transportFactory_,
+        new TBinaryProtocolFactory!()
+      );
+      transport.open();
+      auto clientThreads = new ThreadGroup;
+      foreach (clientId; 0 .. threads_) {
+        clientThreads.create({
+          auto c = clientId;
+          return {
+            foreach (i; 0 .. iterations_) {
+              immutable id = text(port_, ":", c, ":", i);
+
+              {
+                if (trace_) writefln(`Calling echo("%s")... `, id);
+                auto a = client.echo(id);
+                enforce(a == id);
+                if (trace_) writefln(`echo("%s") done.`, id);
+              }
+
+              {
+                if (trace_) writefln(`Calling fail("%s")... `, id);
+                auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
+                enforce(a && a.reason == id);
+                if (trace_) writefln(`fail("%s") done.`, id);
+              }
+            }
+          };
+        }());
+      }
+      clientThreads.joinAll();
+      transport.close();
+    }
+
+    if (runTimingTests_) {
+      auto client = new TAsyncClient!AsyncTest(
+        transport,
+        transportFactory_,
+        new TBinaryProtocolFactory!TBufferedTransport
+      );
+
+      // Temporarily redirect error logs to stdout, as SSL errors on the server
+      // side are expected when the client terminates aburptly (as is the case
+      // in the timeout test).
+      auto oldErrorLogSink = g_errorLogSink;
+      g_errorLogSink = g_infoLogSink;
+      scope (exit) g_errorLogSink = oldErrorLogSink;
+
+      foreach (i; 0 .. iterations_) {
+        transport.open();
+
+        immutable id = text(port_, ":", i);
+
+        {
+          if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
+          auto a = client.delayedEcho(id, 100);
+          enforce(!a.completion.wait(dur!"usecs"(1)),
+            text("wait() succeded early (", a.get(), ", ", id, ")."));
+          enforce(!a.completion.wait(dur!"usecs"(1)),
+            text("wait() succeded early (", a.get(), ", ", id, ")."));
+          enforce(a.completion.wait(dur!"msecs"(200)),
+            text("wait() didn't succeed as expected (", id, ")."));
+          enforce(a.get() == id);
+          if (trace_) writefln(`... delayedEcho("%s") done.`, id);
+        }
+
+        {
+          if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
+          auto a = client.delayedFail(id, 100);
+          enforce(!a.completion.wait(dur!"usecs"(1)),
+            text("wait() succeded early (", id, ", ", collectException(a.get()), ")."));
+          enforce(!a.completion.wait(dur!"usecs"(1)),
+            text("wait() succeded early (", id, ", ", collectException(a.get()), ")."));
+          enforce(a.completion.wait(dur!"msecs"(200)),
+            text("wait() didn't succeed as expected (", id, ")."));
+          auto e = cast(AsyncTestException)collectException(a.get());
+          enforce(e && e.reason == id);
+          if (trace_) writefln(`... delayedFail("%s") done.`, id);
+        }
+
+        {
+          transport.recvTimeout = dur!"msecs"(50);
+
+          if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
+          auto a = client.delayedEcho("socketTimeout", 100);
+          auto e = cast(TTransportException)collectException(a.waitGet());
+          enforce(e, text("Operation didn't fail as expected (", id, ")."));
+          enforce(e.type == TTransportException.Type.TIMED_OUT,
+            text("Wrong timeout exception type (", id, "): ", e));
+          if (trace_) writeln(`timed out as expected.`);
+
+          // Wait until the server thread reset before the next iteration.
+          Thread.sleep(dur!"msecs"(50));
+          transport.recvTimeout = dur!"hnsecs"(0);
+        }
+
+        transport.close();
+      }
+    }
+
+    writefln("Clients thread for port %s done.", port_);
+  }
+
+  TAsyncSocketManager manager_;
+  ushort port_;
+  TTransportFactory transportFactory_;
+  ushort threads_;
+  uint iterations_;
+  bool runTimingTests_;
+  bool trace_;
+}

Added: thrift/trunk/lib/d/test/async_test_runner.sh
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/async_test_runner.sh?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/async_test_runner.sh (added)
+++ thrift/trunk/lib/d/test/async_test_runner.sh Thu Mar 22 21:49:10 2012
@@ -0,0 +1,6 @@
+#!/bin/bash
+# Runs the async test in both SSL and non-SSL mode.
+./async_test > /dev/null || exit 1
+echo "Non-SSL tests done."
+./async_test --ssl > /dev/null || exit 1
+echo "SSL tests done."

Added: thrift/trunk/lib/d/test/client_pool_test.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/client_pool_test.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/client_pool_test.d (added)
+++ thrift/trunk/lib/d/test/client_pool_test.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module client_pool_test;
+
+import core.time : Duration, dur;
+import core.thread : Thread;
+import std.algorithm;
+import std.array;
+import std.conv;
+import std.exception;
+import std.getopt;
+import std.range;
+import std.stdio;
+import std.typecons;
+import thrift.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.codegen.base;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.client;
+import thrift.codegen.client_pool;
+import thrift.codegen.processor;
+import thrift.protocol.binary;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.transport.buffered;
+import thrift.transport.socket;
+import thrift.util.cancellation;
+import thrift.util.future;
+
+// We use this as our RPC-layer exception here to make sure socket/… problems
+// (that would usually considered to be RPC layer faults) cause the tests to
+// fail, even though we are testing the RPC exception handling.
+class TestServiceException : TException {
+  int port;
+}
+
+interface TestService {
+  int getPort();
+  alias .TestServiceException TestServiceException;
+  enum methodMeta = [TMethodMeta("getPort", [],
+    [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+// Use some derived service, just to check that the pools handle inheritance
+// correctly.
+interface ExTestService : TestService {
+  int[] getPortInArray();
+  enum methodMeta = [TMethodMeta("getPortInArray", [],
+    [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+class ExTestHandler : ExTestService {
+  this(ushort port, Duration delay, bool failing, bool trace) {
+    this.port = port;
+    this.delay = delay;
+    this.failing = failing;
+    this.trace = trace;
+  }
+
+  override int getPort() {
+    if (trace) {
+      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
+        delay, failing);
+    }
+    sleep();
+    failIfEnabled();
+    return port;
+  }
+
+  override int[] getPortInArray() {
+    return [getPort()];
+  }
+
+  ushort port;
+  Duration delay;
+  bool failing;
+  bool trace;
+
+private:
+  void sleep() {
+    if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
+  }
+
+  void failIfEnabled() {
+    if (!failing) return;
+
+    auto e = new TestServiceException;
+    e.port = port;
+    throw e;
+  }
+}
+
+class ServerThread : Thread {
+  this(ExTestHandler handler, TCancellation cancellation) {
+    super(&run);
+    handler_ = handler;
+    cancellation_ = cancellation;
+  }
+private:
+  void run() {
+    try {
+      auto protocolFactory = new TBinaryProtocolFactory!();
+      auto processor = new TServiceProcessor!ExTestService(handler_);
+      auto serverTransport = new TServerSocket(handler_.port);
+      serverTransport.recvTimeout = dur!"seconds"(3);
+      auto transportFactory = new TBufferedTransportFactory;
+
+      auto server = new TSimpleServer(
+        processor, serverTransport, transportFactory, protocolFactory);
+      server.serve(cancellation_);
+    } catch (Exception e) {
+      writefln("Server thread on port %s failed: %s", handler_.port, e);
+    }
+  }
+
+  TCancellation cancellation_;
+  ExTestHandler handler_;
+}
+
+void main(string[] args) {
+  bool trace;
+  ushort port = 9090;
+  getopt(args, "port", &port, "trace", &trace);
+
+  auto serverCancellation = new TCancellationOrigin;
+  scope (exit) serverCancellation.trigger();
+
+  immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+
+version (none) {
+  // Cannot use this due to multiple DMD @@BUG@@s:
+  // 1. »function D main is a nested function and cannot be accessed from array«
+  //    when calling array() on the result of the outer map() – would have to
+  //    manually do the eager evaluation/array conversion.
+  // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
+  //    can be worked around by calling array() on the map result first.
+  // 3. Even when using the workarounds for the last two points, the DMD-built
+  //    executable crashes when building without (sic!) inlining enabled,
+  //    the backtrace points into the first delegate literal.
+  auto handlers = array(map!((args){
+    return new ExTestHandler(args._0, args._1, args._2, trace);
+  })(zip(
+    ports,
+    map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
+    [false, false, false, true, true, true]
+  )));
+} else {
+  auto handlers = [
+    new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
+    new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
+    new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
+    new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
+    new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
+    new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
+  ];
+}
+
+  // Fire up the server threads.
+  foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
+
+  // Give the servers some time to get up. This should really be accomplished
+  // via a barrier here and in the preServe() hook.
+  Thread.sleep(dur!"msecs"(10));
+
+  syncClientPoolTest(ports, handlers);
+  asyncClientPoolTest(ports, handlers);
+  asyncFastestClientPoolTest(ports, handlers);
+  asyncAggregatorTest(ports, handlers);
+}
+
+
+void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto clients = array(map!((a){
+    return cast(TClientBase!ExTestService)tClient!ExTestService(
+      tBinaryProtocol(new TSocket("127.0.0.1", a))
+    );
+  })(ports));
+
+  scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
+
+  // Try the case where the first client succeeds.
+  {
+    enforce(makePool(clients).getPort() == ports[0]);
+  }
+
+  // Try the case where all clients fail.
+  {
+    auto pool = makePool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the first clients fail, but a later one succeeds.
+  {
+    auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
+    enforce(pool.getPortInArray() == [ports[0]]);
+  }
+
+  // Make sure a client is properly deactivated when it has failed too often.
+  {
+    auto pool = makePool(clients);
+    pool.faultDisableCount = 1;
+    pool.faultDisableDuration = dur!"msecs"(50);
+
+    handlers[0].failing = true;
+    enforce(pool.getPort() == ports[1]);
+
+    handlers[0].failing = false;
+    enforce(pool.getPort() == ports[1]);
+
+    Thread.sleep(dur!"msecs"(50));
+    enforce(pool.getPort() == ports[0]);
+  }
+}
+
+auto makePool(TClientBase!ExTestService[] clients) {
+  auto p = tClientPool(clients);
+  p.permuteClients = false;
+  p.rpcFaultFilter = (Exception e) {
+    return (cast(TestServiceException)e !is null);
+  };
+  return p;
+}
+
+
+void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+
+  // Try the case where the first client succeeds.
+  {
+    enforce(makeAsyncPool(clients).getPort() == ports[0]);
+  }
+
+  // Try the case where all clients fail.
+  {
+    auto pool = makeAsyncPool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the first clients fail, but a later one succeeds.
+  {
+    auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
+    enforce(pool.getPortInArray() == [ports[0]]);
+  }
+
+  // Make sure a client is properly deactivated when it has failed too often.
+  {
+    auto pool = makeAsyncPool(clients);
+    pool.faultDisableCount = 1;
+    pool.faultDisableDuration = dur!"msecs"(50);
+
+    handlers[0].failing = true;
+    enforce(pool.getPort() == ports[1]);
+
+    handlers[0].failing = false;
+    enforce(pool.getPort() == ports[1]);
+
+    Thread.sleep(dur!"msecs"(50));
+    enforce(pool.getPort() == ports[0]);
+  }
+}
+
+auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
+  auto p = tAsyncClientPool(clients);
+  p.permuteClients = false;
+  p.rpcFaultFilter = (Exception e) {
+    return (cast(TestServiceException)e !is null);
+  };
+  return p;
+}
+
+auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
+  // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
+  // to »function D main is a nested function and cannot be accessed from array«.
+  // Thus, we manually do the array conversion.
+  auto lazyClients = map!((a){
+    return new TAsyncClient!ExTestService(
+      new TAsyncSocket(manager, "127.0.0.1", a),
+      new TBufferedTransportFactory,
+      new TBinaryProtocolFactory!(TBufferedTransport)
+    );
+  })(ports);
+  TAsyncClientBase!ExTestService[] clients;
+  foreach (c; lazyClients) clients ~= c;
+  return clients;
+}
+
+
+void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+
+  // Make sure the fastest client wins, even if they are called in some other
+  // order.
+  {
+    auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
+    enforce(result == ports[0]);
+  }
+
+  // Try the case where all clients fail.
+  {
+    auto pool = makeAsyncFastestPool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the first clients fail, but a later one succeeds.
+  {
+    auto pool = makeAsyncFastestPool(clients[1 .. $]);
+    enforce(pool.getPortInArray() == [ports[1]]);
+  }
+}
+
+auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
+  auto p = tAsyncFastestClientPool(clients);
+  p.rpcFaultFilter = (Exception e) {
+    return (cast(TestServiceException)e !is null);
+  };
+  return p;
+}
+
+
+void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+
+  auto aggregator = tAsyncAggregator(
+    cast(TAsyncClientBase!ExTestService[])clients);
+
+  // Test aggregator range interface.
+  {
+    auto range = aggregator.getPort().range(dur!"msecs"(50));
+    enforce(equal(range, ports[0 .. 2][]));
+    enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
+      ports[3 .. $ - 1]));
+    enforce(range.completedCount == 4);
+  }
+
+  // Test default accumulator for scalars.
+  {
+    auto fullResult = aggregator.getPort().accumulate();
+    enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+    auto partialResult = aggregator.getPort().accumulate();
+    Thread.sleep(dur!"msecs"(20));
+    enforce(partialResult.finishGet() == ports[0 .. 2]);
+
+  }
+
+  // Test default accumulator for arrays.
+  {
+    auto fullResult = aggregator.getPortInArray().accumulate();
+    enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+    auto partialResult = aggregator.getPortInArray().accumulate();
+    Thread.sleep(dur!"msecs"(20));
+    enforce(partialResult.finishGet() == ports[0 .. 2]);
+  }
+
+  // Test custom accumulator.
+  {
+    auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
+      return reduce!"a + b"(results);
+    })();
+    enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
+
+    auto partialResult = aggregator.getPort().accumulate!(
+      function(int[] results, Exception[] exceptions) {
+        // Return a tuple of the parameters so we can check them outside of
+        // this function (to verify the values, we need access to »ports«, but
+        // due to DMD @@BUG5710@@, we can't use a delegate literal).f
+        return tuple(results, exceptions);
+      }
+    )();
+    Thread.sleep(dur!"msecs"(20));
+    auto resultTuple = partialResult.finishGet();
+    enforce(resultTuple._0 == ports[0 .. 2]);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple._1),
+      ports[3 .. $ - 1]));
+  }
+}

Added: thrift/trunk/lib/d/test/openssl.test.cnf
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/openssl.test.cnf?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/openssl.test.cnf (added)
+++ thrift/trunk/lib/d/test/openssl.test.cnf Thu Mar 22 21:49:10 2012
@@ -0,0 +1,14 @@
+[ req ]
+default_bits = 2048
+default_keyfile = server-private-key.pem
+distinguished_name = req_distinguished_name
+x509_extensions = v3_ca
+prompt = no
+
+[ req_distinguished_name ]
+CN = localhost
+
+[ v3_ca ]
+# Add ::1 to the list of allowed IPs so we can use ::1 to explicitly connect
+# to localhost via IPv6.
+subjectAltName = IP:::1

Added: thrift/trunk/lib/d/test/serialization_benchmark.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/serialization_benchmark.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/serialization_benchmark.d (added)
+++ thrift/trunk/lib/d/test/serialization_benchmark.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,70 @@
+/**
+ * An implementation of the mini serialization benchmark also available for
+ * C++ and Java.
+ *
+ * For meaningful results, you might want to make sure that
+ * the Thrift library is compiled with release build flags,
+ * e.g. by including the source files with the build instead
+ * of linking libthriftd:
+ *
+   dmd -w -O -release -inline -I../src -Igen-d -ofserialization_benchmark \
+   $(find ../src/thrift -name '*.d' -not -name index.d) \
+   gen-d/DebugProtoTest_types.d serialization_benchmark.d
+ */
+module serialization_benchmark;
+
+import std.datetime : AutoStart, StopWatch;
+import std.math : PI;
+import std.stdio;
+import thrift.protocol.binary;
+import thrift.transport.memory;
+import thrift.transport.range;
+import DebugProtoTest_types;
+
+void main() {
+  auto buf = new TMemoryBuffer;
+  enum ITERATIONS = 10_000_000;
+
+  {
+    auto ooe = OneOfEach();
+    ooe.im_true   = true;
+    ooe.im_false  = false;
+    ooe.a_bite    = 0x7f;
+    ooe.integer16 = 27_000;
+    ooe.integer32 = 1 << 24;
+    ooe.integer64 = 6_000_000_000;
+    ooe.double_precision = PI;
+    ooe.some_characters = "JSON THIS! \"\1";
+    ooe.zomg_unicode = "\xd7\n\a\t";
+    ooe.base64 = "\1\2\3\255";
+
+    auto prot = tBinaryProtocol(buf);
+    auto sw = StopWatch(AutoStart.yes);
+    foreach (i; 0 .. ITERATIONS) {
+      buf.reset(120);
+      ooe.write(prot);
+    }
+    sw.stop();
+
+    auto msecs = sw.peek().msecs;
+    writefln("Write: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
+  }
+
+  auto data = buf.getContents().dup;
+
+  {
+    auto readBuf = tInputRangeTransport(data);
+    auto prot = tBinaryProtocol(readBuf);
+    auto ooe = OneOfEach();
+
+    auto sw = StopWatch(AutoStart.yes);
+    foreach (i; 0 .. ITERATIONS) {
+      readBuf.reset(data);
+      ooe.read(prot);
+    }
+    sw.stop();
+
+    auto msecs = sw.peek().msecs;
+    writefln(" Read: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
+  }
+}

Added: thrift/trunk/lib/d/test/stress_test_server.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/stress_test_server.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/stress_test_server.d (added)
+++ thrift/trunk/lib/d/test/stress_test_server.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module stress_test_server;
+
+import std.getopt;
+import std.parallelism : totalCPUs;
+import std.stdio;
+import std.typetuple;
+import thrift.codegen.processor;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.transport.socket;
+import thrift.transport.buffered;
+import thrift.transport.memory;
+import thrift.transport.socket;
+import thrift.util.hashset;
+import test_utils;
+
+import thrift.test.stress.Service;
+
+class ServiceHandler : Service {
+  void echoVoid() { return; }
+  byte echoByte(byte arg) { return arg; }
+  int echoI32(int arg) { return arg; }
+  long echoI64(long arg) { return arg; }
+  byte[] echoList(byte[] arg) { return arg; }
+  HashSet!byte echoSet(HashSet!byte arg) { return arg; }
+  byte[byte] echoMap(byte[byte] arg) { return arg; }
+
+  string echoString(string arg) {
+    if (arg != "hello") {
+      stderr.writefln(`Wrong string received: %s instead of "hello"`, arg);
+      throw new Exception("Wrong string received.");
+    }
+    return arg;
+  }
+}
+
+void main(string[] args) {
+  ushort port = 9091;
+  auto serverType = ServerType.threaded;
+  TransportType transportType;
+  size_t numIOThreads = 1;
+  size_t taskPoolSize = totalCPUs;
+
+  getopt(args, "port", &port, "server-type", &serverType,
+    "transport-type", &transportType, "task-pool-size", &taskPoolSize,
+    "num-io-threads", &numIOThreads);
+
+  alias TypeTuple!(TBufferedTransport, TMemoryBuffer) AvailableTransports;
+
+  auto processor = new TServiceProcessor!(Service,
+    staticMap!(TBinaryProtocol, AvailableTransports))(new ServiceHandler());
+  auto serverSocket = new TServerSocket(port);
+  auto transportFactory = createTransportFactory(transportType);
+  auto protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
+
+  auto server = createServer(serverType, taskPoolSize, numIOThreads,
+    processor, serverSocket, transportFactory, protocolFactory);
+
+  writefln("Starting %s %s StressTest server on port %s...", transportType,
+    serverType, port);
+  server.serve();
+  writeln("done.");
+}

Added: thrift/trunk/lib/d/test/test_utils.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/test_utils.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/test_utils.d (added)
+++ thrift/trunk/lib/d/test/test_utils.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Various helpers used by more than a single test.
+ */
+module test_utils;
+
+import std.parallelism : TaskPool;
+import thrift.protocol.base;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.nonblocking;
+import thrift.server.simple;
+import thrift.server.taskpool;
+import thrift.server.threaded;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+
+// This is a likely victim of @@BUG4744@@ when used with command argument
+// parsing.
+enum ServerType {
+  simple,
+  nonblocking,
+  pooledNonblocking,
+  taskpool,
+  threaded
+}
+
+TServer createServer(ServerType type, size_t taskPoolSize, size_t numIOThreads,
+  TProcessor processor, TServerSocket serverTransport,
+  TTransportFactory transportFactory, TProtocolFactory protocolFactory)
+{
+  final switch (type) {
+    case ServerType.simple:
+      return new TSimpleServer(processor, serverTransport,
+        transportFactory, protocolFactory);
+    case ServerType.nonblocking:
+      auto nb = new TNonblockingServer(processor, serverTransport.port,
+        transportFactory, protocolFactory);
+      nb.numIOThreads = numIOThreads;
+      return nb;
+    case ServerType.pooledNonblocking:
+      auto nb = new TNonblockingServer(processor, serverTransport.port,
+        transportFactory, protocolFactory, new TaskPool(taskPoolSize));
+      nb.numIOThreads = numIOThreads;
+      return nb;
+    case ServerType.taskpool:
+      auto tps = new TTaskPoolServer(processor, serverTransport,
+        transportFactory, protocolFactory);
+      tps.taskPool = new TaskPool(taskPoolSize);
+      return tps;
+    case ServerType.threaded:
+      return new TThreadedServer(processor, serverTransport,
+        transportFactory, protocolFactory);
+  }
+}
+
+enum TransportType {
+  buffered,
+  framed,
+  http,
+  raw
+}
+
+TTransportFactory createTransportFactory(TransportType type) {
+  final switch (type) {
+    case TransportType.buffered:
+      return new TBufferedTransportFactory;
+    case TransportType.framed:
+      return new TFramedTransportFactory;
+    case TransportType.http:
+      return new TServerHttpTransportFactory;
+    case TransportType.raw:
+      return new TTransportFactory;
+  }
+}

Added: thrift/trunk/lib/d/test/thrift_test_client.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/thrift_test_client.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/thrift_test_client.d (added)
+++ thrift/trunk/lib/d/test/thrift_test_client.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift_test_client;
+
+import std.conv;
+import std.datetime;
+import std.exception : enforce;
+import std.getopt;
+import std.stdio;
+import std.string;
+import std.traits;
+import thrift.codegen.client;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.protocol.compact;
+import thrift.protocol.json;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+import thrift.transport.socket;
+import thrift.transport.ssl;
+import thrift.util.hashset;
+
+import thrift_test_common;
+import thrift.test.ThriftTest;
+import thrift.test.ThriftTest_types;
+
+enum TransportType {
+  buffered,
+  framed,
+  http,
+  raw
+}
+
+TProtocol createProtocol(T)(T trans, ProtocolType type) {
+  final switch (type) {
+    case ProtocolType.binary:
+      return tBinaryProtocol(trans);
+    case ProtocolType.compact:
+      return tCompactProtocol(trans);
+    case ProtocolType.json:
+      return tJsonProtocol(trans);
+  }
+}
+
+void main(string[] args) {
+  string host = "localhost";
+  ushort port = 9090;
+  uint numTests = 1;
+  bool ssl;
+  ProtocolType protocolType;
+  TransportType transportType;
+  bool trace;
+
+  getopt(args,
+    "numTests|n", &numTests,
+    "protocol", &protocolType,
+    "ssl", &ssl,
+    "transport", &transportType,
+    "trace", &trace,
+    "host", (string _, string value) {
+      auto parts = split(value, ":");
+      if (parts.length > 1) {
+        // IPv6 addresses can contain colons, so take the last part for the
+        // port.
+        host = join(parts[0 .. $ - 1], ":");
+        port = to!ushort(parts[$ - 1]);
+      } else {
+        host = value;
+      }
+    }
+  );
+
+  TSocket socket;
+  if (ssl) {
+    auto sslContext = new TSSLContext();
+    sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+    sslContext.authenticate = true;
+    sslContext.loadTrustedCertificates("./trusted-ca-certificate.pem");
+    socket = new TSSLSocket(sslContext, host, port);
+  } else {
+    socket = new TSocket(host, port);
+  }
+
+  TProtocol protocol;
+  final switch (transportType) {
+    case TransportType.buffered:
+      protocol = createProtocol(new TBufferedTransport(socket), protocolType);
+      break;
+    case TransportType.framed:
+      protocol = createProtocol(new TFramedTransport(socket), protocolType);
+      break;
+    case TransportType.http:
+      protocol = createProtocol(
+        new TClientHttpTransport(socket, host, "/service"), protocolType);
+      break;
+    case TransportType.raw:
+      protocol = createProtocol(socket, protocolType);
+      break;
+  }
+
+  auto client = tClient!ThriftTest(protocol);
+
+  ulong time_min;
+  ulong time_max;
+  ulong time_tot;
+
+  StopWatch sw;
+  foreach(test; 0 .. numTests) {
+    sw.start();
+
+    protocol.transport.open();
+
+    if (trace) writefln("Test #%s, connect %s:%s", test + 1, host, port);
+
+    if (trace) write("testVoid()");
+    client.testVoid();
+    if (trace) writeln(" = void");
+
+    if (trace) write("testString(\"Test\")");
+    string s = client.testString("Test");
+    if (trace) writefln(" = \"%s\"", s);
+    enforce(s == "Test");
+
+    if (trace) write("testByte(1)");
+    byte u8 = client.testByte(1);
+    if (trace) writefln(" = %s", u8);
+    enforce(u8 == 1);
+
+    if (trace) write("testI32(-1)");
+    int i32 = client.testI32(-1);
+    if (trace) writefln(" = %s", i32);
+    enforce(i32 == -1);
+
+    if (trace) write("testI64(-34359738368)");
+    long i64 = client.testI64(-34359738368L);
+    if (trace) writefln(" = %s", i64);
+    enforce(i64 == -34359738368L);
+
+    if (trace) write("testDouble(-5.2098523)");
+    double dub = client.testDouble(-5.2098523);
+    if (trace) writefln(" = %s", dub);
+    enforce(dub == -5.2098523);
+
+    Xtruct out1;
+    out1.string_thing = "Zero";
+    out1.byte_thing = 1;
+    out1.i32_thing = -3;
+    out1.i64_thing = -5;
+    if (trace) writef("testStruct(%s)", out1);
+    auto in1 = client.testStruct(out1);
+    if (trace) writefln(" = %s", in1);
+    enforce(in1 == out1);
+
+    if (trace) write("testNest({1, {\"Zero\", 1, -3, -5}), 5}");
+    Xtruct2 out2;
+    out2.byte_thing = 1;
+    out2.struct_thing = out1;
+    out2.i32_thing = 5;
+    auto in2 = client.testNest(out2);
+    in1 = in2.struct_thing;
+    if (trace) writefln(" = {%s, {\"%s\", %s, %s, %s}, %s}", in2.byte_thing,
+      in1.string_thing, in1.byte_thing, in1.i32_thing, in1.i64_thing,
+      in2.i32_thing);
+    enforce(in2 == out2);
+
+    int[int] mapout;
+    for (int i = 0; i < 5; ++i) {
+      mapout[i] = i - 10;
+    }
+    if (trace) writef("testMap({%s})", mapout);
+    auto mapin = client.testMap(mapout);
+    if (trace) writefln(" = {%s}", mapin);
+    enforce(mapin == mapout);
+
+    auto setout = new HashSet!int;
+    for (int i = -2; i < 3; ++i) {
+      setout ~= i;
+    }
+    if (trace) writef("testSet(%s)", setout);
+    auto setin = client.testSet(setout);
+    if (trace) writefln(" = %s", setin);
+    enforce(setin == setout);
+
+    int[] listout;
+    for (int i = -2; i < 3; ++i) {
+      listout ~= i;
+    }
+    if (trace) writef("testList(%s)", listout);
+    auto listin = client.testList(listout);
+    if (trace) writefln(" = %s", listin);
+    enforce(listin == listout);
+
+    {
+      if (trace) write("testEnum(ONE)");
+      auto ret = client.testEnum(Numberz.ONE);
+      if (trace) writefln(" = %s", ret);
+      enforce(ret == Numberz.ONE);
+
+      if (trace) write("testEnum(TWO)");
+      ret = client.testEnum(Numberz.TWO);
+      if (trace) writefln(" = %s", ret);
+      enforce(ret == Numberz.TWO);
+
+      if (trace) write("testEnum(THREE)");
+      ret = client.testEnum(Numberz.THREE);
+      if (trace) writefln(" = %s", ret);
+      enforce(ret == Numberz.THREE);
+
+      if (trace) write("testEnum(FIVE)");
+      ret = client.testEnum(Numberz.FIVE);
+      if (trace) writefln(" = %s", ret);
+      enforce(ret == Numberz.FIVE);
+
+      if (trace) write("testEnum(EIGHT)");
+      ret = client.testEnum(Numberz.EIGHT);
+      if (trace) writefln(" = %s", ret);
+      enforce(ret == Numberz.EIGHT);
+    }
+
+    if (trace) write("testTypedef(309858235082523)");
+    UserId uid = client.testTypedef(309858235082523L);
+    if (trace) writefln(" = %s", uid);
+    enforce(uid == 309858235082523L);
+
+    if (trace) write("testMapMap(1)");
+    auto mm = client.testMapMap(1);
+    if (trace) writefln(" = {%s}", mm);
+    // Simply doing == doesn't seem to work for nested AAs.
+    foreach (key, value; mm) {
+      enforce(testMapMapReturn[key] == value);
+    }
+    foreach (key, value; testMapMapReturn) {
+      enforce(mm[key] == value);
+    }
+
+    Insanity insane;
+    insane.userMap[Numberz.FIVE] = 5000;
+    Xtruct truck;
+    truck.string_thing = "Truck";
+    truck.byte_thing = 8;
+    truck.i32_thing = 8;
+    truck.i64_thing = 8;
+    insane.xtructs ~= truck;
+    if (trace) write("testInsanity()");
+    auto whoa = client.testInsanity(insane);
+    if (trace) writefln(" = %s", whoa);
+
+    // Commented for now, this is cumbersome to write without opEqual getting
+    // called on AA comparison.
+    // enforce(whoa == testInsanityReturn);
+
+    {
+      try {
+        if (trace) write("client.testException(\"Xception\") =>");
+        client.testException("Xception");
+        if (trace) writeln("  void\nFAILURE");
+        throw new Exception("testException failed.");
+      } catch (Xception e) {
+        if (trace) writefln("  {%s, \"%s\"}", e.errorCode, e.message);
+      }
+
+      try {
+        if (trace) write("client.testException(\"success\") =>");
+        client.testException("success");
+        if (trace) writeln("  void");
+      } catch (Exception e) {
+        if (trace) writeln("  exception\nFAILURE");
+        throw new Exception("testException failed.");
+      }
+    }
+
+    {
+      try {
+        if (trace) write("client.testMultiException(\"Xception\", \"test 1\") =>");
+        auto result = client.testMultiException("Xception", "test 1");
+        if (trace) writeln("  result\nFAILURE");
+        throw new Exception("testMultiException failed.");
+      } catch (Xception e) {
+        if (trace) writefln("  {%s, \"%s\"}", e.errorCode, e.message);
+      }
+
+      try {
+        if (trace) write("client.testMultiException(\"Xception2\", \"test 2\") =>");
+        auto result = client.testMultiException("Xception2", "test 2");
+        if (trace) writeln("  result\nFAILURE");
+        throw new Exception("testMultiException failed.");
+      } catch (Xception2 e) {
+        if (trace) writefln("  {%s, {\"%s\"}}",
+          e.errorCode, e.struct_thing.string_thing);
+      }
+
+      try {
+        if (trace) writef("client.testMultiException(\"success\", \"test 3\") =>");
+        auto result = client.testMultiException("success", "test 3");
+        if (trace) writefln("  {{\"%s\"}}", result.string_thing);
+      } catch (Exception e) {
+        if (trace) writeln("  exception\nFAILURE");
+        throw new Exception("testMultiException failed.");
+      }
+    }
+
+    // Do not run oneway test when doing multiple iterations, as it blocks the
+    // server for three seconds.
+    if (numTests == 1) {
+      if (trace) writef("client.testOneway(3) =>");
+      auto onewayWatch = StopWatch(AutoStart.yes);
+      client.testOneway(3);
+      onewayWatch.stop();
+      if (onewayWatch.peek().msecs > 200) {
+        if (trace) {
+          writefln("  FAILURE - took %s ms", onewayWatch.peek().usecs / 1000.0);
+        }
+        throw new Exception("testOneway failed.");
+      } else {
+        if (trace) {
+          writefln("  success - took %s ms", onewayWatch.peek().usecs / 1000.0);
+        }
+      }
+
+      // Redo a simple test after the oneway to make sure we aren't "off by
+      // one", which would be the case if the server treated oneway methods
+      // like normal ones.
+      if (trace) write("re-test testI32(-1)");
+      i32 = client.testI32(-1);
+      if (trace) writefln(" = %s", i32);
+    }
+
+    // Time metering.
+    sw.stop();
+
+    immutable tot = sw.peek().usecs;
+    if (trace) writefln("Total time: %s us\n", tot);
+
+    time_tot += tot;
+    if (time_min == 0 || tot < time_min) {
+      time_min = tot;
+    }
+    if (tot > time_max) {
+      time_max = tot;
+    }
+    protocol.transport.close();
+
+    sw.reset();
+  }
+
+  writeln("All tests done.");
+
+  if (numTests > 1) {
+    auto time_avg = time_tot / numTests;
+    writefln("Min time: %s us", time_min);
+    writefln("Max time: %s us", time_max);
+    writefln("Avg time: %s us", time_avg);
+  }
+}

Added: thrift/trunk/lib/d/test/thrift_test_common.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/thrift_test_common.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/thrift_test_common.d (added)
+++ thrift/trunk/lib/d/test/thrift_test_common.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,92 @@
+module thrift_test_common;
+
+import std.stdio;
+import thrift.test.ThriftTest_types;
+
+enum ProtocolType {
+  binary,
+  compact,
+  json
+}
+
+void writeInsanityReturn(in Insanity[Numberz][UserId] insane) {
+  write("{");
+  foreach(key1, value1; insane) {
+    writef("%s => {", key1);
+    foreach(key2, value2; value1) {
+      writef("%s => {", key2);
+      write("{");
+      foreach(key3, value3; value2.userMap) {
+        writef("%s => %s, ", key3, value3);
+      }
+      write("}, ");
+
+      write("{");
+      foreach (x; value2.xtructs) {
+        writef("{\"%s\", %s, %s, %s}, ",
+          x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing);
+      }
+      write("}");
+
+      write("}, ");
+    }
+    write("}, ");
+  }
+  write("}");
+}
+
+Insanity[Numberz][UserId] testInsanityReturn;
+int[int][int] testMapMapReturn;
+
+static this() {
+  testInsanityReturn = {
+    Insanity[Numberz][UserId] insane;
+
+    Xtruct hello;
+    hello.string_thing = "Hello2";
+    hello.byte_thing = 2;
+    hello.i32_thing = 2;
+    hello.i64_thing = 2;
+
+    Xtruct goodbye;
+    goodbye.string_thing = "Goodbye4";
+    goodbye.byte_thing = 4;
+    goodbye.i32_thing = 4;
+    goodbye.i64_thing = 4;
+
+    Insanity crazy;
+    crazy.userMap[Numberz.EIGHT] = 8;
+    crazy.xtructs ~= goodbye;
+
+    Insanity looney;
+    // The C++ TestServer also assigns these to crazy, but that is probably
+    // an oversight.
+    looney.userMap[Numberz.FIVE] = 5;
+    looney.xtructs ~= hello;
+
+    Insanity[Numberz] first_map;
+    first_map[Numberz.TWO] = crazy;
+    first_map[Numberz.THREE] = crazy;
+    insane[1] = first_map;
+
+    Insanity[Numberz] second_map;
+    second_map[Numberz.SIX] = looney;
+    insane[2] = second_map;
+    return insane;
+  }();
+
+  testMapMapReturn = {
+    int[int] pos;
+    int[int] neg;
+
+    for (int i = 1; i < 5; i++) {
+      pos[i] = i;
+      neg[-i] = -i;
+    }
+
+    int[int][int] result;
+    result[4] = pos;
+    result[-4] = neg;
+    return result;
+  }();
+}

Added: thrift/trunk/lib/d/test/thrift_test_runner.sh
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/thrift_test_runner.sh?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/thrift_test_runner.sh (added)
+++ thrift/trunk/lib/d/test/thrift_test_runner.sh Thu Mar 22 21:49:10 2012
@@ -0,0 +1,65 @@
+#!/bin/bash
+# Runs the D ThriftTest client and servers for all combinations of transport,
+# protocol, SSL-mode and server type.
+# Pass -k to keep going after failed tests.
+
+protocols="binary compact json"
+transports="buffered framed http raw"
+servers="simple taskpool threaded"
+framed_only_servers="nonblocking pooledNonblocking"
+
+# Don't leave any server instances behind when interrupted (e.g. by Ctrl+C)
+# or terminated.
+trap "kill $(jobs -p) 2>/dev/null" INT TERM
+
+for protocol in $protocols; do
+  for ssl in "" " --ssl"; do
+    for transport in $transports; do
+      for server in $servers $framed_only_servers; do
+        case $framed_only_servers in
+          *$server*) if [ $transport != "framed" ] || [ $ssl != "" ]; then continue; fi;;
+        esac
+
+        args="--transport=$transport --protocol=$protocol$ssl"
+        ./thrift_test_server $args --server-type=$server > /dev/null &
+        server_pid=$!
+
+        # Give the server some time to get up and check if it runs (yes, this
+        # is a huge kludge, should add a connect timeout to test client).
+        client_rc=-1
+        sleep 0.01
+        kill -0 $server_pid 2>/dev/null
+        if [ $? -eq 0 ]; then
+          ./thrift_test_client $args --numTests=10 > /dev/null
+          client_rc=$?
+
+          # Temporarily redirect stderr to null to avoid job control messages,
+          # restore it afterwards.
+          exec 3>&2
+          exec 2>/dev/null
+          kill $server_pid
+          exec 3>&2
+        fi
+
+        # Get the server exit code (wait should immediately return).
+        wait $server_pid
+        server_rc=$?
+
+        if [ $client_rc -ne 0 -o $server_rc -eq 1 ]; then
+          echo -e "\nTests failed for: $args --server-type=$server"
+          failed="true"
+          if [ "$1" != "-k" ]; then
+            exit 1
+          fi
+        else
+           echo -n "."
+        fi
+      done
+    done
+  done
+done
+
+echo
+if [ -z "$failed" ]; then
+  echo "All tests passed."
+fi

Added: thrift/trunk/lib/d/test/thrift_test_server.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/thrift_test_server.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/thrift_test_server.d (added)
+++ thrift/trunk/lib/d/test/thrift_test_server.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift_test_server;
+
+import core.thread : dur, Thread;
+import std.algorithm;
+import std.exception : enforce;
+import std.getopt;
+import std.parallelism : totalCPUs;
+import std.string;
+import std.stdio;
+import std.typetuple : TypeTuple, staticMap;
+import thrift.base;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.protocol.compact;
+import thrift.protocol.json;
+import thrift.server.base;
+import thrift.server.transport.socket;
+import thrift.server.transport.ssl;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+import thrift.transport.ssl;
+import thrift.util.hashset;
+import test_utils;
+
+import thrift_test_common;
+import thrift.test.ThriftTest_types;
+import thrift.test.ThriftTest;
+
+class TestHandler : ThriftTest {
+  this(bool trace) {
+    trace_ = trace;
+  }
+
+  override void testVoid() {
+    if (trace_) writeln("testVoid()");
+  }
+
+  override string testString(string thing) {
+    if (trace_) writefln("testString(\"%s\")", thing);
+    return thing;
+  }
+
+  override byte testByte(byte thing) {
+    if (trace_) writefln("testByte(%s)", thing);
+    return thing;
+  }
+
+  override int testI32(int thing) {
+    if (trace_) writefln("testI32(%s)", thing);
+    return thing;
+  }
+
+  override long testI64(long thing) {
+    if (trace_) writefln("testI64(%s)", thing);
+    return thing;
+  }
+
+  override double testDouble(double thing) {
+    if (trace_) writefln("testDouble(%s)", thing);
+    return thing;
+  }
+
+  override Xtruct testStruct(ref const(Xtruct) thing) {
+    if (trace_) writefln("testStruct({\"%s\", %s, %s, %s})",
+      thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing);
+    return thing;
+  }
+
+  override Xtruct2 testNest(ref const(Xtruct2) nest) {
+    auto thing = nest.struct_thing;
+    if (trace_) writefln("testNest({%s, {\"%s\", %s, %s, %s}, %s})",
+      nest.byte_thing, thing.string_thing, thing.byte_thing, thing.i32_thing,
+      thing.i64_thing, nest.i32_thing);
+    return nest;
+  }
+
+  override int[int] testMap(int[int] thing) {
+    if (trace_) writefln("testMap({%s})", thing);
+    return thing;
+  }
+
+  override HashSet!int testSet(HashSet!int thing) {
+    if (trace_) writefln("testSet({%s})",
+      join(map!`to!string(a)`(thing[]), ", "));
+    return thing;
+  }
+
+  override int[] testList(int[] thing) {
+    if (trace_) writefln("testList(%s)", thing);
+    return thing;
+  }
+
+  override Numberz testEnum(Numberz thing) {
+    if (trace_) writefln("testEnum(%s)", thing);
+    return thing;
+  }
+
+  override UserId testTypedef(UserId thing) {
+    if (trace_) writefln("testTypedef(%s)", thing);
+    return thing;
+  }
+
+  override string[string] testStringMap(string[string] thing) {
+    if (trace_) writefln("testStringMap(%s)", thing);
+    return thing;
+  }
+
+  override int[int][int] testMapMap(int hello) {
+    if (trace_) writefln("testMapMap(%s)", hello);
+    return testMapMapReturn;
+  }
+
+  override Insanity[Numberz][UserId] testInsanity(ref const(Insanity) argument) {
+    if (trace_) writeln("testInsanity()");
+    return testInsanityReturn;
+  }
+
+  override Xtruct testMulti(byte arg0, int arg1, long arg2, string[short] arg3,
+    Numberz arg4, UserId arg5)
+  {
+    if (trace_) writeln("testMulti()");
+    return Xtruct("Hello2", arg0, arg1, arg2);
+  }
+
+  override void testException(string arg) {
+    if (trace_) writefln("testException(%s)", arg);
+    if (arg == "Xception") {
+      auto e = new Xception();
+      e.errorCode = 1001;
+      e.message = arg;
+      throw e;
+    } else if (arg == "ApplicationException") {
+      throw new TException();
+    }
+  }
+
+  override Xtruct testMultiException(string arg0, string arg1) {
+    if (trace_) writefln("testMultiException(%s, %s)", arg0, arg1);
+
+    if (arg0 == "Xception") {
+      auto e = new Xception();
+      e.errorCode = 1001;
+      e.message = "This is an Xception";
+      throw e;
+    } else if (arg0 == "Xception2") {
+      auto e = new Xception2();
+      e.errorCode = 2002;
+      e.struct_thing.string_thing = "This is an Xception2";
+      throw e;
+    } else {
+      return Xtruct(arg1);
+    }
+  }
+
+  override void testOneway(int sleepFor) {
+    if (trace_) writefln("testOneway(%s): Sleeping...", sleepFor);
+    Thread.sleep(dur!"seconds"(sleepFor));
+    if (trace_) writefln("testOneway(%s): done sleeping!", sleepFor);
+  }
+
+private:
+  bool trace_;
+}
+
+void main(string[] args) {
+  ushort port = 9090;
+  ServerType serverType;
+  ProtocolType protocolType;
+  size_t numIOThreads = 1;
+  TransportType transportType;
+  bool ssl;
+  bool trace;
+  size_t taskPoolSize = totalCPUs;
+
+  getopt(args, "port", &port, "protocol", &protocolType, "server-type",
+    &serverType, "ssl", &ssl, "num-io-threads", &numIOThreads,
+    "task-pool-size", &taskPoolSize, "trace", &trace,
+    "transport", &transportType);
+
+  if (serverType == ServerType.nonblocking ||
+    serverType == ServerType.pooledNonblocking
+  ) {
+    enforce(transportType == TransportType.framed,
+      "Need to use framed transport with non-blocking server.");
+    enforce(!ssl, "The non-blocking server does not support SSL yet.");
+
+    // Don't wrap the contents into another layer of framing.
+    transportType = TransportType.raw;
+  }
+
+  version (ThriftTestTemplates) {
+    // Only exercise the specialized template code paths if explicitly enabled
+    // to reduce memory consumption on regular test suite runs – there should
+    // not be much that can go wrong with that specifically anyway.
+    alias TypeTuple!(TBufferedTransport, TFramedTransport, TServerHttpTransport)
+      AvailableTransports;
+    alias TypeTuple!(
+      staticMap!(TBinaryProtocol, AvailableTransports),
+      staticMap!(TCompactProtocol, AvailableTransports)
+    ) AvailableProtocols;
+  } else {
+    alias TypeTuple!() AvailableTransports;
+    alias TypeTuple!() AvailableProtocols;
+  }
+
+  TProtocolFactory protocolFactory;
+  final switch (protocolType) {
+    case ProtocolType.binary:
+      protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
+      break;
+    case ProtocolType.compact:
+      protocolFactory = new TCompactProtocolFactory!AvailableTransports;
+      break;
+    case ProtocolType.json:
+      protocolFactory = new TJsonProtocolFactory!AvailableTransports;
+      break;
+  }
+
+  auto processor = new TServiceProcessor!(ThriftTest, AvailableProtocols)(
+    new TestHandler(trace));
+
+  TServerSocket serverSocket;
+  if (ssl) {
+    auto sslContext = new TSSLContext();
+    sslContext.serverSide = true;
+    sslContext.loadCertificate("./server-certificate.pem");
+    sslContext.loadPrivateKey("./server-private-key.pem");
+    sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+    serverSocket = new TSSLServerSocket(port, sslContext);
+  } else {
+    serverSocket = new TServerSocket(port);
+  }
+
+  auto transportFactory = createTransportFactory(transportType);
+
+  auto server = createServer(serverType, numIOThreads, taskPoolSize,
+    processor, serverSocket, transportFactory, protocolFactory);
+
+  writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
+    transportType, serverType, ssl ? "(using SSL) ": "", port);
+  server.serve();
+  writeln("done.");
+}

Added: thrift/trunk/lib/d/test/transport_test.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/test/transport_test.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/test/transport_test.d (added)
+++ thrift/trunk/lib/d/test/transport_test.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Exercises various transports, combined with the buffered/framed wrappers.
+ *
+ * Originally ported from the C++ version, with Windows support code added.
+ */
+module transport_test;
+
+import core.atomic;
+import core.time : Duration;
+import core.thread : Thread;
+import std.conv : to;
+import std.datetime;
+import std.exception : enforce;
+static import std.file;
+import std.getopt;
+import std.random : rndGen, uniform, unpredictableSeed;
+import std.socket;
+import std.stdio;
+import std.string;
+import std.typetuple;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.file;
+import thrift.transport.http;
+import thrift.transport.memory;
+import thrift.transport.socket;
+import thrift.transport.zlib;
+
+/*
+ * Size generation helpers – used to be able to run the same testing code
+ * with both constant and random total/chunk sizes.
+ */
+
+interface SizeGenerator {
+  size_t nextSize();
+  string toString();
+}
+
+class ConstantSizeGenerator : SizeGenerator {
+  this(size_t value) {
+    value_ = value;
+  }
+
+  override size_t nextSize() {
+    return value_;
+  }
+
+  override string toString() const {
+    return to!string(value_);
+  }
+
+private:
+  size_t value_;
+}
+
+class RandomSizeGenerator : SizeGenerator {
+  this(size_t min, size_t max) {
+    min_ = min;
+    max_ = max;
+  }
+
+  override size_t nextSize() {
+    return uniform!"[]"(min_, max_);
+  }
+
+  override string toString() const {
+    return format("rand(%s, %s)", min_, max_);
+  }
+
+  size_t min() const @property {
+    return min_;
+  }
+
+  size_t max() const @property {
+    return max_;
+  }
+
+private:
+  size_t min_;
+  size_t max_;
+}
+
+
+/*
+ * Classes to set up coupled transports
+ */
+
+/**
+ * Helper class to represent a coupled pair of transports.
+ *
+ * Data written to the output transport can be read from the input transport.
+ *
+ * This is used as the base class for the various coupled transport
+ * implementations. It shouldn't be used directly.
+ */
+class CoupledTransports(Transport) if (isTTransport!Transport) {
+  Transport input;
+  Transport output;
+}
+
+template isCoupledTransports(T) {
+  static if (is(T _ : CoupledTransports!U, U)) {
+    enum isCoupledTransports = true;
+  } else {
+    enum isCoupledTransports = false;
+  }
+}
+
+/**
+ * Helper template class for creating coupled transports that wrap
+ * another transport.
+ */
+class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if (
+  isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports
+) : CoupledTransports!WrapperTransport {
+  this() {
+    inner_ = new InnerCoupledTransports();
+    if (inner_.input) {
+      input = new WrapperTransport(inner_.input);
+    }
+    if (inner_.output) {
+      output = new WrapperTransport(inner_.output);
+    }
+  }
+
+  ~this() {
+    clear(inner_);
+  }
+
+private:
+  InnerCoupledTransports inner_;
+}
+
+import thrift.internal.codegen : PApply;
+alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports;
+alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports;
+alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports;
+
+/**
+ * Coupled TMemoryBuffers.
+ */
+class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer {
+  this() {
+    buf = new TMemoryBuffer;
+    input = buf;
+    output = buf;
+  }
+
+  TMemoryBuffer buf;
+}
+
+/**
+ * Coupled TSockets.
+ */
+class CoupledSocketTransports : CoupledTransports!TSocket {
+  this() {
+    auto sockets = socketPair();
+    input = new TSocket(sockets[0]);
+    output = new TSocket(sockets[1]);
+  }
+
+  ~this() {
+    input.close();
+    output.close();
+  }
+}
+
+/**
+ * Coupled TFileTransports
+ */
+class CoupledFileTransports : CoupledTransports!TTransport {
+  this() {
+    // We actually need the file name of the temp file here, so we can't just
+    // use the usual tempfile facilities.
+    do {
+      fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front);
+      rndGen().popFront();
+    } while (std.file.exists(fileName_));
+
+    writefln("Using temp file: %s", fileName_);
+
+    auto writer = new TFileWriterTransport(fileName_);
+    writer.open();
+    output = writer;
+
+    // Wait until the file has been created.
+    writer.flush();
+
+    auto reader = new TFileReaderTransport(fileName_);
+    reader.open();
+    reader.readTimeout(dur!"msecs"(-1));
+    input = reader;
+  }
+
+  ~this() {
+    input.close();
+    output.close();
+    std.file.remove(fileName_);
+  }
+
+  static string tmpDir;
+
+private:
+  string fileName_;
+}
+
+
+/*
+ * Test functions
+ */
+
+/**
+ * Test interleaved write and read calls.
+ *
+ * Generates a buffer totalSize bytes long, then writes it to the transport,
+ * and verifies the written data can be read back correctly.
+ *
+ * Mode of operation:
+ * - call wChunkGenerator to figure out how large of a chunk to write
+ *   - call wSizeGenerator to get the size for individual write() calls,
+ *     and do this repeatedly until the entire chunk is written.
+ * - call rChunkGenerator to figure out how large of a chunk to read
+ *   - call rSizeGenerator to get the size for individual read() calls,
+ *     and do this repeatedly until the entire chunk is read.
+ * - repeat until the full buffer is written and read back,
+ *   then compare the data read back against the original buffer
+ *
+ *
+ * - If any of the size generators return 0, this means to use the maximum
+ *   possible size.
+ *
+ * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
+ *   there are never more than maxOutstanding bytes waiting to be read back.
+ */
+void testReadWrite(CoupledTransports)(
+  size_t totalSize,
+  SizeGenerator wSizeGenerator,
+  SizeGenerator rSizeGenerator,
+  SizeGenerator wChunkGenerator,
+  SizeGenerator rChunkGenerator,
+  size_t maxOutstanding
+) if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  auto wbuf = new ubyte[totalSize];
+  auto rbuf = new ubyte[totalSize];
+
+  // Store some data in wbuf.
+  foreach (i, ref b; wbuf) {
+    b = i & 0xff;
+  }
+
+  size_t totalWritten;
+  size_t totalRead;
+  while (totalRead < totalSize) {
+    // Determine how large a chunk of data to write.
+    auto wChunkSize = wChunkGenerator.nextSize();
+    if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) {
+      wChunkSize = totalSize - totalWritten;
+    }
+
+    // Make sure (totalWritten - totalRead) + wChunkSize is less than
+    // maxOutstanding.
+    if (maxOutstanding > 0 &&
+        wChunkSize > maxOutstanding - (totalWritten - totalRead)) {
+      wChunkSize = maxOutstanding - (totalWritten - totalRead);
+    }
+
+    // Write the chunk.
+    size_t chunkWritten = 0;
+    while (chunkWritten < wChunkSize) {
+      auto writeSize = wSizeGenerator.nextSize();
+      if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) {
+        writeSize = wChunkSize - chunkWritten;
+      }
+
+      transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]);
+      chunkWritten += writeSize;
+      totalWritten += writeSize;
+    }
+
+    // Flush the data, so it will be available in the read transport
+    // Don't flush if wChunkSize is 0. (This should only happen if
+    // totalWritten == totalSize already, and we're only reading now.)
+    if (wChunkSize > 0) {
+      transports.output.flush();
+    }
+
+    // Determine how large a chunk of data to read back.
+    auto rChunkSize = rChunkGenerator.nextSize();
+    if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) {
+      rChunkSize = totalWritten - totalRead;
+    }
+
+    // Read the chunk.
+    size_t chunkRead;
+    while (chunkRead < rChunkSize) {
+      auto readSize = rSizeGenerator.nextSize();
+      if (readSize == 0 || readSize > rChunkSize - chunkRead) {
+        readSize = rChunkSize - chunkRead;
+      }
+
+      size_t bytesRead;
+      try {
+        bytesRead = transports.input.read(
+          rbuf[totalRead .. totalRead + readSize]);
+      } catch (TTransportException e) {
+        throw new Exception(format(`read(pos = %s, size = %s) threw ` ~
+          `exception "%s"; written so far: %s/%s bytes`, totalRead, readSize,
+          e.msg, totalWritten, totalSize));
+      }
+
+      enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~
+        `written so far: %s/%s bytes`, totalRead, readSize, bytesRead,
+        totalWritten, totalSize));
+
+      chunkRead += bytesRead;
+      totalRead += bytesRead;
+    }
+  }
+
+  // make sure the data read back is identical to the data written
+  if (rbuf != wbuf) {
+    stderr.writefln("%s vs. %s", wbuf[$ - 4 .. $], rbuf[$ - 4 .. $]);
+    stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length);
+  }
+  enforce(rbuf == wbuf);
+}
+
+void testReadPartAvailable(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  ubyte[10] writeBuf = 'a';
+  ubyte[10] readBuf;
+
+  // Attemping to read 10 bytes when only 9 are available should return 9
+  // immediately.
+  transports.output.write(writeBuf[0 .. 9]);
+  transports.output.flush();
+
+  auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+  auto bytesRead = transports.input.read(readBuf);
+  enforce(t.fired == 0);
+  enforce(bytesRead == 9);
+}
+
+void testReadPartialMidframe(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  ubyte[13] writeBuf = 'a';
+  ubyte[14] readBuf;
+
+  // Attempt to read 10 bytes, when only 9 are available, but after we have
+  // already read part of the data that is available.  This exercises a
+  // different code path for several of the transports.
+  //
+  // For transports that add their own framing (e.g., TFramedTransport and
+  // TFileTransport), the two flush calls break up the data in to a 10 byte
+  // frame and a 3 byte frame.  The first read then puts us partway through the
+  // first frame, and then we attempt to read past the end of that frame, and
+  // through the next frame, too.
+  //
+  // For buffered transports that perform read-ahead (e.g.,
+  // TBufferedTransport), the read-ahead will most likely see all 13 bytes
+  // written on the first read.  The next read will then attempt to read past
+  // the end of the read-ahead buffer.
+  //
+  // Flush 10 bytes, then 3 bytes.  This creates 2 separate frames for
+  // transports that track framing internally.
+  transports.output.write(writeBuf[0 .. 10]);
+  transports.output.flush();
+  transports.output.write(writeBuf[10 .. 13]);
+  transports.output.flush();
+
+  // Now read 4 bytes, so that we are partway through the written data.
+  auto bytesRead = transports.input.read(readBuf[0 .. 4]);
+  enforce(bytesRead == 4);
+
+  // Now attempt to read 10 bytes.  Only 9 more are available.
+  //
+  // We should be able to get all 9 bytes, but it might take multiple read
+  // calls, since it is valid for read() to return fewer bytes than requested.
+  // (Most transports do immediately return 9 bytes, but the framing transports
+  // tend to only return to the end of the current frame, which is 6 bytes in
+  // this case.)
+  size_t totalRead = 0;
+  while (totalRead < 9) {
+    auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+    bytesRead = transports.input.read(readBuf[4 + totalRead .. 14]);
+    enforce(t.fired == 0);
+    enforce(bytesRead > 0);
+    totalRead += bytesRead;
+    enforce(totalRead <= 9);
+  }
+
+  enforce(totalRead == 9);
+}
+
+void testBorrowPartAvailable(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  ubyte[9] writeBuf = 'a';
+  ubyte[10] readBuf;
+
+  // Attemping to borrow 10 bytes when only 9 are available should return NULL
+  // immediately.
+  transports.output.write(writeBuf);
+  transports.output.flush();
+
+  auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+  auto borrowLen = readBuf.length;
+  auto borrowedBuf = transports.input.borrow(readBuf.ptr, borrowLen);
+  enforce(t.fired == 0);
+  enforce(borrowedBuf is null);
+}
+
+void testReadNoneAvailable(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  // Attempting to read when no data is available should either block until
+  // some data is available, or fail immediately.  (e.g., TSocket blocks,
+  // TMemoryBuffer just fails.)
+  //
+  // If the transport blocks, it should succeed once some data is available,
+  // even if less than the amount requested becomes available.
+  ubyte[10] readBuf;
+
+  auto t = Trigger(dur!"seconds"(1), transports.output, 2);
+  t.add(dur!"seconds"(1), transports.output, 8);
+
+  auto bytesRead = transports.input.read(readBuf);
+  if (bytesRead == 0) {
+    enforce(t.fired == 0);
+  } else {
+    enforce(t.fired == 1);
+    enforce(bytesRead == 2);
+  }
+}
+
+void testBorrowNoneAvailable(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  scope transports = new CoupledTransports;
+  assert(transports.input);
+  assert(transports.output);
+
+  ubyte[16] writeBuf = 'a';
+
+  // Attempting to borrow when no data is available should fail immediately
+  auto t = Trigger(dur!"seconds"(1), transports.output, 10);
+
+  auto borrowLen = 10;
+  auto borrowedBuf = transports.input.borrow(null, borrowLen);
+  enforce(borrowedBuf is null);
+  enforce(t.fired == 0);
+}
+
+
+void doRwTest(CoupledTransports)(
+  size_t totalSize,
+  SizeGenerator wSizeGen,
+  SizeGenerator rSizeGen,
+  SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0),
+  SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0),
+  size_t maxOutstanding = 0
+) if (
+  isCoupledTransports!CoupledTransports
+) {
+  totalSize = cast(size_t)(totalSize * g_sizeMultiplier);
+
+  scope(failure) {
+    writefln("Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)",
+      CoupledTransports.stringof, totalSize, wSizeGen, rSizeGen,
+      wChunkSizeGen, rChunkSizeGen, maxOutstanding);
+  }
+
+  testReadWrite!CoupledTransports(totalSize, wSizeGen, rSizeGen,
+    wChunkSizeGen, rChunkSizeGen, maxOutstanding);
+}
+
+void doBlockingTest(CoupledTransports)() if (
+  isCoupledTransports!CoupledTransports
+) {
+  void writeFailure(string name) {
+    writefln("Test failed for %s: %s()", CoupledTransports.stringof, name);
+  }
+
+  {
+    scope(failure) writeFailure("testReadPartAvailable");
+    testReadPartAvailable!CoupledTransports();
+  }
+
+  {
+    scope(failure) writeFailure("testReadPartialMidframe");
+    testReadPartialMidframe!CoupledTransports();
+  }
+
+  {
+    scope(failure) writeFailure("testReadNoneAvaliable");
+    testReadNoneAvailable!CoupledTransports();
+  }
+
+  {
+    scope(failure) writeFailure("testBorrowPartAvailable");
+    testBorrowPartAvailable!CoupledTransports();
+  }
+
+  {
+    scope(failure) writeFailure("testBorrowNoneAvailable");
+    testBorrowNoneAvailable!CoupledTransports();
+  }
+}
+
+SizeGenerator getGenerator(T)(T t) {
+  static if (is(T : SizeGenerator)) {
+    return t;
+  } else {
+    return new ConstantSizeGenerator(t);
+  }
+}
+
+template WrappedTransports(T) if (isCoupledTransports!T) {
+  alias TypeTuple!(
+    T,
+    CoupledBufferedTransports!T,
+    CoupledFramedTransports!T,
+    CoupledZlibTransports!T
+  ) WrappedTransports;
+}
+
+void testRw(C, R, S)(
+  size_t totalSize,
+  R wSize,
+  S rSize
+) if (
+  isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
+  is(typeof(getGenerator(rSize)))
+) {
+  testRw!C(totalSize, wSize, rSize, 0, 0, 0);
+}
+
+void testRw(C, R, S, T, U)(
+  size_t totalSize,
+  R wSize,
+  S rSize,
+  T wChunkSize,
+  U rChunkSize,
+  size_t maxOutstanding = 0
+) if (
+  isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
+  is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) &&
+  is(typeof(getGenerator(rChunkSize)))
+) {
+  foreach (T; WrappedTransports!C) {
+    doRwTest!T(
+      totalSize,
+      getGenerator(wSize),
+      getGenerator(rSize),
+      getGenerator(wChunkSize),
+      getGenerator(rChunkSize),
+      maxOutstanding
+    );
+  }
+}
+
+void testBlocking(C)() if (isCoupledTransports!C) {
+  foreach (T; WrappedTransports!C) {
+    doBlockingTest!T();
+  }
+}
+
+// A quick hack, for the sake of brevity…
+float g_sizeMultiplier = 1;
+
+version (Posix) {
+  immutable defaultTempDir = "/tmp";
+} else version (Windows) {
+  import core.sys.windows.windows;
+  extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer);
+
+  string defaultTempDir() @property {
+    char[MAX_PATH + 1] dir;
+    enforce(GetTempPathA(dir.length, dir.ptr));
+    return to!string(dir.ptr)[0 .. $ - 1];
+  }
+} else static assert(false);
+
+void main(string[] args) {
+  int seed = unpredictableSeed();
+  string tmpDir = defaultTempDir;
+
+  getopt(args, "seed", &seed, "size-multiplier", &g_sizeMultiplier,
+    "tmp-dir", &tmpDir);
+  enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative.");
+
+  writefln("Using seed: %s", seed);
+  rndGen().seed(seed);
+  CoupledFileTransports.tmpDir = tmpDir;
+
+  auto rand4k = new RandomSizeGenerator(1, 4096);
+
+  /*
+   * We do the basically the same set of tests for each transport type,
+   * although we tweak the parameters in some places.
+   */
+
+  // TMemoryBuffer tests
+  testRw!CoupledMemoryBuffers(1024 * 1024, 0, 0);
+  testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k);
+  testRw!CoupledMemoryBuffers(1024 * 256, 167, 163);
+  testRw!CoupledMemoryBuffers(1024 * 16, 1, 1);
+
+  testRw!CoupledMemoryBuffers(1024 * 256, 0, 0, rand4k, rand4k);
+  testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k, rand4k, rand4k);
+  testRw!CoupledMemoryBuffers(1024 * 256, 167, 163, rand4k, rand4k);
+  testRw!CoupledMemoryBuffers(1024 * 16, 1, 1, rand4k, rand4k);
+
+  testBlocking!CoupledMemoryBuffers();
+
+  // TSocket tests
+  enum socketMaxOutstanding = 4096;
+  testRw!CoupledSocketTransports(1024 * 1024, 0, 0,
+          0, 0, socketMaxOutstanding);
+  testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
+          0, 0, socketMaxOutstanding);
+  testRw!CoupledSocketTransports(1024 * 256, 167, 163,
+          0, 0, socketMaxOutstanding);
+  // Doh.  Apparently writing to a socket has some additional overhead for
+  // each send() call.  If we have more than ~400 outstanding 1-byte write
+  // requests, additional send() calls start blocking.
+  testRw!CoupledSocketTransports(1024 * 16, 1, 1,
+          0, 0, 400);
+  testRw!CoupledSocketTransports(1024 * 256, 0, 0,
+          rand4k, rand4k, socketMaxOutstanding);
+  testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
+          rand4k, rand4k, socketMaxOutstanding);
+  testRw!CoupledSocketTransports(1024 * 256, 167, 163,
+          rand4k, rand4k, socketMaxOutstanding);
+  testRw!CoupledSocketTransports(1024 * 16, 1, 1,
+          rand4k, rand4k, 400);
+
+  testBlocking!CoupledSocketTransports();
+
+  // File transport tests.
+
+  // Cannot write more than the frame size at once.
+  enum maxWriteAtOnce = 1024 * 1024 * 16 - 4;
+
+  testRw!CoupledFileTransports(1024 * 1024, maxWriteAtOnce, 0);
+  testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k);
+  testRw!CoupledFileTransports(1024 * 256, 167, 163);
+  testRw!CoupledFileTransports(1024 * 16, 1, 1);
+
+  testRw!CoupledFileTransports(1024 * 256, 0, 0, rand4k, rand4k);
+  testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k, rand4k, rand4k);
+  testRw!CoupledFileTransports(1024 * 256, 167, 163, rand4k, rand4k);
+  testRw!CoupledFileTransports(1024 * 16, 1, 1, rand4k, rand4k);
+
+  testBlocking!CoupledFileTransports();
+}
+
+
+/*
+ * Timer handling code for use in tests that check the transport blocking
+ * semantics.
+ *
+ * The implementation has been hacked together in a hurry and wastes a lot of
+ * threads, but speed should not be the concern here.
+ */
+
+struct Trigger {
+  this(Duration timeout, TTransport transport, size_t writeLength) {
+    mutex_ = new Mutex;
+    cancelCondition_ = new Condition(mutex_);
+    info_ = new Info(timeout, transport, writeLength);
+    startThread();
+  }
+
+  ~this() {
+    synchronized (mutex_) {
+      info_ = null;
+      cancelCondition_.notifyAll();
+    }
+    if (thread_) thread_.join();
+  }
+
+  @disable this(this) { assert(0); }
+
+  void add(Duration timeout, TTransport transport, size_t writeLength) {
+    synchronized (mutex_) {
+      auto info = new Info(timeout, transport, writeLength);
+      if (info_) {
+        auto prev = info_;
+        while (prev.next) prev = prev.next;
+        prev.next = info;
+      } else {
+        info_ = info;
+        startThread();
+      }
+    }
+  }
+
+  @property short fired() {
+    return atomicLoad(fired_);
+  }
+
+private:
+  void timerThread() {
+    // KLUDGE: Make sure the std.concurrency mbox is initialized on the timer
+    // thread to be able to unblock the file transport.
+    import std.concurrency;
+    thisTid;
+
+    synchronized (mutex_) {
+      while (info_) {
+        auto cancelled = cancelCondition_.wait(info_.timeout);
+        if (cancelled) {
+          info_ = null;
+          break;
+        }
+
+        atomicOp!"+="(fired_, 1);
+
+        // Write some data to the transport to unblock it.
+        auto buf = new ubyte[info_.writeLength];
+        buf[] = 'b';
+        info_.transport.write(buf);
+        info_.transport.flush();
+
+        info_ = info_.next;
+      }
+    }
+
+    thread_ = null;
+  }
+
+  void startThread() {
+    thread_ = new Thread(&timerThread);
+    thread_.start();
+  }
+
+  struct Info {
+    this(Duration timeout, TTransport transport, size_t writeLength) {
+      this.timeout = timeout;
+      this.transport = transport;
+      this.writeLength = writeLength;
+    }
+
+    Duration timeout;
+    TTransport transport;
+    size_t writeLength;
+    Info* next;
+  }
+
+  Info* info_;
+  Thread thread_;
+  shared short fired_;
+
+  import core.sync.mutex;
+  Mutex mutex_;
+  import core.sync.condition;
+  Condition cancelCondition_;
+}

Modified: thrift/trunk/test/StressTest.thrift
URL: http://svn.apache.org/viewvc/thrift/trunk/test/StressTest.thrift?rev=1304085&r1=1304084&r2=1304085&view=diff
==============================================================================
--- thrift/trunk/test/StressTest.thrift (original)
+++ thrift/trunk/test/StressTest.thrift Thu Mar 22 21:49:10 2012
@@ -18,6 +18,7 @@
  */
 
 namespace cpp test.stress
+namespace d thrift.test.stress
 
 service Service {
 

Added: thrift/trunk/tutorial/d/Makefile
URL: http://svn.apache.org/viewvc/thrift/trunk/tutorial/d/Makefile?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/tutorial/d/Makefile (added)
+++ thrift/trunk/tutorial/d/Makefile Thu Mar 22 21:49:10 2012
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+LIB_D_DIR = ../../lib/d
+
+GEN_SRC = ../gen-d/share/SharedService.d ../gen-d/share/shared_types.d \
+	../gen-d/tutorial/tutorial_types.d ../gen-d/tutorial/Calculator.d
+
+default: server client async_client
+
+server: server.d
+	dmd -I${LIB_D_DIR}/src -L-L${LIB_D_DIR} -L-lthriftd server.d ${GEN_SRC}
+
+client: client.d
+	dmd -I${LIB_D_DIR}/src -L-L${LIB_D_DIR} -L-lthriftd client.d ${GEN_SRC}
+
+async_client: async_client.d
+	dmd -I${LIB_D_DIR}/src -L-L${LIB_D_DIR} -L-lthriftd -L-lthriftd-event -L-levent async_client.d ${GEN_SRC}
+
+clean:
+	$(RM) -f server client async_client