Posted to commits@thrift.apache.org by jf...@apache.org on 2012/03/22 22:49:13 UTC

svn commit: r1304085 [7/10] - in /thrift/trunk: ./ aclocal/ compiler/cpp/ compiler/cpp/src/generate/ lib/ lib/d/ lib/d/src/ lib/d/src/thrift/ lib/d/src/thrift/async/ lib/d/src/thrift/codegen/ lib/d/src/thrift/internal/ lib/d/src/thrift/internal/test/ l...

Added: thrift/trunk/lib/d/src/thrift/transport/base.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/base.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/base.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/base.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.base;
+
+import core.stdc.string : strerror;
+import std.conv : text;
+import thrift.base;
+
+/**
+ * An entity from which data can be read and/or to which data can be written.
+ *
+ * A TTransport implementation may be capable of either reading or writing,
+ * but not necessarily both.
+ */
+interface TTransport {
+  /**
+   * Whether this transport is open.
+   *
+   * If a transport is closed, it can be opened by calling open(), and vice
+   * versa for close().
+   *
+   * While a transport should always be open when trying to read/write data,
+   * the related functions do not necessarily fail when called for a closed
+   * transport. Situations like this could occur e.g. with a wrapper
+   * transport which buffers data when the underlying transport has already
+   * been closed (possibly because the connection was abruptly closed), but
+   * there is still data left to be read in the buffers. This choice has been
+   * made to simplify transport implementations, in terms of both code
+   * complexity and runtime overhead.
+   */
+  bool isOpen() @property;
+
+  /**
+   * Tests whether there is more data to read or if the remote side is
+   * still open.
+   *
+   * A typical use case would be a server checking if it should process
+   * another request on the transport.
+   */
+  bool peek();
+
+  /**
+   * Opens the transport for communications.
+   *
+   * If the transport is already open, nothing happens.
+   *
+   * Throws: TTransportException if opening fails.
+   */
+  void open();
+
+  /**
+   * Closes the transport.
+   *
+   * If the transport is not open, nothing happens.
+   *
+   * Throws: TTransportException if closing fails.
+   */
+  void close();
+
+  /**
+   * Attempts to fill the given buffer by reading data.
+   *
+   * For potentially blocking data sources (e.g. sockets), read() will only
+   * block if no data is available at all. If there is some data available,
+   * but waiting for new data to arrive would be required to fill the whole
+   * buffer, the readily available data will be immediately returned – use
+   * readAll() if you want to wait until the whole buffer is filled.
+   *
+   * Params:
+   *   buf = Slice to use as buffer.
+   *
+   * Returns: How many bytes were actually read.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  size_t read(ubyte[] buf);
+
+  /**
+   * Fills the given buffer by reading data into it, failing if not enough
+   * data is available.
+   *
+   * Params:
+   *   buf = Slice to use as buffer.
+   *
+   * Throws: TTransportException if insufficient data is available or reading
+   *   fails altogether.
+   */
+  void readAll(ubyte[] buf);
+
+  /**
+   * Must be called by clients when read is completed.
+   *
+   * Implementations can choose to perform a transport-specific action, e.g.
+   * logging the request to a file.
+   *
+   * Returns: The number of bytes read if available, 0 otherwise.
+   */
+  size_t readEnd();
+
+  /**
+   * Writes the passed slice of data.
+   *
+   * Note: You must call flush() to ensure the data is actually written,
+   * and available to be read back in the future.  Destroying a TTransport
+   * object does not automatically flush pending data – if you destroy a
+   * TTransport object with written but unflushed data, that data may be
+   * discarded.
+   *
+   * Params:
+   *   buf = Slice of data to write.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  void write(in ubyte[] buf);
+
+  /**
+   * Must be called by clients when write is completed.
+   *
+   * Implementations can choose to perform a transport-specific action, e.g.
+   * logging the request to a file.
+   *
+   * Returns: The number of bytes written if available, 0 otherwise.
+   */
+  size_t writeEnd();
+
+  /**
+   * Flushes any pending data to be written.
+   *
+   * Must be called before destruction to ensure writes are actually complete,
+   * otherwise pending data may be discarded. Typically used with buffered
+   * transport mechanisms.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  void flush();
+
+  /**
+   * Attempts to return a slice of <code>len</code> bytes of incoming data,
+   * possibly copied into buf, without consuming it (i.e. a later read will
+   * return the same data).
+   *
+   * This method is meant to support protocols that need to read variable-
+   * length fields. They can attempt to borrow the maximum amount of data that
+   * they will need, then <code>consume()</code> what they actually use. Some
+   * transports will not support this method and others will fail occasionally,
+   * so protocols must be prepared to fall back to <code>read()</code> if
+   * borrow fails.
+   *
+   * The transport must be open when calling this.
+   *
+   * Params:
+   *   buf = A buffer where the data can be stored if needed, or null to
+   *     indicate that the caller is not supplying storage, but would like a
+   *     slice of an internal buffer, if available.
+   *   len = The number of bytes to borrow.
+   *
+   * Returns: If the borrow succeeds, a slice containing the borrowed data,
+   *   null otherwise. The slice will be at least as long as requested, but
+   *   may be longer if the returned slice points into an internal buffer
+   *   rather than buf.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  const(ubyte)[] borrow(ubyte* buf, size_t len) out (result) {
+    // FIXME: Commented out because len gets corrupted in
+    // thrift.transport.memory borrow() unittest.
+    version(none) assert(result is null || result.length >= len,
+       "Buffer returned by borrow() too short.");
+  }
+
+  /**
+   * Removes len bytes from the transport. This must always follow a borrow
+   * of at least len bytes, and should always succeed.
+   *
+   * The transport must be open when calling this.
+   *
+   * Params:
+   *   len = Number of bytes to consume.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  void consume(size_t len);
+}
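+
+// Illustrative usage sketch only, not part of the Thrift API: a caller
+// echoing fixed-size records over any TTransport implementation, relying on
+// the read/readAll/write/flush contract documented above.
+void echoRecord(TTransport transport, size_t recordSize) {
+  auto record = new ubyte[recordSize];
+  transport.readAll(record); // Throws if fewer than recordSize bytes arrive.
+  transport.write(record);
+  transport.flush(); // The data is not guaranteed to be sent before flush().
+}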
+
+/**
+ * Provides basic fall-back implementations of the TTransport interface.
+ */
+class TBaseTransport : TTransport {
+  override bool isOpen() @property {
+    return false;
+  }
+
+  override bool peek() {
+    return isOpen;
+  }
+
+  override void open() {
+    throw new TTransportException("Cannot open TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  override void close() {
+    throw new TTransportException("Cannot close TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  override size_t read(ubyte[] buf) {
+    throw new TTransportException("Cannot read from a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  override void readAll(ubyte[] buf) {
+    size_t have;
+    while (have < buf.length) {
+      size_t get = read(buf[have..$]);
+      if (get <= 0) {
+        throw new TTransportException(text("Could not readAll() ", buf.length,
+          " bytes as no more data was available after ", have, " bytes."),
+          TTransportException.Type.END_OF_FILE);
+      }
+      have += get;
+    }
+  }
+
+  override size_t readEnd() {
+    // Do nothing by default, not needed by all implementations.
+    return 0;
+  }
+
+  override void write(in ubyte[] buf) {
+    throw new TTransportException("Cannot write to a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  override size_t writeEnd() {
+    // Do nothing by default, not needed by all implementations.
+    return 0;
+  }
+
+  override void flush() {
+    // Do nothing by default, not needed by all implementations.
+  }
+
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    // borrow() is allowed to fail anyway, so just return null.
+    return null;
+  }
+
+  override void consume(size_t len) {
+    throw new TTransportException("Cannot consume from a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+protected:
+  this() {}
+}
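+
+// Illustrative sketch only; TArraySource is a hypothetical example, not part
+// of the library: a read-only transport that overrides just isOpen() and
+// read() and inherits readAll(), borrow() and the NOT_IMPLEMENTED errors
+// from TBaseTransport.
+final class TArraySource : TBaseTransport {
+  this(in ubyte[] data) {
+    data_ = data;
+  }
+
+  override bool isOpen() @property {
+    return true;
+  }
+
+  override size_t read(ubyte[] buf) {
+    auto len = buf.length < data_.length ? buf.length : data_.length;
+    buf[0 .. len] = data_[0 .. len];
+    data_ = data_[len .. $];
+    return len;
+  }
+
+private:
+  const(ubyte)[] data_;
+}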
+
+/**
+ * Makes a TTransport which wraps a given source transport in some way.
+ *
+ * A common use case is inside server implementations, where the raw client
+ * connections accepted from e.g. TServerSocket need to be wrapped into
+ * buffered or compressed transports.
+ */
+class TTransportFactory {
+  /**
+   * Default implementation does nothing, just returns the transport given.
+   */
+  TTransport getTransport(TTransport trans) {
+    return trans;
+  }
+}
+
+/**
+ * Transport factory for transports which simply wrap an underlying TTransport
+ * without requiring additional configuration.
+ */
+class TWrapperTransportFactory(T) if (
+  is(T : TTransport) && __traits(compiles, new T(TTransport.init))
+) : TTransportFactory {
+  override T getTransport(TTransport trans) {
+    return new T(trans);
+  }
+}
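+
+// Illustrative sketch only: the wrapper transports added in this commit get
+// their factories by simply instantiating this template, e.g. (see
+// thrift.transport.buffered below; someRawTransport is hypothetical):
+//
+//   alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory;
+//
+//   auto factory = new TBufferedTransportFactory;
+//   TTransport wrapped = factory.getTransport(someRawTransport);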
+
+/**
+ * Transport-level exception.
+ */
+class TTransportException : TException {
+  /**
+   * Error codes for the various types of exceptions.
+   */
+  enum Type {
+    UNKNOWN, ///
+    NOT_OPEN, ///
+    TIMED_OUT, ///
+    END_OF_FILE, ///
+    INTERRUPTED, ///
+    BAD_ARGS, ///
+    CORRUPTED_DATA, ///
+    INTERNAL_ERROR, ///
+    NOT_IMPLEMENTED ///
+  }
+
+  ///
+  this(Type type, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
+    static string msgForType(Type type) {
+      switch (type) {
+        case Type.UNKNOWN: return "Unknown transport exception";
+        case Type.NOT_OPEN: return "Transport not open";
+        case Type.TIMED_OUT: return "Timed out";
+        case Type.END_OF_FILE: return "End of file";
+        case Type.INTERRUPTED: return "Interrupted";
+        case Type.BAD_ARGS: return "Invalid arguments";
+        case Type.CORRUPTED_DATA: return "Corrupted data";
+        case Type.INTERNAL_ERROR: return "Internal error";
+        case Type.NOT_IMPLEMENTED: return "Not implemented";
+        default: return "(Invalid exception type)";
+      }
+    }
+    this(msgForType(type), type, file, line, next);
+  }
+
+  ///
+  this(string msg, string file = __FILE__, size_t line = __LINE__,
+    Throwable next = null)
+  {
+    this(msg, Type.UNKNOWN, file, line, next);
+  }
+
+  ///
+  this(string msg, Type type, string file = __FILE__, size_t line = __LINE__,
+    Throwable next = null)
+  {
+    super(msg, file, line, next);
+    type_ = type;
+  }
+
+  ///
+  Type type() const nothrow @property {
+    return type_;
+  }
+
+protected:
+  Type type_;
+}
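+
+// Illustrative sketch only; readRequest is a hypothetical callback: callers
+// typically dispatch on the type() of a caught TTransportException, e.g. to
+// treat END_OF_FILE as a normal peer disconnect.
+bool tryReadRequest(TTransport transport, void delegate(TTransport) readRequest) {
+  try {
+    readRequest(transport);
+    return true;
+  } catch (TTransportException e) {
+    if (e.type == TTransportException.Type.END_OF_FILE) return false;
+    throw e;
+  }
+}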
+
+/**
+ * Meta-programming helper returning whether the passed type is a TTransport
+ * implementation.
+ */
+template isTTransport(T) {
+  enum isTTransport = is(T : TTransport);
+}

Added: thrift/trunk/lib/d/src/thrift/transport/buffered.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/buffered.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/buffered.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/buffered.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.buffered;
+
+import std.algorithm : min;
+import std.array : empty;
+import std.exception : enforce;
+import thrift.transport.base;
+
+/**
+ * Wraps another transport and buffers reads and writes until the internal
+ * buffers are exhausted, at which point new data is fetched from the
+ * underlying transport or the accumulated data is written out to it at once.
+ */
+final class TBufferedTransport : TBaseTransport {
+  /**
+   * Constructs a new instance, using the default buffer sizes.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   */
+  this(TTransport transport) {
+    this(transport, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Constructs a new instance, using the specified buffer size.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   *   bufferSize = The size of the read and write buffers to use, in bytes.
+   */
+  this(TTransport transport, size_t bufferSize) {
+    this(transport, bufferSize, bufferSize);
+  }
+
+  /**
+   * Constructs a new instance, using the specified buffer size.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   *   readBufferSize = The size of the read buffer to use, in bytes.
+   *   writeBufferSize = The size of the write buffer to use, in bytes.
+   */
+  this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) {
+    transport_ = transport;
+    readBuffer_ = new ubyte[readBufferSize];
+    writeBuffer_ = new ubyte[writeBufferSize];
+    writeAvail_ = writeBuffer_;
+  }
+
+  /// The default size of the read/write buffers, in bytes.
+  enum int DEFAULT_BUFFER_SIZE = 512;
+
+  override bool isOpen() @property {
+    return transport_.isOpen();
+  }
+
+  override bool peek() {
+    if (readAvail_.empty) {
+      // If there is nothing available to read, see if we can get something
+      // from the underlying transport.
+      auto bytesRead = transport_.read(readBuffer_);
+      readAvail_ = readBuffer_[0 .. bytesRead];
+    }
+
+    return !readAvail_.empty;
+  }
+
+  override void open() {
+    transport_.open();
+  }
+
+  override void close() {
+    if (!isOpen) return;
+    flush();
+    transport_.close();
+  }
+
+  override size_t read(ubyte[] buf) {
+    if (readAvail_.empty) {
+      // No data left in our buffer, fetch some from the underlying transport.
+
+      if (buf.length > readBuffer_.length) {
+        // If the amount of data requested is larger than our reading buffer,
+        // directly read to the passed buffer. This probably doesn't occur too
+        // often in practice (and even if it does, the underlying transport
+        // probably cannot fulfill the request at once anyway), but it can't
+        // hurt to try…
+        return transport_.read(buf);
+      }
+
+      auto bytesRead = transport_.read(readBuffer_);
+      readAvail_ = readBuffer_[0 .. bytesRead];
+    }
+
+    // Hand over whatever we have.
+    auto give = min(readAvail_.length, buf.length);
+    buf[0 .. give] = readAvail_[0 .. give];
+    readAvail_ = readAvail_[give .. $];
+    return give;
+  }
+
+  /**
+   * Shortcut version of readAll.
+   */
+  override void readAll(ubyte[] buf) {
+    if (readAvail_.length >= buf.length) {
+      buf[] = readAvail_[0 .. buf.length];
+      readAvail_ = readAvail_[buf.length .. $];
+      return;
+    }
+
+    super.readAll(buf);
+  }
+
+  override void write(in ubyte[] buf) {
+    if (writeAvail_.length >= buf.length) {
+      // If the data fits in the buffer, just save it there.
+      writeAvail_[0 .. buf.length] = buf;
+      writeAvail_ = writeAvail_[buf.length .. $];
+      return;
+    }
+
+    // We have to decide whether to copy the data from buf to our internal
+    // buffer, or to just write it out directly. The same considerations about
+    // avoiding syscalls as in the C++ implementation apply here.
+    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
+    if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) {
+      // We would immediately need two syscalls anyway (or we don't have
+      // anything in our buffer to write), so just write out both buffers.
+      if (bytesAvail > 0) {
+        transport_.write(writeBuffer_[0 .. bytesAvail]);
+        writeAvail_ = writeBuffer_;
+      }
+
+      transport_.write(buf);
+      return;
+    }
+
+    // Fill up our internal buffer for a write.
+    writeAvail_[] = buf[0 .. writeAvail_.length];
+    auto left = buf[writeAvail_.length .. $];
+    transport_.write(writeBuffer_);
+
+    // Copy the rest into our buffer.
+    writeBuffer_[0 .. left.length] = left[];
+    writeAvail_ = writeBuffer_[left.length .. $];
+  }
+
+  override void flush() {
+    // Write out any data waiting in the write buffer.
+    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
+    if (bytesAvail > 0) {
+      // Note that we reset writeAvail_ prior to calling the underlying transport
+      // to make sure the buffer is cleared even if the transport throws an
+      // exception.
+      writeAvail_ = writeBuffer_;
+      transport_.write(writeBuffer_[0 .. bytesAvail]);
+    }
+
+    // Flush the underlying transport.
+    transport_.flush();
+  }
+
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    if (len <= readAvail_.length) {
+      return readAvail_;
+    }
+    return null;
+  }
+
+  override void consume(size_t len) {
+    enforce(len <= readAvail_.length, new TTransportException(
+      "Invalid consume length.", TTransportException.Type.BAD_ARGS));
+    readAvail_ = readAvail_[len .. $];
+  }
+
+  /**
+   * The wrapped transport.
+   */
+  TTransport underlyingTransport() @property {
+    return transport_;
+  }
+
+private:
+  TTransport transport_;
+
+  ubyte[] readBuffer_;
+  ubyte[] writeBuffer_;
+
+  ubyte[] readAvail_;
+  ubyte[] writeAvail_;
+}
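+
+// Illustrative sketch only, assuming thrift.transport.memory (also added in
+// this commit) with its default-constructible TMemoryBuffer standing in for
+// a »real« underlying transport: written data only reaches the wrapped
+// transport once flush() is called.
+unittest {
+  import thrift.transport.memory;
+
+  auto underlying = new TMemoryBuffer;
+  auto buffered = new TBufferedTransport(underlying);
+
+  buffered.write([1, 2, 3]);
+  // The bytes are still sitting in the write buffer at this point.
+  buffered.flush();
+  assert(underlying.getContents() == [1, 2, 3]);
+}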
+
+/**
+ * Wraps given transports into TBufferedTransports.
+ */
+alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory;

Added: thrift/trunk/lib/d/src/thrift/transport/file.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/file.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/file.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/file.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,1100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Transports for reading from/writing to Thrift »log files«.
+ *
+ * These transports are not »stupid« sources and sinks just reading and
+ * writing bytes from a file verbatim, but organize the contents in the form
+ * of so-called »events«, an event being the data written between two
+ * flush() calls.
+ *
+ * Chunking is supported; events are guaranteed to never span chunk boundaries.
+ * As a consequence, an event can never be larger than the chunk size. The
+ * chunk size used is not saved with the file, so care has to be taken to make
+ * sure the same chunk size is used for reading and writing.
+ */
+module thrift.transport.file;
+
+import core.thread : Thread;
+import std.array : empty;
+import std.algorithm : min, max;
+import std.concurrency;
+import std.conv : to;
+import std.datetime : AutoStart, dur, Duration, StopWatch;
+import std.exception;
+import std.stdio : File;
+import thrift.base;
+import thrift.transport.base;
+
+/// The default chunk size, in bytes.
+enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+/// The type used to represent event sizes in the file.
+alias uint EventSize;
+
+version (BigEndian) {
+  static assert(false,
+    "Little endian byte order is assumed in thrift.transport.file.");
+}
+
+/**
+ * A transport used to read log files. It can never be written to; calling
+ * write() throws.
+ *
+ * Contrary to the C++ design, explicitly opening the transport/file before
+ * using it is necessary, to allow manually closing the file without relying
+ * on the object lifetime. Otherwise, it's a straight port of the C++
+ * implementation.
+ */
+final class TFileReaderTransport : TBaseTransport {
+  /**
+   * Creates a new file reader transport.
+   *
+   * Params:
+   *   path = Path of the file to operate on.
+   */
+  this(string path) {
+    path_ = path;
+    chunkSize_ = DEFAULT_CHUNK_SIZE;
+    readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
+    readTimeout_ = DEFAULT_READ_TIMEOUT;
+    corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
+    maxEventSize = DEFAULT_MAX_EVENT_SIZE;
+  }
+
+  override bool isOpen() @property {
+    return isOpen_;
+  }
+
+  override bool peek() {
+    if (!isOpen) return false;
+
+    // If there is no event currently processed, try fetching one from the
+    // file.
+    if (!currentEvent_) {
+      currentEvent_ = readEvent();
+
+      if (!currentEvent_) {
+        // Still nothing there, couldn't read a new event.
+        return false;
+      }
+    }
+    // check if there is anything to read
+    return (currentEvent_.length - currentEventPos_) > 0;
+  }
+
+  override void open() {
+    if (isOpen) return;
+    try {
+      file_ = File(path_, "rb");
+    } catch (Exception e) {
+      throw new TTransportException("Error on opening input file.",
+        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
+    }
+    isOpen_ = true;
+  }
+
+  override void close() {
+    if (!isOpen) return;
+
+    file_.close();
+    isOpen_ = false;
+    readState_.resetAllValues();
+  }
+
+  override size_t read(ubyte[] buf) {
+    enforce(isOpen, new TTransportException(
+      "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
+
+    // If there is no event currently processed, try fetching one from the
+    // file.
+    if (!currentEvent_) {
+      currentEvent_ = readEvent();
+
+      if (!currentEvent_) {
+        // Still nothing there, couldn't read a new event.
+        return 0;
+      }
+    }
+
+    auto len = buf.length;
+    auto remaining = currentEvent_.length - currentEventPos_;
+
+    if (remaining <= len) {
+      // If less than the requested length is available, read as much as
+      // possible.
+      buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
+      currentEvent_ = null;
+      currentEventPos_ = 0;
+      return remaining;
+    }
+
+    // There will still be data left in the buffer after reading, pass out len
+    // bytes.
+    buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
+    currentEventPos_ += len;
+    return len;
+  }
+
+  ulong getNumChunks() {
+    enforce(isOpen, new TTransportException(
+      "Cannot get number of chunks if file not open.",
+      TTransportException.Type.NOT_OPEN));
+
+    try {
+      auto fileSize = file_.size();
+      if (fileSize == 0) {
+        // Empty files have no chunks.
+        return 0;
+      }
+      return ((fileSize)/chunkSize_) + 1;
+    } catch (Exception e) {
+      throw new TTransportException("Error getting file size.", __FILE__,
+        __LINE__, e);
+    }
+  }
+
+  ulong getCurChunk() {
+    return offset_ / chunkSize_;
+  }
+
+  void seekToChunk(long chunk) {
+    enforce(isOpen, new TTransportException(
+      "Cannot get number of chunks if file not open.",
+      TTransportException.Type.NOT_OPEN));
+
+    auto numChunks = getNumChunks();
+
+    if (chunk < 0) {
+      // Count negative indices from the end.
+      chunk += numChunks;
+    }
+
+    if (chunk < 0) {
+      logError("Incorrect chunk number for reverse seek, seeking to " ~
+       "beginning instead: %s", chunk);
+      chunk = 0;
+    }
+
+    bool seekToEnd;
+    long minEndOffset;
+    if (chunk >= numChunks) {
+      logError("Trying to seek to non-existing chunk, seeking to " ~
+       "end of file instead: %s", chunk);
+      seekToEnd = true;
+      chunk = numChunks - 1;
+      // This is the minimum offset up to which events have to be processed.
+      minEndOffset = file_.size();
+    }
+
+    readState_.resetAllValues();
+    currentEvent_ = null;
+
+    try {
+      file_.seek(chunk * chunkSize_);
+      offset_ = chunk * chunkSize_;
+    } catch (Exception e) {
+      throw new TTransportException("Error seeking to chunk", __FILE__,
+        __LINE__, e);
+    }
+
+    if (seekToEnd) {
+      // Never wait on the end of the file for new content, we just want to
+      // find the last one.
+      auto oldReadTimeout = readTimeout_;
+      scope (exit) readTimeout_ = oldReadTimeout;
+      readTimeout_ = dur!"hnsecs"(0);
+
+      // Keep on reading until the last event present at the point of the
+      // seekToChunk() call.
+      while ((offset_ + readState_.bufferPos_) < minEndOffset) {
+        if (readEvent() is null) {
+          break;
+        }
+      }
+    }
+  }
+
+  void seekToEnd() {
+    seekToChunk(getNumChunks());
+  }
+
+  /**
+   * The size of the chunks the file is divided into, in bytes.
+   */
+  ulong chunkSize() @property const {
+    return chunkSize_;
+  }
+
+  /// ditto
+  void chunkSize(ulong value) @property {
+    enforce(!isOpen, new TTransportException(
+      "Cannot set chunk size after TFileReaderTransport has been opened."));
+    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
+      "be large enough to accomodate at least a single byte of payload data."));
+    chunkSize_ = value;
+  }
+
+  /**
+   * If positive, wait the specified duration for new data when arriving at
+   * end of file. If negative, wait forever (tailing mode), waking up to check
+   * at the specified interval. If zero, do not wait at all.
+   *
+   * Defaults to 500 ms.
+   */
+  Duration readTimeout() @property const {
+    return readTimeout_;
+  }
+
+  /// ditto
+  void readTimeout(Duration value) @property {
+    readTimeout_ = value;
+  }
+
+  /// ditto
+  enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
+
+  /**
+   * Read buffer size, in bytes.
+   *
+   * Defaults to 1 MiB.
+   */
+  size_t readBufferSize() @property const {
+    return readBufferSize_;
+  }
+
+  /// ditto
+  void readBufferSize(size_t value) @property {
+    if (readBuffer_) {
+      enforce(value <= readBufferSize_,
+        "Cannot shrink read buffer after first read.");
+      readBuffer_.length = value;
+    }
+    readBufferSize_ = value;
+  }
+
+  /// ditto
+  enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
+
+  /**
+   * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
+   *
+   * Defaults to zero (no limit).
+   */
+  size_t maxEventSize() @property const {
+    return maxEventSize_;
+  }
+
+  /// ditto
+  void maxEventSize(size_t value) @property {
+    enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
+      "mutiple chunks, maxEventSize must be smaller than chunk size.");
+    maxEventSize_ = value;
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_EVENT_SIZE = 0;
+
+  /**
+   * The interval at which the reader wakes up to check for the next chunk
+   * when recovering from a corrupted event in tailing mode.
+   *
+   * Defaults to one second.
+   */
+  Duration corruptedEventSleepDuration() const {
+    return corruptedEventSleepDuration_;
+  }
+
+  /// ditto
+  void corruptedEventSleepDuration(Duration value) {
+    corruptedEventSleepDuration_ = value;
+  }
+
+  /// ditto
+  enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
+
+  /**
+   * The maximum number of corrupted events tolerated before the whole chunk
+   * is skipped.
+   *
+   * Defaults to zero.
+   */
+  uint maxCorruptedEvents() @property const {
+    return maxCorruptedEvents_;
+  }
+
+  /// ditto
+  void maxCorruptedEvents(uint value) @property {
+    maxCorruptedEvents_ = value;
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+private:
+  ubyte[] readEvent() {
+    if (!readBuffer_) {
+      readBuffer_ = new ubyte[readBufferSize_];
+    }
+
+    bool timeoutExpired;
+    while (1) {
+      // read from the file if read buffer is exhausted
+      if (readState_.bufferPos_ == readState_.bufferLen_) {
+        // advance the offset pointer
+        offset_ += readState_.bufferLen_;
+
+        try {
+          // Need to clear eof flag before reading, otherwise tailing a file
+          // does not work.
+          file_.clearerr();
+
+          auto usedBuf = file_.rawRead(readBuffer_);
+          readState_.bufferLen_ = usedBuf.length;
+        } catch (Exception e) {
+          readState_.resetAllValues();
+          throw new TTransportException("Error while reading from file",
+            __FILE__, __LINE__, e);
+        }
+
+        readState_.bufferPos_ = 0;
+        readState_.lastDispatchPos_ = 0;
+
+        if (readState_.bufferLen_ == 0) {
+          // Reached end of file.
+          if (readTimeout_ < dur!"hnsecs"(0)) {
+            // Tailing mode, sleep for the specified duration and try again.
+            Thread.sleep(-readTimeout_);
+            continue;
+          } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
+            // Either no timeout set, or it has already expired.
+            readState_.resetState(0);
+            return null;
+          } else {
+            // Timeout mode, sleep for the specified amount of time and retry.
+            Thread.sleep(readTimeout_);
+            timeoutExpired = true;
+            continue;
+          }
+        }
+      }
+
+      // Attempt to read an event from the buffer.
+      while (readState_.bufferPos_ < readState_.bufferLen_) {
+        if (readState_.readingSize_) {
+          if (readState_.eventSizeBuffPos_ == 0) {
+            if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
+              ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
+            {
+              readState_.bufferPos_++;
+              continue;
+            }
+          }
+
+          readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+            readBuffer_[readState_.bufferPos_++];
+
+          if (readState_.eventSizeBuffPos_ == 4) {
+            auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
+
+            if (size == 0) {
+              // This is part of the zero padding between chunks.
+              readState_.resetState(readState_.lastDispatchPos_);
+              continue;
+            }
+
+            // got a valid event
+            readState_.readingSize_ = false;
+            readState_.eventLen_ = size;
+            readState_.eventPos_ = 0;
+
+            // check if the event is corrupted and perform recovery if required
+            if (isEventCorrupted()) {
+              performRecovery();
+              // start from the top
+              break;
+            }
+          }
+        } else {
+          if (!readState_.event_) {
+            readState_.event_ = new ubyte[readState_.eventLen_];
+          }
+
+          // take either the entire event or the remaining bytes in the buffer
+          auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
+            readState_.eventLen_ - readState_.eventPos_);
+
+          // copy data from read buffer into event buffer
+          readState_.event_[
+            readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
+          ] = readBuffer_[
+            readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
+          ];
+
+          // increment position ptrs
+          readState_.eventPos_ += reclaimBuffer;
+          readState_.bufferPos_ += reclaimBuffer;
+
+          // check if the event has been read in full
+          if (readState_.eventPos_ == readState_.eventLen_) {
+            // Reset the read state and return the completed event.
+            auto completeEvent = readState_.event_;
+            readState_.event_ = null;
+            readState_.resetState(readState_.bufferPos_);
+            return completeEvent;
+          }
+        }
+      }
+    }
+  }
+
+  bool isEventCorrupted() {
+    if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
+      // Event size is larger than user-specified max event size
+      logError("Corrupt event read: Event size (%s) greater than max " ~
+        "event size (%s)", readState_.eventLen_, maxEventSize_);
+      return true;
+    } else if (readState_.eventLen_ > chunkSize_) {
+      // Event size is larger than chunk size
+      logError("Corrupt event read: Event size (%s) greater than chunk " ~
+        "size (%s)", readState_.eventLen_, chunkSize_);
+      return true;
+    } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
+      ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
+    {
+      // Size indicates that event crosses chunk boundary
+      logError("Read corrupt event. Event crosses chunk boundary. " ~
+        "Event size: %s. Offset: %s", readState_.eventLen_,
+        (offset_ + readState_.bufferPos_ + EventSize.sizeof)
+      );
+
+      return true;
+    }
+
+    return false;
+  }
+
+  void performRecovery() {
+    // perform some kickass recovery
+    auto curChunk = getCurChunk();
+    if (lastBadChunk_ == curChunk) {
+      numCorruptedEventsInChunk_++;
+    } else {
+      lastBadChunk_ = curChunk;
+      numCorruptedEventsInChunk_ = 1;
+    }
+
+    if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
+      // maybe there was an error in reading the file from disk
+      // seek to the beginning of chunk and try again
+      seekToChunk(curChunk);
+    } else {
+      // Just skip ahead to the next chunk if we are not already at the last chunk.
+      if (curChunk != (getNumChunks() - 1)) {
+        seekToChunk(curChunk + 1);
+      } else if (readTimeout_ < dur!"hnsecs"(0)) {
+        // We are in tailing mode, wait until there is enough data to start
+        // the next chunk.
+        while(curChunk == (getNumChunks() - 1)) {
+          Thread.sleep(corruptedEventSleepDuration_);
+        }
+        seekToChunk(curChunk + 1);
+      } else {
+        // Pretty hosed at this stage, rewind the file back to the last
+        // successful point and punt on the error.
+        readState_.resetState(readState_.lastDispatchPos_);
+        currentEvent_ = null;
+        currentEventPos_ = 0;
+
+        throw new TTransportException("File corrupted at offset: " ~
+          to!string(offset_ + readState_.lastDispatchPos_),
+          TTransportException.Type.CORRUPTED_DATA);
+      }
+    }
+  }
+
+  string path_;
+  File file_;
+  bool isOpen_;
+  long offset_;
+  ubyte[] currentEvent_;
+  size_t currentEventPos_;
+  ulong chunkSize_;
+  Duration readTimeout_;
+  size_t maxEventSize_;
+
+  // Read buffer – lazily allocated on the first read().
+  ubyte[] readBuffer_;
+  size_t readBufferSize_;
+
+  static struct ReadState {
+    ubyte[] event_;
+    size_t eventLen_;
+    size_t eventPos_;
+
+    // keep track of event size
+    ubyte[4] eventSizeBuff_;
+    ubyte eventSizeBuffPos_;
+    bool readingSize_ = true;
+
+    // read buffer variables
+    size_t bufferPos_;
+    size_t bufferLen_;
+
+    // last successful dispatch point
+    size_t lastDispatchPos_;
+
+    void resetState(size_t lastDispatchPos) {
+      readingSize_ = true;
+      eventSizeBuffPos_ = 0;
+      lastDispatchPos_ = lastDispatchPos;
+    }
+
+    void resetAllValues() {
+      resetState(0);
+      bufferPos_ = 0;
+      bufferLen_ = 0;
+      event_ = null;
+    }
+  }
+  ReadState readState_;
+
+  ulong lastBadChunk_;
+  uint maxCorruptedEvents_;
+  uint numCorruptedEventsInChunk_;
+  Duration corruptedEventSleepDuration_;
+}
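+
+// Illustrative usage sketch only, with a hypothetical file name: dumps all
+// events currently in a log file without ever blocking at the end of it.
+void dumpLog(string path) {
+  auto reader = new TFileReaderTransport(path);
+  reader.readTimeout = dur!"hnsecs"(0); // Never wait for new events.
+  reader.open();
+  scope (exit) reader.close();
+
+  auto buf = new ubyte[64];
+  while (reader.peek()) {
+    auto bytesRead = reader.read(buf);
+    // … process buf[0 .. bytesRead] here …
+  }
+}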
+
+/**
+ * A transport used to write log files. It can never be read from; calling
+ * read() throws.
+ *
+ * Contrary to the C++ design, explicitly opening the transport/file before
+ * using is necessary to allow manually closing the file without relying on the
+ * object lifetime.
+ */
+final class TFileWriterTransport : TBaseTransport {
+  /**
+   * Creates a new file writer transport.
+   *
+   * Params:
+   *   path = Path of the file to operate on.
+   */
+  this(string path) {
+    path_ = path;
+
+    chunkSize_ = DEFAULT_CHUNK_SIZE;
+    eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
+    ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
+    maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
+    maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
+  }
+
+  override bool isOpen() @property {
+    return isOpen_;
+  }
+
+  /**
+   * A file writer transport can never be read from.
+   */
+  override bool peek() {
+    return false;
+  }
+
+  override void open() {
+    if (isOpen) return;
+
+    writerThread_ = spawn(
+      &writerThread,
+      path_,
+      chunkSize_,
+      maxFlushBytes_,
+      maxFlushInterval_,
+      ioErrorSleepDuration_
+    );
+    setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
+    isOpen_ = true;
+  }
+
+  /**
+   * Closes the transport, i.e. the underlying file and the writer thread.
+   */
+  override void close() {
+    if (!isOpen) return;
+
+    prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
+    receive((ShutdownMessage msg, Tid tid){});
+    isOpen_ = false;
+  }
+
+  /**
+   * Enqueues the passed slice of data for writing and immediately returns.
+   * write() only blocks if the event buffer has been exhausted.
+   *
+   * The transport must be open when calling this.
+   *
+   * Params:
+   *   buf = Slice of data to write.
+   */
+  override void write(in ubyte[] buf) {
+    enforce(isOpen, new TTransportException(
+      "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
+
+    if (buf.empty) {
+      logError("Cannot write empty event, skipping.");
+      return;
+    }
+
+    auto maxSize = chunkSize - EventSize.sizeof;
+    enforce(buf.length <= maxSize, new TTransportException(
+      "Cannot write more than " ~ to!string(maxSize) ~
+      "bytes at once due to chunk size."));
+
+    send(writerThread_, buf.idup);
+  }
+
+  /**
+   * Flushes any pending data to be written.
+   *
+   * The transport must be open when calling this.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  override void flush() {
+    enforce(isOpen, new TTransportException(
+      "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
+
+    send(writerThread_, FlushMessage(), thisTid);
+    receive((FlushMessage msg, Tid tid){});
+  }
+
+  /**
+   * The size of the chunks the file is divided into, in bytes.
+   *
+   * A single event (write call) never spans multiple chunks – this
+   * effectively limits the event size to chunkSize - EventSize.sizeof.
+   */
+  ulong chunkSize() @property {
+    return chunkSize_;
+  }
+
+  /// ditto
+  void chunkSize(ulong value) @property {
+    enforce(!isOpen, new TTransportException(
+      "Cannot set chunk size after TFileWriterTransport has been opened."));
+    chunkSize_ = value;
+  }
+
+  /**
+   * The maximum number of write() calls buffered, or zero for no limit.
+   *
+   * If the buffer is exhausted, write() will block until space becomes
+   * available.
+   */
+  size_t eventBufferSize() @property {
+    return eventBufferSize_;
+  }
+
+  /// ditto
+  void eventBufferSize(size_t value) @property {
+    eventBufferSize_ = value;
+    if (isOpen) {
+      setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
+
+  /**
+   * Maximum number of bytes buffered before writing and flushing the file
+   * to disk.
+   *
+   * Currently cannot be set after the first call to write().
+   */
+  size_t maxFlushBytes() @property {
+    return maxFlushBytes_;
+  }
+
+  /// ditto
+  void maxFlushBytes(size_t value) @property {
+    maxFlushBytes_ = value;
+    if (isOpen) {
+      send(writerThread_, FlushBytesMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
+
+  /**
+   * Maximum interval between flushing the file to disk.
+   *
+   * Currently cannot be set after the first call to write().
+   */
+  Duration maxFlushInterval() @property {
+    return maxFlushInterval_;
+  }
+
+  /// ditto
+  void maxFlushInterval(Duration value) @property {
+    maxFlushInterval_ = value;
+    if (isOpen) {
+      send(writerThread_, FlushIntervalMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
+
+  /**
+   * When the writer thread encounters an I/O error, it pauses for a
+   * short time before trying to reopen the output file. This controls the
+   * sleep duration.
+   */
+  Duration ioErrorSleepDuration() @property {
+    return ioErrorSleepDuration_;
+  }
+
+  /// ditto
+  void ioErrorSleepDuration(Duration value) @property {
+    ioErrorSleepDuration_ = value;
+    if (isOpen) {
+      send(writerThread_, IoErrorSleepDurationMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
+
+private:
+  string path_;
+  ulong chunkSize_;
+  size_t eventBufferSize_;
+  Duration ioErrorSleepDuration_;
+  size_t maxFlushBytes_;
+  Duration maxFlushInterval_;
+  bool isOpen_;
+  Tid writerThread_;
+}
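+
+// Illustrative usage sketch only; the path and event contents are
+// hypothetical: every write()/flush() pair appends one self-contained event
+// to the log file.
+void appendEvent(string path, in ubyte[] serializedEvent) {
+  auto log = new TFileWriterTransport(path);
+  log.open();
+  scope (exit) log.close();
+
+  log.write(serializedEvent);
+  log.flush(); // Blocks until the writer thread has written out the event.
+}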
+
+private {
+  // Signals that the file should be flushed on disk. Sent to the writer
+  // thread and sent back along with the tid for confirmation.
+  struct FlushMessage {}
+
+  // Signals that the writer thread should close the file and shut down. Sent
+  // to the writer thread and sent back along with the tid for confirmation.
+  struct ShutdownMessage {}
+
+  struct FlushBytesMessage {
+    size_t value;
+  }
+
+  struct FlushIntervalMessage {
+    Duration value;
+  }
+
+  struct IoErrorSleepDurationMessage {
+    Duration value;
+  }
+
+  void writerThread(
+    string path,
+    ulong chunkSize,
+    size_t maxFlushBytes,
+    Duration maxFlushInterval,
+    Duration ioErrorSleepDuration
+  ) {
+    bool errorOpening;
+    File file;
+    ulong offset;
+    try {
+      // Open file in appending and binary mode.
+      file = File(path, "ab");
+      offset = file.tell();
+    } catch (Exception e) {
+      logError("Error on opening output file in writer thread: %s", e);
+      errorOpening = true;
+    }
+
+    auto flushTimer = StopWatch(AutoStart.yes);
+    size_t unflushedByteCount;
+
+    Tid shutdownRequestTid;
+    bool shutdownRequested;
+    while (true) {
+      if (shutdownRequested) break;
+
+      bool forceFlush;
+      Tid flushRequestTid;
+      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
+        (immutable(ubyte)[] data) {
+          while (errorOpening) {
+            logError("Writer thread going to sleep for %s µs due to IO errors",
+              ioErrorSleepDuration.fracSec.usecs);
+
+            // Sleep for ioErrorSleepDuration, being ready to be interrupted
+            // by shutdown requests.
+            auto timedOut = receiveTimeout(ioErrorSleepDuration,
+              (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
+            if (!timedOut) {
+              // We got a shutdown request, just drop all events and exit the
+              // main loop so as not to block application shutdown with retries
+              // which we must assume would fail.
+              break;
+            }
+
+            try {
+              file = File(path, "ab");
+              unflushedByteCount = 0;
+              errorOpening = false;
+              logError("Output file %s reopened during writer thread error " ~
+                "recovery", path);
+            } catch (Exception e) {
+              logError("Unable to reopen output file %s during writer " ~
+                "thread error recovery", path);
+            }
+          }
+
+          // Make sure the event does not cross the chunk boundary by writing
+          // a padding consisting of zeroes if it would.
+          auto chunk1 = offset / chunkSize;
+          auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;
+
+          if (chunk1 != chunk2) {
+            // TODO: The C++ implementation refetches the offset here to »keep
+            // in sync« – why would this be needed?
+            auto padding = cast(size_t)
+              ((((offset / chunkSize) + 1) * chunkSize) - offset);
+            auto zeroes = new ubyte[padding];
+            file.rawWrite(zeroes);
+            unflushedByteCount += padding;
+            offset += padding;
+          }
+
+          // TODO: 2 syscalls here, is this a problem performance-wise?
+          // Probably abysmal performance on Windows due to rawWrite
+          // implementation.
+          uint len = cast(uint)data.length;
+          file.rawWrite(cast(ubyte[])(&len)[0..1]);
+          file.rawWrite(data);
+
+          auto bytesWritten = EventSize.sizeof + data.length;
+          unflushedByteCount += bytesWritten;
+          offset += bytesWritten;
+        }, (FlushBytesMessage msg) {
+          maxFlushBytes = msg.value;
+        }, (FlushIntervalMessage msg) {
+          maxFlushInterval = msg.value;
+        }, (IoErrorSleepDurationMessage msg) {
+          ioErrorSleepDuration = msg.value;
+        }, (FlushMessage msg, Tid tid) {
+          forceFlush = true;
+          flushRequestTid = tid;
+        }, (OwnerTerminated msg) {
+          shutdownRequested = true;
+        }, (ShutdownMessage msg, Tid tid) {
+          shutdownRequested = true;
+          shutdownRequestTid = tid;
+        }
+      );
+
+      if (errorOpening) continue;
+
+      bool flush;
+      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
+        flush = true;
+      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
+        if (unflushedByteCount == 0) {
+          // If the flush timer is due, but no data has been written, don't
+          // needlessly fsync, but do reset the timer.
+          flushTimer.reset();
+        } else {
+          flush = true;
+        }
+      }
+
+      if (flush) {
+        file.flush();
+        flushTimer.reset();
+        unflushedByteCount = 0;
+        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
+      }
+    }
+
+    file.close();
+
+    if (shutdownRequestTid != Tid.init) {
+      send(shutdownRequestTid, ShutdownMessage(), thisTid);
+    }
+  }
+}
+
+version (unittest) {
+  import core.memory : GC;
+  import std.file;
+}
+
+unittest {
+  void tryRemove(string fileName) {
+    try {
+      remove(fileName);
+    } catch (Exception) {}
+  }
+
+  immutable fileName = "unittest.dat.tmp";
+  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
+    " already exists.");
+
+  /*
+   * Check the most basic reading/writing operations.
+   */
+  {
+    scope (exit) tryRemove(fileName);
+
+    auto writer = new TFileWriterTransport(fileName);
+    writer.open();
+    scope (exit) writer.close();
+
+    writer.write([1, 2]);
+    writer.write([3, 4]);
+    writer.write([5, 6, 7]);
+    writer.flush();
+
+    auto reader = new TFileReaderTransport(fileName);
+    reader.open();
+    scope (exit) reader.close();
+
+    auto buf = new ubyte[7];
+    reader.readAll(buf);
+    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
+  }
+
+  /*
+   * Check that chunking works as expected.
+   */
+  {
+    scope (exit) tryRemove(fileName);
+
+    static assert(EventSize.sizeof == 4);
+    enum CHUNK_SIZE = 10;
+
+    // Write some contents to the file.
+    {
+      auto writer = new TFileWriterTransport(fileName);
+      writer.chunkSize = CHUNK_SIZE;
+      writer.open();
+      scope (exit) writer.close();
+
+      writer.write([0xde]);
+      writer.write([0xad]);
+      // Chunk boundary here.
+      writer.write([0xbe]);
+      // The next write doesn't fit in the five bytes remaining, so we expect
+      // padding zero bytes to be written.
+      writer.write([0xef, 0x12]);
+
+      try {
+        writer.write(new ubyte[CHUNK_SIZE]);
+        enforce(false, "Could write event not fitting in a single chunk.");
+      } catch (TTransportException e) {}
+
+      writer.flush();
+    }
+
+    // Check the raw contents of the file to see if chunk padding was written
+    // as expected.
+    auto file = File(fileName, "r");
+    enforce(file.size == 26);
+    auto written = new ubyte[26];
+    file.rawRead(written);
+    enforce(written == [
+      1, 0, 0, 0, 0xde,
+      1, 0, 0, 0, 0xad,
+      1, 0, 0, 0, 0xbe,
+      0, 0, 0, 0, 0,
+      2, 0, 0, 0, 0xef, 0x12
+    ]);
+
+    // Read the data back in, getting all the events at once.
+    {
+      auto reader = new TFileReaderTransport(fileName);
+      reader.chunkSize = CHUNK_SIZE;
+      reader.open();
+      scope (exit) reader.close();
+
+      auto buf = new ubyte[5];
+      reader.readAll(buf);
+      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
+    }
+  }
+
+  /*
+   * Make sure that close() exits "quickly", i.e. that there is no problem
+   * with the worker thread waking up.
+   */
+  {
+    import std.conv : text;
+    enum NUM_ITERATIONS = 1000;
+
+    uint numOver = 0;
+    foreach (n; 0 .. NUM_ITERATIONS) {
+      scope (exit) tryRemove(fileName);
+
+      auto transport = new TFileWriterTransport(fileName);
+      transport.open();
+
+      // Write something so that the writer thread gets started.
+      transport.write(cast(ubyte[])"foo");
+
+      // Every other iteration, also call flush(), just in case that potentially
+      // has any effect on how the writer thread wakes up.
+      if (n & 0x1) {
+        transport.flush();
+      }
+
+      // Time the call to close().
+      auto sw = StopWatch(AutoStart.yes);
+      transport.close();
+      sw.stop();
+
+      // If any attempt takes more than 500ms, treat that as a fatal failure to
+      // avoid looping over a potentially very slow operation.
+      enforce(sw.peek().msecs < 500,
+        text("close() took ", sw.peek().msecs, "ms."));
+
+      // Normally, it takes less than 5ms on my dev box.
+      // However, if the box is heavily loaded, some of the test runs can take
+      // longer. Additionally, on a Windows Server 2008 instance running in
+      // a VirtualBox VM, it has been observed that about a quarter of the runs
+      // takes (217 ± 1) ms, for reasons not yet known.
+      if (sw.peek().msecs > 5) {
+        ++numOver;
+      }
+
+      // Force garbage collection runs every now and then to make sure we
+      // don't run out of OS thread handles.
+      if (!(n % 100)) GC.collect();
+    }
+
+    // Make sure fewer than a third of the runs took longer than 5ms.
+    enforce(numOver < NUM_ITERATIONS / 3,
+      text(numOver, " iterations took more than 10 ms."));
+  }
+}

Added: thrift/trunk/lib/d/src/thrift/transport/framed.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/framed.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/framed.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/framed.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+module thrift.transport.framed;
+
+import core.bitop : bswap;
+import std.algorithm : min;
+import std.array : empty;
+import std.exception : enforce;
+import thrift.transport.base;
+
+/**
+ * Framed transport.
+ *
+ * All writes go into an in-memory buffer until flush is called, at which point
+ * the transport writes the length of the entire binary chunk followed by the
+ * data payload. The receiver on the other end then performs a single
+ * »fixed-length« read to get the whole message off the wire.
+ */
+final class TFramedTransport : TBaseTransport {
+  /**
+   * Constructs a new framed transport.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   */
+  this(TTransport transport) {
+    transport_ = transport;
+  }
+
+  /**
+   * Returns the wrapped transport.
+   */
+  TTransport underlyingTransport() @property {
+    return transport_;
+  }
+
+  override bool isOpen() @property {
+    return transport_.isOpen;
+  }
+
+  override bool peek() {
+    return rBuf_.length > 0 || transport_.peek();
+  }
+
+  override void open() {
+    transport_.open();
+  }
+
+  override void close() {
+    flush();
+    transport_.close();
+  }
+
+  /**
+   * Attempts to read data into the given buffer, stopping when the buffer is
+   * exhausted or the frame end is reached.
+   *
+   * TODO: Contrary to the C++ implementation, this never does cross-frame
+   * reads – is there actually a valid use case for that?
+   *
+   * Params:
+   *   buf = Slice to use as buffer.
+   *
+   * Returns: How many bytes were actually read.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  override size_t read(ubyte[] buf) {
+    // If the buffer is empty, read a new frame off the wire.
+    if (rBuf_.empty) {
+      bool gotFrame = readFrame();
+      if (!gotFrame) return 0;
+    }
+
+    auto size = min(rBuf_.length, buf.length);
+    buf[0..size] = rBuf_[0..size];
+    rBuf_ = rBuf_[size..$];
+    return size;
+  }
+
+  override void write(in ubyte[] buf) {
+    wBuf_ ~= buf;
+  }
+
+  override void flush() {
+    if (wBuf_.empty) return;
+
+    // Properly reset the write buffer even if some of the underlying transport
+    // operations go wrong.
+    scope (exit) {
+      wBuf_.length = 0;
+      wBuf_.assumeSafeAppend();
+    }
+
+    int len = bswap(cast(int)wBuf_.length);
+    transport_.write(cast(ubyte[])(&len)[0..1]);
+    transport_.write(wBuf_);
+    transport_.flush();
+  }
+
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    if (len <= rBuf_.length) {
+      return rBuf_;
+    } else {
+      // Don't attempt cross-frame borrows; that would not make much sense
+      // anyway.
+      return null;
+    }
+  }
+
+  override void consume(size_t len) {
+    enforce(len <= rBuf_.length, new TTransportException(
+      "Invalid consume length", TTransportException.Type.BAD_ARGS));
+    rBuf_ = rBuf_[len .. $];
+  }
+
+private:
+  bool readFrame() {
+    // Read the size of the next frame. We can't use readAll() since that
+    // always throws an exception on EOF, but we only want to throw an
+    // exception if EOF occurs after partial frame size data.
+    int size;
+    size_t size_read;
+    while (size_read < size.sizeof) {
+      auto data = (cast(ubyte*)&size)[size_read..size.sizeof];
+      auto read = transport_.read(data);
+      if (read == 0) {
+        if (size_read == 0) {
+          // EOF before any data was read.
+          return false;
+        } else {
+          // EOF after a partial frame header – illegal.
+          throw new TTransportException(
+            "No more data to read after partial frame header",
+            TTransportException.Type.END_OF_FILE
+          );
+        }
+      }
+      size_read += read;
+    }
+
+    size = bswap(size);
+    enforce(size >= 0, new TTransportException("Frame size has negative value",
+      TTransportException.Type.CORRUPTED_DATA));
+
+    // TODO: Benchmark this.
+    rBuf_.length = size;
+    rBuf_.assumeSafeAppend();
+
+    transport_.readAll(rBuf_);
+    return true;
+  }
+
+  TTransport transport_;
+  ubyte[] rBuf_;
+  ubyte[] wBuf_;
+}
+
+/**
+ * Wraps given transports into TFramedTransports.
+ */
+alias TWrapperTransportFactory!TFramedTransport TFramedTransportFactory;
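+
+// A minimal usage sketch added for illustration (not part of the original
+// commit): the framing is a 32-bit length prefix (big-endian on the usual
+// little-endian hosts, matching the bswap use and the tests below) followed
+// by the payload, so flushing "abc" through a TFramedTransport puts
+// [0, 0, 0, 3, 'a', 'b', 'c'] on the underlying transport. Only TMemoryBuffer
+// from this library is assumed.
+unittest {
+  import thrift.transport.memory;
+
+  auto wire = new TMemoryBuffer;
+  auto framed = new TFramedTransport(wire);
+
+  framed.write(cast(ubyte[])"abc");
+  // Nothing reaches the underlying transport until flush() is called.
+  enforce(wire.getContents() == []);
+
+  framed.flush();
+  immutable expected = [0, 0, 0, 3, 'a', 'b', 'c'];
+  enforce(wire.getContents() == expected);
+}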
+
+version (unittest) {
+  import std.random : Mt19937, uniform;
+  import thrift.transport.memory;
+}
+
+// Some basic random testing, always starting with the same seed for
+// deterministic unit test results – more tests in transport_test.
+unittest {
+  auto randGen = Mt19937(42);
+
+  // 32 kiB of data to work with.
+  auto data = new ubyte[1 << 15];
+  foreach (ref b; data) {
+    b = uniform!"[]"(cast(ubyte)0, cast(ubyte)255, randGen);
+  }
+
+  // Generate a list of chunk sizes to split the data into. A uniform
+  // distribution is not quite realistic, but std.random doesn't have anything
+  // else yet.
+  enum MAX_FRAME_LENGTH = 512;
+  auto chunkSizesList = new size_t[][2];
+  foreach (ref chunkSizes; chunkSizesList) {
+    size_t sum;
+    while (true) {
+      auto curLen = uniform(0, MAX_FRAME_LENGTH, randGen);
+      sum += curLen;
+      if (sum > data.length) break;
+      chunkSizes ~= curLen;
+    }
+  }
+  chunkSizesList ~= [data.length]; // Also test whole chunk at once.
+
+  // Test writing data.
+  {
+    foreach (chunkSizes; chunkSizesList) {
+      auto buf = new TMemoryBuffer;
+      auto framed = new TFramedTransport(buf);
+
+      auto remainingData = data;
+      foreach (chunkSize; chunkSizes) {
+        framed.write(remainingData[0..chunkSize]);
+        remainingData = remainingData[chunkSize..$];
+      }
+      framed.flush();
+
+      auto writtenData = data[0..($ - remainingData.length)];
+      auto actualData = buf.getContents();
+
+      // Check frame size.
+      int frameSize = bswap((cast(int[])(actualData[0..int.sizeof]))[0]);
+      enforce(frameSize == writtenData.length);
+
+      // Check actual data.
+      enforce(actualData[int.sizeof..$] == writtenData);
+    }
+  }
+
+  // Test reading data.
+  {
+    foreach (chunkSizes; chunkSizesList) {
+      auto buf = new TMemoryBuffer;
+
+      auto size = bswap(cast(int)data.length);
+      buf.write(cast(ubyte[])(&size)[0..1]);
+      buf.write(data);
+
+      auto framed = new TFramedTransport(buf);
+      ubyte[] readData;
+      readData.reserve(data.length);
+      foreach (chunkSize; chunkSizes) {
+        // This should work with read because we have one huge frame.
+        auto oldReadLen = readData.length;
+        readData.length += chunkSize;
+        framed.read(readData[oldReadLen..$]);
+      }
+
+      enforce(readData == data[0..readData.length]);
+    }
+  }
+
+  // Test combined reading/writing of multiple frames.
+  foreach (flushProbability; [1, 2, 4, 8, 16, 32]) {
+    foreach (chunkSizes; chunkSizesList) {
+      auto buf = new TMemoryBuffer;
+      auto framed = new TFramedTransport(buf);
+
+      size_t[] frameSizes;
+
+      // Write the data.
+      size_t frameSize;
+      auto remainingData = data;
+      foreach (chunkSize; chunkSizes) {
+        framed.write(remainingData[0..chunkSize]);
+        remainingData = remainingData[chunkSize..$];
+
+        frameSize += chunkSize;
+        if (frameSize > 0 && uniform(0, flushProbability, randGen) == 0) {
+          frameSizes ~= frameSize;
+          frameSize = 0;
+          framed.flush();
+        }
+      }
+      if (frameSize > 0) {
+        frameSizes ~= frameSize;
+        frameSize = 0;
+        framed.flush();
+      }
+
+      // Read it back.
+      auto readData = new ubyte[data.length - remainingData.length];
+      auto remainToRead = readData;
+      foreach (fSize; frameSizes) {
+        // We are exploiting an implementation detail of TFramedTransport:
+        // The read buffer starts empty and it will never return more than one
+        // frame per read, so by just requesting all of the data, we should
+        // always get exactly one frame.
+        auto got = framed.read(remainToRead);
+        enforce(got == fSize);
+        remainToRead = remainToRead[fSize..$];
+      }
+
+      enforce(remainToRead.empty);
+      enforce(readData == data[0..readData.length]);
+    }
+  }
+}
+
+// Test that flush()ing an empty or already flushed buffer does not write an
+// empty frame.
+unittest {
+  auto buf = new TMemoryBuffer();
+  auto framed = new TFramedTransport(buf);
+  immutable out1 = [0, 0, 0, 1, 'a'];
+  immutable out2 = [0, 0, 0, 1, 'a', 0, 0, 0, 2, 'b', 'c'];
+
+  framed.flush();
+  enforce(buf.getContents() == []);
+  framed.flush();
+  framed.flush();
+  enforce(buf.getContents() == []);
+  framed.write(cast(ubyte[])"a");
+  enforce(buf.getContents() == []);
+  framed.flush();
+  enforce(buf.getContents() == out1);
+  framed.flush();
+  framed.flush();
+  enforce(buf.getContents() == out1);
+  framed.write(cast(ubyte[])"bc");
+  enforce(buf.getContents() == out1);
+  framed.flush();
+  enforce(buf.getContents() == out2);
+  framed.flush();
+  framed.flush();
+  enforce(buf.getContents() == out2);
+}

Added: thrift/trunk/lib/d/src/thrift/transport/http.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/http.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/http.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/http.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * HTTP transport implementation, modelled after the C++ one.
+ *
+ * Unfortunately, libcurl is quite heavyweight and only supports client-side
+ * applications, so this is an independent implementation of the basic
+ * HTTP/1.1 parts, supporting HTTP 100 Continue, chunked transfer encoding,
+ * keep-alive, etc.
+ */
+module thrift.transport.http;
+
+import std.algorithm : canFind, countUntil, endsWith, findSplit, min, startsWith;
+import std.ascii : toLower;
+import std.array : empty;
+import std.conv : parse, to;
+import std.datetime : Clock, UTC;
+import std.string : stripLeft;
+import thrift.base : VERSION;
+import thrift.transport.base;
+import thrift.transport.memory;
+import thrift.transport.socket;
+
+/**
+ * Base class for both client- and server-side HTTP transports.
+ */
+abstract class THttpTransport : TBaseTransport {
+  this(TTransport transport) {
+    transport_ = transport;
+    readHeaders_ = true;
+    httpBuf_ = new ubyte[HTTP_BUFFER_SIZE];
+    httpBufRemaining_ = httpBuf_[0 .. 0];
+    readBuffer_ = new TMemoryBuffer;
+    writeBuffer_ = new TMemoryBuffer;
+  }
+
+  override bool isOpen() {
+    return transport_.isOpen();
+  }
+
+  override bool peek() {
+    return transport_.peek();
+  }
+
+  override void open() {
+    transport_.open();
+  }
+
+  override void close() {
+    transport_.close();
+  }
+
+  override size_t read(ubyte[] buf) {
+    if (!readBuffer_.peek()) {
+      readBuffer_.reset();
+
+      if (!refill()) return 0;
+
+      if (readHeaders_) {
+        readHeaders();
+      }
+
+      size_t got;
+      if (chunked_) {
+        got = readChunked();
+      } else {
+        got = readContent(contentLength_);
+      }
+      readHeaders_ = true;
+
+      if (got == 0) return 0;
+    }
+    return readBuffer_.read(buf);
+  }
+
+  override size_t readEnd() {
+    // Read any pending chunked data (footers etc.)
+    if (chunked_) {
+      while (!chunkedDone_) {
+        readChunked();
+      }
+    }
+    return 0;
+  }
+
+  override void write(in ubyte[] buf) {
+    writeBuffer_.write(buf);
+  }
+
+  override void flush() {
+    auto data = writeBuffer_.getContents();
+    string header = getHeader(data.length);
+
+    transport_.write(cast(const(ubyte)[]) header);
+    transport_.write(data);
+    transport_.flush();
+
+    // Reset the buffer and header variables.
+    writeBuffer_.reset();
+    readHeaders_ = true;
+  }
+
+  /**
+   * The size of the buffer to read HTTP requests into, in bytes. Will expand
+   * as required.
+   */
+  enum HTTP_BUFFER_SIZE = 1024;
+
+protected:
+  abstract string getHeader(size_t dataLength);
+  abstract bool parseStatusLine(const(ubyte)[] status);
+
+  void parseHeader(const(ubyte)[] header) {
+    auto split = findSplit(header, [':']);
+    if (split[1].empty) {
+      // No colon found.
+      return;
+    }
+
+    static bool compToLower(ubyte a, ubyte b) {
+      return toLower(cast(char)a) == toLower(cast(char)b);
+    }
+
+    if (startsWith!compToLower(split[0], cast(ubyte[])"transfer-encoding")) {
+      if (endsWith!compToLower(split[2], cast(ubyte[])"chunked")) {
+        chunked_ = true;
+      }
+    } else if (startsWith!compToLower(split[0], cast(ubyte[])"content-length")) {
+      chunked_ = false;
+      auto lengthString = stripLeft(cast(const(char)[])split[2]);
+      contentLength_ = parse!size_t(lengthString);
+    }
+  }
+
+private:
+  ubyte[] readLine() {
+    while (true) {
+      auto split = findSplit(httpBufRemaining_, cast(ubyte[])"\r\n");
+
+      if (split[1].empty) {
+        // No CRLF yet, move whatever we have now to front and refill.
+        if (httpBufRemaining_.empty) {
+          httpBufRemaining_ = httpBuf_[0 .. 0];
+        } else {
+          httpBuf_[0 .. httpBufRemaining_.length] = httpBufRemaining_;
+          httpBufRemaining_ = httpBuf_[0 .. httpBufRemaining_.length];
+        }
+
+        if (!refill()) {
+          auto buf = httpBufRemaining_;
+          httpBufRemaining_ = httpBufRemaining_[$ - 1 .. $ - 1];
+          return buf;
+        }
+      } else {
+        // Set the remaining buffer to the part after \r\n and return the part
+        // (line) before it.
+        httpBufRemaining_ = split[2];
+        return split[0];
+      }
+    }
+  }
+
+  void readHeaders() {
+    // Initialize headers state variables
+    contentLength_ = 0;
+    chunked_ = false;
+    chunkedDone_ = false;
+    chunkSize_ = 0;
+
+    // Control state flow
+    bool statusLine = true;
+    bool finished;
+
+    // Loop until headers are finished
+    while (true) {
+      auto line = readLine();
+
+      if (line.length == 0) {
+        if (finished) {
+          readHeaders_ = false;
+          return;
+        } else {
+          // Must have been an HTTP 100, keep going for another status line
+          statusLine = true;
+        }
+      } else {
+        if (statusLine) {
+          statusLine = false;
+          finished = parseStatusLine(line);
+        } else {
+          parseHeader(line);
+        }
+      }
+    }
+  }
+
+  size_t readChunked() {
+    size_t length;
+
+    auto line = readLine();
+    size_t chunkSize;
+    try {
+      auto charLine = cast(char[])line;
+      chunkSize = parse!size_t(charLine, 16);
+    } catch (Exception e) {
+      throw new TTransportException("Invalid chunk size: " ~ to!string(line),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    if (chunkSize == 0) {
+      readChunkedFooters();
+    } else {
+      // Read data content
+      length += readContent(chunkSize);
+      // Read trailing CRLF after content
+      readLine();
+    }
+    return length;
+  }
+
+  void readChunkedFooters() {
+    while (true) {
+      auto line = readLine();
+      if (line.length == 0) {
+        chunkedDone_ = true;
+        break;
+      }
+    }
+  }
+
+  size_t readContent(size_t size) {
+    auto need = size;
+    while (need > 0) {
+      if (httpBufRemaining_.length == 0) {
+        // All buffered data has been handed out, reset the position to the
+        // head of the buffer.
+        httpBufRemaining_ = httpBuf_[0 .. 0];
+        if (!refill()) return size - need;
+      }
+
+      auto give = min(httpBufRemaining_.length, need);
+      readBuffer_.write(cast(ubyte[])httpBufRemaining_[0 .. give]);
+      httpBufRemaining_ = httpBufRemaining_[give .. $];
+      need -= give;
+    }
+    return size;
+  }
+
+  bool refill() {
+    // Is there a nicer way to do this?
+    auto indexBegin = httpBufRemaining_.ptr - httpBuf_.ptr;
+    auto indexEnd = indexBegin + httpBufRemaining_.length;
+
+    if (httpBuf_.length - indexEnd <= (httpBuf_.length / 4)) {
+      httpBuf_.length *= 2;
+    }
+
+    // Read more data.
+    auto got = transport_.read(cast(ubyte[])httpBuf_[indexEnd .. $]);
+    if (got == 0) return false;
+    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd + got];
+    return true;
+  }
+
+  TTransport transport_;
+
+  TMemoryBuffer writeBuffer_;
+  TMemoryBuffer readBuffer_;
+
+  bool readHeaders_;
+  bool chunked_;
+  bool chunkedDone_;
+  size_t chunkSize_;
+  size_t contentLength_;
+
+  ubyte[] httpBuf_;
+  ubyte[] httpBufRemaining_;
+}
+
+/**
+ * HTTP client transport.
+ */
+final class TClientHttpTransport : THttpTransport {
+  /**
+   * Constructs a client HTTP transport operating on the passed underlying
+   * transport.
+   *
+   * Params:
+   *   transport = The underlying transport used for the actual I/O.
+   *   host = The HTTP host string.
+   *   path = The HTTP path string.
+   */
+  this(TTransport transport, string host, string path) {
+    super(transport);
+    host_ = host;
+    path_ = path;
+  }
+
+  /**
+   * Convenience overload for constructing a client HTTP transport using a
+   * TSocket connecting to the specified host and port.
+   *
+   * Params:
+   *   host = The server to connect to, also used as HTTP host string.
+   *   port = The port to connect to.
+   *   path = The HTTP path string.
+   */
+  this(string host, ushort port, string path) {
+    this(new TSocket(host, port), host, path);
+  }
+
+protected:
+  override string getHeader(size_t dataLength) {
+    return "POST " ~ path_ ~ " HTTP/1.1\r\n" ~
+      "Host: " ~ host_ ~ "\r\n" ~
+      "Content-Type: application/x-thrift\r\n" ~
+      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
+      "Accept: application/x-thrift\r\n"
+      "User-Agent: Thrift/" ~ VERSION ~ " (D/TClientHttpTransport)\r\n" ~
+      "\r\n";
+  }
+
+  override bool parseStatusLine(const(ubyte)[] status) {
+    // HTTP-Version SP Status-Code SP Reason-Phrase CRLF
+    auto firstSplit = findSplit(status, [' ']);
+    if (firstSplit[1].empty) {
+      throw new TTransportException("Bad status: " ~ to!string(status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    auto codeReason = firstSplit[2][countUntil!"a != b"(firstSplit[2], ' ') .. $];
+    auto secondSplit = findSplit(codeReason, [' ']);
+    if (secondSplit[1].empty) {
+      throw new TTransportException("Bad status: " ~ to!string(status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    if (secondSplit[0] == "200") {
+      // HTTP 200 = OK, we got the response
+      return true;
+    } else if (secondSplit[0] == "100") {
+      // HTTP 100 = continue, just keep reading
+      return false;
+    }
+
+    throw new TTransportException("Bad status (unhandled status code): " ~
+      to!string(cast(const(char[]))status), TTransportException.Type.CORRUPTED_DATA);
+  }
+
+private:
+  string host_;
+  string path_;
+}
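+
+// A self-contained sketch added for illustration (not part of the original
+// commit) of what flush() emits for the client transport: a POST request
+// with the Thrift content type, followed by the payload. A TMemoryBuffer
+// stands in for a real socket; "localhost" and "/service" are placeholder
+// values.
+unittest {
+  import std.exception : enforce;
+
+  auto wire = new TMemoryBuffer;
+  auto http = new TClientHttpTransport(wire, "localhost", "/service");
+
+  http.write(cast(ubyte[])"payload");
+  http.flush();
+
+  auto request = cast(const(char)[])wire.getContents();
+  enforce(startsWith(request, "POST /service HTTP/1.1\r\n"));
+  enforce(canFind(request, "Content-Length: 7\r\n"));
+  enforce(endsWith(request, "\r\n\r\npayload"));
+}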
+
+/**
+ * HTTP server transport.
+ */
+final class TServerHttpTransport : THttpTransport {
+  /**
+   * Constructs a new instance.
+   *
+   * Params:
+   *   transport = The underlying transport used for the actual I/O.
+   */
+  this(TTransport transport) {
+    super(transport);
+  }
+
+protected:
+  override string getHeader(size_t dataLength) {
+    return "HTTP/1.1 200 OK\r\n" ~
+      "Date: " ~ getRFC1123Time() ~ "\r\n" ~
+      "Server: Thrift/" ~ VERSION ~ "\r\n" ~
+      "Content-Type: application/x-thrift\r\n" ~
+      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
+      "Connection: Keep-Alive\r\n" ~
+      "\r\n";
+  }
+
+  override bool parseStatusLine(const(ubyte)[] status) {
+    // Method SP Request-URI SP HTTP-Version CRLF.
+    auto split = findSplit(status, [' ']);
+    if (split[1].empty) {
+      throw new TTransportException("Bad status: " ~ to!string(status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    auto uriVersion = split[2][countUntil!"a != b"(split[2], ' ') .. $];
+    if (!canFind(uriVersion, ' ')) {
+      throw new TTransportException("Bad status: " ~ to!string(status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    if (split[0] == "POST") {
+      // POST method ok, looking for content.
+      return true;
+    }
+
+    throw new TTransportException("Bad status (unsupported method): " ~
+      to!string(status), TTransportException.Type.CORRUPTED_DATA);
+  }
+}
+
+/**
+ * Wraps a transport into a HTTP server protocol.
+ */
+alias TWrapperTransportFactory!TServerHttpTransport TServerHttpTransportFactory;
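+
+// A sketch added for illustration (not part of the original commit) of the
+// server side: a hand-written POST request (placeholder host and path, header
+// names matched case-insensitively as HTTP requires) is placed in a
+// TMemoryBuffer, and the Thrift payload is read back out through a
+// TServerHttpTransport.
+unittest {
+  import std.exception : enforce;
+
+  enum request = "POST /service HTTP/1.1\r\n" ~
+    "Host: localhost\r\n" ~
+    "Content-Type: application/x-thrift\r\n" ~
+    "Content-Length: 5\r\n" ~
+    "\r\n" ~
+    "hello";
+
+  auto wire = new TMemoryBuffer(cast(ubyte[])request);
+  auto http = new TServerHttpTransport(wire);
+
+  auto payload = new ubyte[5];
+  http.readAll(payload);
+  enforce(payload == "hello");
+}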
+
+private {
+  import std.string : format;
+  string getRFC1123Time() {
+    auto sysTime = Clock.currTime(UTC());
+
+    auto dayName = capMemberName(sysTime.dayOfWeek);
+    auto monthName = capMemberName(sysTime.month);
+
+    return format("%s, %s %s %s %s:%s:%s GMT", dayName, sysTime.day,
+      monthName, sysTime.year, sysTime.hour, sysTime.minute, sysTime.second);
+  }
+
+  import std.ascii : toUpper;
+  import std.traits : EnumMembers;
+  string capMemberName(T)(T val) if (is(T == enum)) {
+    foreach (i, e; EnumMembers!T) {
+      enum name = __traits(derivedMembers, T)[i];
+      enum capName = cast(char) toUpper(name[0]) ~ name [1 .. $];
+      if (val == e) {
+        return capName;
+      }
+    }
+    throw new Exception("Not a member of " ~ T.stringof ~ ": " ~ to!string(val));
+  }
+
+  unittest {
+    enum Foo {
+      bar,
+      bAZ
+    }
+
+    import std.exception;
+    enforce(capMemberName(Foo.bar) == "Bar");
+    enforce(capMemberName(Foo.bAZ) == "BAZ");
+  }
+}

Added: thrift/trunk/lib/d/src/thrift/transport/memory.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/memory.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/memory.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/memory.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.memory;
+
+import core.exception : onOutOfMemoryError;
+import core.stdc.stdlib : free, realloc;
+import std.algorithm : min;
+import std.conv : text;
+import thrift.transport.base;
+
+/**
+ * A transport that simply reads from and writes to an in-memory buffer. Every
+ * time you call write on it, the data is simply placed into a buffer, and
+ * every time you call read, data is consumed from that buffer.
+ *
+ * Currently, the storage for written data is never reclaimed, even if the
+ * buffer contents have already been read out again.
+ */
+final class TMemoryBuffer : TBaseTransport {
+  /**
+   * Constructs a new memory transport with an empty internal buffer.
+   */
+  this() {}
+
+  /**
+   * Constructs a new memory transport with an empty internal buffer,
+   * reserving space for capacity bytes in advance.
+   *
+   * If the amount of data which will be written to the buffer is already
+   * known on construction, this can give better performance than the default
+   * constructor because reallocations can be avoided.
+   *
+   * If the preallocated buffer is exhausted, data can still be written to the
+   * transport, but reallocations will happen.
+   *
+   * Params:
+   *   capacity = Size of the initially reserved buffer (in bytes).
+   */
+  this(size_t capacity) {
+    reset(capacity);
+  }
+
+  /**
+   * Constructs a new memory transport initially containing the passed data.
+   *
+   * For now, the passed buffer is not reused directly; the data is simply
+   * copied to the internal buffer.
+   *
+   * Params:
+   *   contents = Initial contents available to be read.
+   */
+  this(in ubyte[] contents) {
+    auto size = contents.length;
+    reset(size);
+    buffer_[0 .. size] = contents[];
+    writeOffset_ = size;
+  }
+
+  /**
+   * Destructor, frees the internally allocated buffer.
+   */
+  ~this() {
+    free(buffer_);
+  }
+
+  /**
+   * Returns a read-only view of the current buffer contents.
+   *
+   * Note: For performance reasons, the returned slice is only valid for the
+   * lifetime of this object, and may be invalidated by the next write()
+   * call – you might want to .dup it immediately if you intend to keep it
+   * around.
+   */
+  const(ubyte)[] getContents() {
+    return buffer_[readOffset_ .. writeOffset_];
+  }
+
+  /**
+   * A memory transport is always open.
+   */
+  override bool isOpen() @property {
+    return true;
+  }
+
+  override bool peek() {
+    return writeOffset_ - readOffset_ > 0;
+  }
+
+  /**
+   * Opening is a no-op for a memory buffer.
+   */
+  override void open() {}
+
+  /**
+   * Closing is a no-op for a memory buffer – it is always open.
+   */
+  override void close() {}
+
+  override size_t read(ubyte[] buf) {
+    auto size = min(buf.length, writeOffset_ - readOffset_);
+    buf[0 .. size] = buffer_[readOffset_ .. readOffset_ + size];
+    readOffset_ += size;
+    return size;
+  }
+
+  /**
+   * Shortcut version of readAll() – using this over TBaseTransport.readAll()
+   * can give us a nice speed increase because it is typically a very hot
+   * path during deserialization.
+   */
+  override void readAll(ubyte[] buf) {
+    auto available = writeOffset_ - readOffset_;
+    if (buf.length > available) {
+      throw new TTransportException(text("Cannot readAll() ", buf.length,
+        " bytes of data because only ", available, " bytes are available."),
+        TTransportException.Type.END_OF_FILE);
+    }
+
+    buf[] = buffer_[readOffset_ .. readOffset_ + buf.length];
+    readOffset_ += buf.length;
+  }
+
+  override void write(in ubyte[] buf) {
+    auto need = buf.length;
+    if (bufferLen_ - writeOffset_ < need) {
+      // Exponential growth.
+      auto newLen = bufferLen_ + 1;
+      while (newLen - writeOffset_ < need) newLen *= 2;
+      cRealloc(buffer_, newLen);
+      bufferLen_ = newLen;
+    }
+
+    buffer_[writeOffset_ .. writeOffset_ + need] = buf[];
+    writeOffset_ += need;
+  }
+
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    if (len <= writeOffset_ - readOffset_) {
+      return buffer_[readOffset_ .. writeOffset_];
+    } else {
+      return null;
+    }
+  }
+
+  override void consume(size_t len) {
+    readOffset_ += len;
+  }
+
+  void reset() {
+    readOffset_ = 0;
+    writeOffset_ = 0;
+  }
+
+  void reset(size_t capacity) {
+    readOffset_ = 0;
+    writeOffset_ = 0;
+    if (bufferLen_ < capacity) {
+      cRealloc(buffer_, capacity);
+      bufferLen_ = capacity;
+    }
+  }
+
+private:
+  ubyte* buffer_;
+  size_t bufferLen_;
+  size_t readOffset_;
+  size_t writeOffset_;
+}
+
+private {
+  void cRealloc(ref ubyte* data, size_t newSize) {
+    auto result = realloc(data, newSize);
+    if (result is null) onOutOfMemoryError();
+    data = cast(ubyte*)result;
+  }
+}
+
+version (unittest) {
+  import std.exception;
+}
+
+unittest {
+  auto a = new TMemoryBuffer(5);
+  immutable(ubyte[]) testData = [1, 2, 3, 4];
+  auto buf = new ubyte[testData.length];
+  enforce(a.isOpen);
+
+  // a should be empty.
+  enforce(!a.peek());
+  enforce(a.read(buf) == 0);
+  assertThrown!TTransportException(a.readAll(buf));
+
+  // Write some data and read it back again.
+  a.write(testData);
+  enforce(a.peek());
+  enforce(a.getContents() == testData);
+  enforce(a.read(buf) == testData.length);
+  enforce(buf == testData);
+
+  // a should be empty again.
+  enforce(!a.peek());
+  enforce(a.read(buf) == 0);
+  assertThrown!TTransportException(a.readAll(buf));
+
+  // Test the constructor which directly accepts initial data.
+  auto b = new TMemoryBuffer(testData);
+  enforce(b.isOpen);
+  enforce(b.peek());
+  enforce(b.getContents() == testData);
+
+  // Test borrow().
+  auto borrowed = b.borrow(null, testData.length);
+  enforce(borrowed == testData);
+  enforce(b.peek());
+  b.consume(testData.length);
+  enforce(!b.peek());
+}
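+
+// A small sketch added for illustration (not part of the original commit) of
+// the reset() overloads: a single buffer can be reused across messages
+// without giving up its storage.
+unittest {
+  auto buf = new TMemoryBuffer(4);
+  buf.write(cast(ubyte[])"abcd");
+  enforce(buf.getContents() == "abcd");
+
+  // reset() discards the contents but keeps the current allocation.
+  buf.reset();
+  enforce(!buf.peek());
+
+  buf.write(cast(ubyte[])"ef");
+  enforce(buf.getContents() == "ef");
+
+  // reset(capacity) additionally ensures at least that much room is
+  // reserved for the next use.
+  buf.reset(1024);
+  enforce(!buf.peek());
+}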

Added: thrift/trunk/lib/d/src/thrift/transport/piped.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/piped.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/piped.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/piped.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.piped;
+
+import thrift.transport.base;
+import thrift.transport.memory;
+
+/**
+ * Pipes the data read from/written to one transport to another one when
+ * readEnd() or writeEnd() is called.
+ *
+ * A typical use case would be logging the requests arriving on e.g. a socket
+ * to disk (i.e. piping them to a TFileWriterTransport).
+ *
+ * The implementation keeps an internal buffer which expands to
+ * hold the whole amount of data read/written until the corresponding *End()
+ * method is called.
+ *
+ * Contrary to the C++ implementation, this doesn't introduce yet another layer
+ * of input/output buffering, all calls are passed to the underlying source
+ * transport verbatim.
+ */
+final class TPipedTransport(Source = TTransport) if (
+  isTTransport!Source
+) : TBaseTransport {
+  /// The default initial buffer size if not explicitly specified, in bytes.
+  enum DEFAULT_INITIAL_BUFFER_SIZE = 512;
+
+  /**
+   * Constructs a new instance.
+   *
+   * By default, only reads are piped (pipeReads = true, pipeWrites = false).
+   *
+   * Params:
+   *   srcTrans = The transport to which all requests are forwarded.
+   *   dstTrans = The transport the read/written data is copied to.
+   *   initialBufferSize = The initial size of the read/write buffers, for
+   *     performance tuning.
+   */
+  this(Source srcTrans, TTransport dstTrans,
+    size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE
+  ) {
+    srcTrans_ = srcTrans;
+    dstTrans_ = dstTrans;
+
+    readBuffer_ = new TMemoryBuffer(initialBufferSize);
+    writeBuffer_ = new TMemoryBuffer(initialBufferSize);
+
+    pipeReads_ = true;
+    pipeWrites_ = false;
+  }
+
+  bool pipeReads() @property const {
+    return pipeReads_;
+  }
+
+  void pipeReads(bool value) @property {
+    if (!value) {
+      readBuffer_.reset();
+    }
+    pipeReads_ = value;
+  }
+
+  bool pipeWrites() @property const {
+    return pipeWrites_;
+  }
+
+  void pipeWrites(bool value) @property {
+    if (!value) {
+      writeBuffer_.reset();
+    }
+    pipeWrites_ = value;
+  }
+
+  override bool isOpen() {
+    return srcTrans_.isOpen();
+  }
+
+  override bool peek() {
+    return srcTrans_.peek();
+  }
+
+  override void open() {
+    srcTrans_.open();
+  }
+
+  override void close() {
+    srcTrans_.close();
+  }
+
+  override size_t read(ubyte[] buf) {
+    auto bytesRead = srcTrans_.read(buf);
+
+    if (pipeReads_) {
+      readBuffer_.write(buf[0 .. bytesRead]);
+    }
+
+    return bytesRead;
+  }
+
+  override size_t readEnd() {
+    if (pipeReads_) {
+      auto data = readBuffer_.getContents();
+      dstTrans_.write(data);
+      dstTrans_.flush();
+      readBuffer_.reset();
+
+      srcTrans_.readEnd();
+
+      // Return data.length instead of the readEnd() result of the source
+      // transport, because the result might not be available from it.
+      return data.length;
+    }
+
+    return srcTrans_.readEnd();
+  }
+
+  override void write(in ubyte[] buf) {
+    if (pipeWrites_) {
+      writeBuffer_.write(buf);
+    }
+
+    srcTrans_.write(buf);
+  }
+
+  override size_t writeEnd() {
+    if (pipeWrites_) {
+      auto data = writeBuffer_.getContents();
+      dstTrans_.write(data);
+      dstTrans_.flush();
+      writeBuffer_.reset();
+
+      srcTrans_.writeEnd();
+
+      // Return data.length instead of the writeEnd() result of the source
+      // transport, because the result might not be available from it.
+      return data.length;
+    }
+
+    return srcTrans_.writeEnd();
+  }
+
+  override void flush() {
+    srcTrans_.flush();
+  }
+
+private:
+  Source srcTrans_;
+  TTransport dstTrans_;
+
+  TMemoryBuffer readBuffer_;
+  TMemoryBuffer writeBuffer_;
+
+  bool pipeReads_;
+  bool pipeWrites_;
+}
+
+/**
+ * TPipedTransport construction helper to avoid having to explicitly
+ * specify the transport types, i.e. to allow the constructor to be called
+ * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
+ */
+TPipedTransport!Source tPipedTransport(Source)(
+  Source srcTrans, TTransport dstTrans
+) if (isTTransport!Source) {
+  return new typeof(return)(srcTrans, dstTrans);
+}
+
+version (unittest) {
+  // DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved
+  // into unittest block.
+  import std.array;
+  import std.exception : enforce;
+}
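+
+// A sketch added for illustration (not part of the original commit) of the
+// write-piping side, which the test below does not exercise: with pipeWrites
+// enabled, everything written to the source transport is copied to the pipe
+// target once writeEnd() is called.
+unittest {
+  auto underlying = new TMemoryBuffer;
+  auto pipeTarget = new TMemoryBuffer;
+  auto trans = tPipedTransport(underlying, pipeTarget);
+  trans.pipeWrites = true;
+
+  trans.write(cast(ubyte[])"abcd");
+  // The data reaches the underlying transport immediately, but is only
+  // copied to the pipe target on writeEnd().
+  enforce(underlying.getContents() == "abcd");
+  enforce(pipeTarget.getContents().empty);
+
+  trans.writeEnd();
+  enforce(pipeTarget.getContents() == "abcd");
+}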
+
+unittest {
+  auto underlying = new TMemoryBuffer;
+  auto pipeTarget = new TMemoryBuffer;
+  auto trans = tPipedTransport(underlying, pipeTarget);
+
+  underlying.write(cast(ubyte[])"abcd");
+
+  ubyte[4] buffer;
+  trans.readAll(buffer[0 .. 2]);
+  enforce(buffer[0 .. 2] == "ab");
+  enforce(pipeTarget.getContents().empty);
+
+  trans.readEnd();
+  enforce(pipeTarget.getContents() == "ab");
+  pipeTarget.reset();
+
+  underlying.write(cast(ubyte[])"ef");
+  trans.readAll(buffer[0 .. 2]);
+  enforce(buffer[0 .. 2] == "cd");
+  enforce(pipeTarget.getContents().empty);
+
+  trans.readAll(buffer[0 .. 2]);
+  enforce(buffer[0 .. 2] == "ef");
+  enforce(pipeTarget.getContents().empty);
+
+  trans.readEnd();
+  enforce(pipeTarget.getContents() == "cdef");
+}