You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jf...@apache.org on 2012/03/22 22:49:13 UTC
svn commit: r1304085 [7/10] - in /thrift/trunk: ./ aclocal/ compiler/cpp/
compiler/cpp/src/generate/ lib/ lib/d/ lib/d/src/ lib/d/src/thrift/
lib/d/src/thrift/async/ lib/d/src/thrift/codegen/
lib/d/src/thrift/internal/ lib/d/src/thrift/internal/test/ l...
Added: thrift/trunk/lib/d/src/thrift/transport/base.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/base.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/base.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/base.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.base;
+
+import core.stdc.string : strerror;
+import std.conv : text;
+import thrift.base;
+
+/**
+ * An entity data can be read from and/or written to.
+ *
+ * A TTransport implementation may be capable of either reading or writing,
+ * but not necessarily both.
+ */
+interface TTransport {
+ /**
+ * Whether this transport is open.
+ *
+ * If a transport is closed, it can be opened by calling open(), and vice
+ * versa for close().
+ *
+ * While a transport should always be open when trying to read/write data,
+ * the related functions do not necessarily fail when called for a closed
+ * transport. Situations like this could occur e.g. with a wrapper
+ * transport which buffers data when the underlying transport has already
+ * been closed (possibly because the connection was abruptly closed), but
+ * there is still data left to be read in the buffers. This choice has been
+ * made to simplify transport implementations, in terms of both code
+ * complexity and runtime overhead.
+ */
+ bool isOpen() @property;
+
+ /**
+ * Tests whether there is more data to read or if the remote side is
+ * still open.
+ *
+ * A typical use case would be a server checking if it should process
+ * another request on the transport.
+ */
+ bool peek();
+
+ /**
+ * Opens the transport for communications.
+ *
+ * If the transport is already open, nothing happens.
+ *
+ * Throws: TTransportException if opening fails.
+ */
+ void open();
+
+ /**
+ * Closes the transport.
+ *
+ * If the transport is not open, nothing happens.
+ *
+ * Throws: TTransportException if closing fails.
+ */
+ void close();
+
+ /**
+ * Attempts to fill the given buffer by reading data.
+ *
+ * For potentially blocking data sources (e.g. sockets), read() will only
+ * block if no data is available at all. If there is some data available,
+ * but waiting for new data to arrive would be required to fill the whole
+ * buffer, the readily available data will be immediately returned – use
+ * readAll() if you want to wait until the whole buffer is filled.
+ *
+ * Params:
+ * buf = Slice to use as buffer.
+ *
+ * Returns: How many bytes were actually read.
+ *
+ * Throws: TTransportException if an error occurs.
+ */
+ size_t read(ubyte[] buf);
+
+ /**
+ * Fills the given buffer by reading data into it, failing if not enough
+ * data is available.
+ *
+ * Params:
+ * buf = Slice to use as buffer.
+ *
+ * Throws: TTransportException if insufficient data is available or reading
+ * fails altogether.
+ */
+ void readAll(ubyte[] buf);
+
+ /**
+ * Must be called by clients when read is completed.
+ *
+ * Implementations can choose to perform a transport-specific action, e.g.
+ * logging the request to a file.
+ *
+ * Returns: The number of bytes read if available, 0 otherwise.
+ */
+ size_t readEnd();
+
+ /**
+ * Writes the passed slice of data.
+ *
+ * Note: You must call flush() to ensure the data is actually written,
+ * and available to be read back in the future. Destroying a TTransport
+ * object does not automatically flush pending data – if you destroy a
+ * TTransport object with written but unflushed data, that data may be
+ * discarded.
+ *
+ * Params:
+ * buf = Slice of data to write.
+ *
+ * Throws: TTransportException if an error occurs.
+ */
+ void write(in ubyte[] buf);
+
+ /**
+ * Must be called by clients when write is completed.
+ *
+ * Implementations can choose to perform a transport-specific action, e.g.
+ * logging the request to a file.
+ *
+ * Returns: The number of bytes written if available, 0 otherwise.
+ */
+ size_t writeEnd();
+
+ /**
+ * Flushes any pending data to be written.
+ *
+ * Must be called before destruction to ensure writes are actually complete,
+ * otherwise pending data may be discarded. Typically used with buffered
+ * transport mechanisms.
+ *
+ * Throws: TTransportException if an error occurs.
+ */
+ void flush();
+
+ /**
+ * Attempts to return a slice of <code>len</code> bytes of incoming data,
+ * possibly copied into buf, not consuming them (i.e.: a later read will
+ * return the same data).
+ *
+ * This method is meant to support protocols that need to read variable-
+ * length fields. They can attempt to borrow the maximum amount of data that
+ * they will need, then <code>consume()</code> what they actually use. Some
+ * transports will not support this method and others will fail occasionally,
+ * so protocols must be prepared to fall back to <code>read()</code> if
+ * borrow fails.
+ *
+ * The transport must be open when calling this.
+ *
+ * Params:
+ * buf = A buffer where the data can be stored if needed, or null to
+ * indicate that the caller is not supplying storage, but would like a
+ * slice of an internal buffer, if available.
+ * len = The number of bytes to borrow.
+ *
+ * Returns: If the borrow succeeds, a slice containing the borrowed data,
+ * null otherwise. The slice will be at least as long as requested, but
+ * may be longer if the returned slice points into an internal buffer
+ * rather than buf.
+ *
+ * Throws: TTransportException if an error occurs.
+ */
+ const(ubyte)[] borrow(ubyte* buf, size_t len) out (result) {
+ // FIXME: Commented out because len gets corrupted in
+ // thrift.transport.memory borrow() unittest.
+ // NOTE: version(none) compiles the assertion out, so the documented
+ // length guarantee is currently not enforced by this contract.
+ version(none) assert(result is null || result.length >= len,
+ "Buffer returned by borrow() too short.");
+ }
+
+ /**
+ * Removes len bytes from the transport. This must always follow a borrow
+ * of at least len bytes, and should always succeed.
+ *
+ * The transport must be open when calling this.
+ *
+ * Params:
+ * len = Number of bytes to consume.
+ *
+ * Throws: TTransportException if an error occurs.
+ */
+ void consume(size_t len);
+}
+
+/**
+ * Skeleton TTransport implementation intended to be subclassed.
+ *
+ * Every operation either does nothing, reports "closed"/"no data", or throws
+ * a NOT_IMPLEMENTED TTransportException, so concrete transports only need to
+ * override what they actually support. readAll() is implemented generically
+ * on top of read().
+ */
+class TBaseTransport : TTransport {
+  /// The base transport is never open; subclasses override this.
+  override bool isOpen() @property {
+    return false;
+  }
+
+  /// By default there is something to read exactly when the transport is open.
+  override bool peek() {
+    return isOpen;
+  }
+
+  /// Throws: TTransportException (NOT_IMPLEMENTED), always.
+  override void open() {
+    throw new TTransportException("Cannot open TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  /// Throws: TTransportException (NOT_IMPLEMENTED), always.
+  override void close() {
+    throw new TTransportException("Cannot close TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  /// Throws: TTransportException (NOT_IMPLEMENTED), always.
+  override size_t read(ubyte[] buf) {
+    throw new TTransportException("Cannot read from a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  /**
+   * Repeatedly calls read() until buf has been filled completely.
+   *
+   * Params:
+   *   buf = Slice to fill.
+   *
+   * Throws: TTransportException (END_OF_FILE) as soon as a read() call
+   *   yields no data before buf is full.
+   */
+  override void readAll(ubyte[] buf) {
+    size_t filled = 0;
+    while (filled < buf.length) {
+      // Only ask for the part of the buffer that is still missing.
+      auto received = read(buf[filled .. $]);
+      if (received == 0) {
+        throw new TTransportException(text("Could not readAll() ", buf.length,
+          " bytes as no more data was available after ", filled, " bytes."),
+          TTransportException.Type.END_OF_FILE);
+      }
+      filled += received;
+    }
+  }
+
+  /// No-op hook; reports zero bytes.
+  override size_t readEnd() {
+    return 0;
+  }
+
+  /// Throws: TTransportException (NOT_IMPLEMENTED), always.
+  override void write(in ubyte[] buf) {
+    throw new TTransportException("Cannot write to a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+  /// No-op hook; reports zero bytes.
+  override size_t writeEnd() {
+    return 0;
+  }
+
+  /// No-op hook; nothing is buffered at this level.
+  override void flush() {
+  }
+
+  /// Borrowing may legitimately fail, so the default simply declines.
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    return null;
+  }
+
+  /// Throws: TTransportException (NOT_IMPLEMENTED), always.
+  override void consume(size_t len) {
+    throw new TTransportException("Cannot consume from a TBaseTransport.",
+      TTransportException.Type.NOT_IMPLEMENTED);
+  }
+
+protected:
+  /// Only subclasses may construct a TBaseTransport.
+  this() {}
+}
+
+/**
+ * Makes a TTransport which wraps a given source transport in some way.
+ *
+ * A common use case is inside server implementations, where the raw client
+ * connections accepted from e.g. TServerSocket need to be wrapped into
+ * buffered or compressed transports.
+ *
+ * This base class performs no wrapping; subclasses override getTransport().
+ */
+class TTransportFactory {
+ /**
+ * Default implementation does nothing, just returns the transport given.
+ *
+ * Params:
+ * trans = The source transport.
+ *
+ * Returns: trans itself, unchanged.
+ */
+ TTransport getTransport(TTransport trans) {
+ return trans;
+ }
+}
+
+/**
+ * Transport factory for transports which simply wrap an underlying TTransport
+ * without requiring additional configuration.
+ *
+ * The template constraint only accepts a T that converts to TTransport and
+ * can be constructed from a single TTransport argument.
+ */
+class TWrapperTransportFactory(T) if (
+ is(T : TTransport) && __traits(compiles, new T(TTransport.init))
+) : TTransportFactory {
+ /// Returns a new T constructed around trans.
+ override T getTransport(TTransport trans) {
+ return new T(trans);
+ }
+}
+
+/**
+ * Exception describing a failure at the transport layer, tagged with a Type
+ * that classifies the failure.
+ */
+class TTransportException : TException {
+  /**
+   * Classification of transport failures.
+   */
+  enum Type {
+    UNKNOWN, ///
+    NOT_OPEN, ///
+    TIMED_OUT, ///
+    END_OF_FILE, ///
+    INTERRUPTED, ///
+    BAD_ARGS, ///
+    CORRUPTED_DATA, ///
+    INTERNAL_ERROR, ///
+    NOT_IMPLEMENTED ///
+  }
+
+  /// Creates an exception of the given type, using a stock message for it.
+  this(Type type, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
+    // Maps each known type to its stock message; anything else gets a
+    // placeholder text instead of failing.
+    static string stockMessage(Type kind) {
+      switch (kind) {
+        case Type.UNKNOWN:
+          return "Unknown transport exception";
+        case Type.NOT_OPEN:
+          return "Transport not open";
+        case Type.TIMED_OUT:
+          return "Timed out";
+        case Type.END_OF_FILE:
+          return "End of file";
+        case Type.INTERRUPTED:
+          return "Interrupted";
+        case Type.BAD_ARGS:
+          return "Invalid arguments";
+        case Type.CORRUPTED_DATA:
+          return "Corrupted Data";
+        case Type.INTERNAL_ERROR:
+          return "Internal error";
+        case Type.NOT_IMPLEMENTED:
+          return "Not implemented";
+        default:
+          return "(Invalid exception type)";
+      }
+    }
+    this(stockMessage(type), type, file, line, next);
+  }
+
+  /// Creates an exception with a custom message and type UNKNOWN.
+  this(string msg, string file = __FILE__, size_t line = __LINE__,
+    Throwable next = null)
+  {
+    this(msg, Type.UNKNOWN, file, line, next);
+  }
+
+  /// Creates an exception with both a custom message and an explicit type.
+  this(string msg, Type type, string file = __FILE__, size_t line = __LINE__,
+    Throwable next = null)
+  {
+    super(msg, file, line, next);
+    type_ = type;
+  }
+
+  /// The failure classification this exception was created with.
+  Type type() const nothrow @property {
+    return type_;
+  }
+
+protected:
+  Type type_;
+}
+
+/**
+ * Compile-time predicate: true if T is implicitly convertible to TTransport,
+ * i.e. T is a TTransport implementation.
+ */
+template isTTransport(T) {
+  enum bool isTTransport = is(T : TTransport);
+}
Added: thrift/trunk/lib/d/src/thrift/transport/buffered.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/buffered.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/buffered.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/buffered.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.buffered;
+
+import std.algorithm : min;
+import std.array : empty;
+import std.exception : enforce;
+import thrift.transport.base;
+
+/**
+ * Wraps another transport and buffers reads and writes until the internal
+ * buffers are exhausted, at which point new data is fetched resp. the
+ * accumulated data is written out at once.
+ */
+final class TBufferedTransport : TBaseTransport {
+  /**
+   * Constructs a new instance, using the default buffer sizes.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   */
+  this(TTransport transport) {
+    this(transport, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Constructs a new instance, using the specified buffer size.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   *   bufferSize = The size of the read and write buffers to use, in bytes.
+   */
+  this(TTransport transport, size_t bufferSize) {
+    this(transport, bufferSize, bufferSize);
+  }
+
+  /**
+   * Constructs a new instance, using the specified buffer sizes.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   *   readBufferSize = The size of the read buffer to use, in bytes.
+   *   writeBufferSize = The size of the write buffer to use, in bytes.
+   */
+  this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) {
+    transport_ = transport;
+    readBuffer_ = new ubyte[readBufferSize];
+    writeBuffer_ = new ubyte[writeBufferSize];
+    // Initially the whole write buffer is free; the read side starts empty.
+    writeAvail_ = writeBuffer_;
+  }
+
+  /// The default size of the read/write buffers, in bytes.
+  enum int DEFAULT_BUFFER_SIZE = 512;
+
+  /// Whether the underlying transport is open.
+  override bool isOpen() @property {
+    return transport_.isOpen();
+  }
+
+  /**
+   * Returns true if buffered data is available, trying one read from the
+   * underlying transport if the read buffer is currently empty.
+   */
+  override bool peek() {
+    if (readAvail_.empty) {
+      // If there is nothing available to read, see if we can get something
+      // from the underlying transport.
+      auto bytesRead = transport_.read(readBuffer_);
+      readAvail_ = readBuffer_[0 .. bytesRead];
+    }
+
+    return !readAvail_.empty;
+  }
+
+  /// Opens the underlying transport.
+  override void open() {
+    transport_.open();
+  }
+
+  /// Flushes pending writes and closes the underlying transport, if open.
+  override void close() {
+    if (!isOpen) return;
+    flush();
+    transport_.close();
+  }
+
+  /**
+   * Reads up to buf.length bytes, serving them from the read buffer and
+   * refilling it from the underlying transport when it is empty.
+   *
+   * Returns: The number of bytes copied into buf.
+   */
+  override size_t read(ubyte[] buf) {
+    if (readAvail_.empty) {
+      // No data left in our buffer, fetch some from the underlying transport.
+
+      if (buf.length > readBuffer_.length) {
+        // If the amount of data requested is larger than our reading buffer,
+        // directly read to the passed buffer. This probably doesn't occur too
+        // often in practice (and even if it does, the underlying transport
+        // probably cannot fulfill the request at once anyway), but it can't
+        // harm to try…
+        return transport_.read(buf);
+      }
+
+      auto bytesRead = transport_.read(readBuffer_);
+      readAvail_ = readBuffer_[0 .. bytesRead];
+    }
+
+    // Hand over whatever we have.
+    auto give = min(readAvail_.length, buf.length);
+    buf[0 .. give] = readAvail_[0 .. give];
+    readAvail_ = readAvail_[give .. $];
+    return give;
+  }
+
+  /**
+   * Shortcut version of readAll: if the request can be satisfied entirely
+   * from the read buffer it is served directly, otherwise the generic
+   * implementation in the base class is used.
+   */
+  override void readAll(ubyte[] buf) {
+    if (readAvail_.length >= buf.length) {
+      buf[] = readAvail_[0 .. buf.length];
+      readAvail_ = readAvail_[buf.length .. $];
+      return;
+    }
+
+    super.readAll(buf);
+  }
+
+  /**
+   * Buffers buf for writing, forwarding data to the underlying transport
+   * once the write buffer cannot hold it.
+   *
+   * Params:
+   *   buf = Slice of data to write.
+   */
+  override void write(in ubyte[] buf) {
+    if (writeAvail_.length >= buf.length) {
+      // If the data fits in the buffer, just save it there.
+      writeAvail_[0 .. buf.length] = buf;
+      writeAvail_ = writeAvail_[buf.length .. $];
+      return;
+    }
+
+    // We have to decide if we copy data from buf to our internal buffer, or
+    // just directly write them out. The same considerations about avoiding
+    // syscalls as for C++ apply here.
+    // Number of bytes currently pending in the write buffer.
+    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
+    if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) {
+      // We would immediately need two syscalls anyway (or we don't have
+      // anything) in our buffer to write, so just write out both buffers.
+      if (bytesAvail > 0) {
+        transport_.write(writeBuffer_[0 .. bytesAvail]);
+        writeAvail_ = writeBuffer_;
+      }
+
+      transport_.write(buf);
+      return;
+    }
+
+    // Fill up our internal buffer for a write.
+    writeAvail_[] = buf[0 .. writeAvail_.length];
+    auto left = buf[writeAvail_.length .. $];
+    transport_.write(writeBuffer_);
+
+    // Copy the rest into our buffer.
+    writeBuffer_[0 .. left.length] = left[];
+    writeAvail_ = writeBuffer_[left.length .. $];
+  }
+
+  /**
+   * Writes out any buffered data and flushes the underlying transport.
+   */
+  override void flush() {
+    // Write out any data waiting in the write buffer.
+    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
+    if (bytesAvail > 0) {
+      // Note that we reset writeAvail_ prior to calling the underlying protocol
+      // to make sure the buffer is cleared even if the transport throws an
+      // exception.
+      writeAvail_ = writeBuffer_;
+      transport_.write(writeBuffer_[0 .. bytesAvail]);
+    }
+
+    // Flush the underlying transport.
+    transport_.flush();
+  }
+
+  /**
+   * Lends out the currently buffered, unread data without consuming it.
+   *
+   * Returns: The whole unread part of the read buffer if it holds at least
+   *   len bytes, null otherwise. buf is never used.
+   */
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    if (len <= readAvail_.length) {
+      return readAvail_;
+    }
+    return null;
+  }
+
+  /**
+   * Drops len bytes from the front of the buffered, unread data.
+   *
+   * Params:
+   *   len = Number of bytes to consume; must not exceed the amount of data
+   *     currently buffered (i.e. what a preceding borrow() could return).
+   *
+   * Throws: TTransportException (BAD_ARGS) if len exceeds the buffered data.
+   */
+  override void consume(size_t len) {
+    // Validate against the unread data, not the buffer capacity: slicing
+    // readAvail_ beyond its length would be an out-of-bounds access.
+    enforce(len <= readAvail_.length, new TTransportException(
+      "Invalid consume length.", TTransportException.Type.BAD_ARGS));
+    readAvail_ = readAvail_[len .. $];
+  }
+
+  /**
+   * The wrapped transport.
+   */
+  TTransport underlyingTransport() @property {
+    return transport_;
+  }
+
+private:
+  TTransport transport_;
+
+  // Backing storage for the two buffers.
+  ubyte[] readBuffer_;
+  ubyte[] writeBuffer_;
+
+  // readAvail_: buffered data not yet handed to the caller.
+  // writeAvail_: free tail of writeBuffer_; pending data precedes it.
+  ubyte[] readAvail_;
+  ubyte[] writeAvail_;
+}
+
+/**
+ * Wraps given transports into TBufferedTransports, using the single-argument
+ * TBufferedTransport constructor.
+ */
+alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory;
Added: thrift/trunk/lib/d/src/thrift/transport/file.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/file.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/file.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/file.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,1100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Transports for reading from/writing to Thrift »log files«.
+ *
+ * These transports are not »stupid« sources and sinks just reading and
+ * writing bytes from a file verbatim, but organize the contents in the form
+ * of so-called »events«, which refers to the data written between two flush()
+ * calls.
+ *
+ * Chunking is supported, events are guaranteed to never span chunk boundaries.
+ * As a consequence, an event can never be larger than the chunk size. The
+ * chunk size used is not saved with the file, so care has to be taken to make
+ * sure the same chunk size is used for reading and writing.
+ */
+module thrift.transport.file;
+
+import core.thread : Thread;
+import std.array : empty;
+import std.algorithm : min, max;
+import std.concurrency;
+import std.conv : to;
+import std.datetime : AutoStart, dur, Duration, StopWatch;
+import std.exception;
+import std.stdio : File;
+import thrift.base;
+import thrift.transport.base;
+
+/// The default chunk size, in bytes (16 MiB).
+enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+/// The type used to represent event sizes in the file.
+alias uint EventSize;
+
+// Compilation is rejected on big endian targets: the code in this module
+// assumes little endian byte order.
+version (BigEndian) {
+ static assert(false,
+ "Little endian byte order is assumed in thrift.transport.file.");
+}
+
+/**
+ * A transport used to read log files. It can never be written to, calling
+ * write() throws.
+ *
+ * Contrary to the C++ design, explicitly opening the transport/file before
+ * using is necessary to allow manually closing the file without relying on the
+ * object lifetime. Otherwise, it's a straight port of the C++ implementation.
+ */
+final class TFileReaderTransport : TBaseTransport {
+ /**
+ * Creates a new file reader transport with default settings. The file is
+ * not opened until open() is called.
+ *
+ * Params:
+ * path = Path of the file to operate on.
+ */
+ this(string path) {
+ path_ = path;
+ chunkSize_ = DEFAULT_CHUNK_SIZE;
+ readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
+ readTimeout_ = DEFAULT_READ_TIMEOUT;
+ corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
+ // Goes through the property setter, which validates against chunkSize_.
+ maxEventSize = DEFAULT_MAX_EVENT_SIZE;
+ }
+
+ /// Whether the file is open: set by open(), cleared by close().
+ override bool isOpen() @property {
+ return isOpen_;
+ }
+
+ /**
+  * Reports whether unread event data is available, fetching the next event
+  * from the file if none is currently being processed.
+  *
+  * Returns: false if the transport is closed or no event could be read,
+  *   otherwise whether the current event has unread bytes left.
+  */
+ override bool peek() {
+   if (!isOpen) return false;
+
+   // No event in progress: try to pull the next one from the file.
+   if (!currentEvent_) {
+     currentEvent_ = readEvent();
+   }
+
+   // Still nothing there, so no new event could be read.
+   if (!currentEvent_) return false;
+
+   // Anything left of the current event?
+   return currentEvent_.length != currentEventPos_;
+ }
+
+ /**
+  * Opens the file at the configured path for binary reading. Does nothing
+  * if the transport is already open.
+  *
+  * Throws: TTransportException (NOT_OPEN) wrapping the original exception
+  *   if the file cannot be opened.
+  */
+ override void open() {
+   if (!isOpen) {
+     try {
+       file_ = File(path_, "rb");
+     } catch (Exception e) {
+       throw new TTransportException("Error on opening input file.",
+         TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
+     }
+     isOpen_ = true;
+   }
+ }
+
+ /**
+  * Closes the file and discards all reading state. Does nothing if the
+  * transport is not open.
+  *
+  * Besides the low-level read state, the partially consumed current event
+  * and the file offset are reset, so that a subsequent open() starts reading
+  * from the beginning of the file with consistent chunk bookkeeping instead
+  * of serving stale data.
+  */
+ override void close() {
+   if (!isOpen) return;
+
+   file_.close();
+   isOpen_ = false;
+   readState_.resetAllValues();
+
+   // Forget the event in progress and the position in the (now closed) file.
+   currentEvent_ = null;
+   currentEventPos_ = 0;
+   offset_ = 0;
+ }
+
+ /**
+  * Copies data from the current event into buf, fetching the next event
+  * from the file first if none is in progress. A single call never crosses
+  * an event boundary.
+  *
+  * Params:
+  *   buf = Slice to use as buffer.
+  *
+  * Returns: The number of bytes copied; 0 if no event could be read.
+  *
+  * Throws: TTransportException (NOT_OPEN) if the file is not open.
+  */
+ override size_t read(ubyte[] buf) {
+   enforce(isOpen, new TTransportException(
+     "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
+
+   // No event in progress: try to pull the next one from the file.
+   if (!currentEvent_) {
+     currentEvent_ = readEvent();
+   }
+
+   // Still nothing there, so no new event could be read.
+   if (!currentEvent_) return 0;
+
+   auto wanted = buf.length;
+   auto pending = currentEvent_.length - currentEventPos_;
+
+   if (pending > wanted) {
+     // The event holds more than requested: hand out exactly `wanted` bytes
+     // and remember where to continue.
+     auto stop = currentEventPos_ + wanted;
+     buf[] = currentEvent_[currentEventPos_ .. stop];
+     currentEventPos_ = stop;
+     return wanted;
+   }
+
+   // The rest of the event fits into buf: pass it on and finish the event.
+   buf[0 .. pending] = currentEvent_[currentEventPos_ .. $];
+   currentEvent_ = null;
+   currentEventPos_ = 0;
+   return pending;
+ }
+
+ /**
+  * Computes the number of chunks from the current file size.
+  *
+  * Returns: 0 for an empty file, otherwise the file size divided by the
+  *   chunk size (rounded down) plus one.
+  *
+  * Throws: TTransportException if the file is not open (NOT_OPEN) or its
+  *   size cannot be determined.
+  */
+ ulong getNumChunks() {
+   enforce(isOpen, new TTransportException(
+     "Cannot get number of chunks if file not open.",
+     TTransportException.Type.NOT_OPEN));
+
+   try {
+     auto bytes = file_.size();
+     // Empty files have no chunks.
+     return (bytes == 0) ? 0 : (bytes / chunkSize_) + 1;
+   } catch (Exception e) {
+     throw new TTransportException("Error getting file size.", __FILE__,
+       __LINE__, e);
+   }
+ }
+
+ /// Index of the chunk the current file offset (offset_) falls into.
+ ulong getCurChunk() {
+ return offset_ / chunkSize_;
+ }
+
+ /**
+  * Positions the reader at the beginning of the given chunk, discarding any
+  * event currently in progress.
+  *
+  * Negative indices count from the end of the file. Indices that are out of
+  * range are logged and clamped: too small ones to the first chunk, too
+  * large ones to the end of the file. In the latter case, all events present
+  * in the last chunk at the time of the call are skipped without waiting
+  * for new data. Seeking in an empty file positions the reader at offset 0.
+  *
+  * Params:
+  *   chunk = Index of the chunk to seek to.
+  *
+  * Throws: TTransportException if the file is not open (NOT_OPEN), or if
+  *   determining the file size or seeking fails.
+  */
+ void seekToChunk(long chunk) {
+   enforce(isOpen, new TTransportException(
+     "Cannot seek to chunk if file not open.",
+     TTransportException.Type.NOT_OPEN));
+
+   auto numChunks = getNumChunks();
+
+   if (chunk < 0) {
+     // Count negative indices from the end.
+     chunk += numChunks;
+   }
+
+   if (chunk < 0) {
+     logError("Incorrect chunk number for reverse seek, seeking to " ~
+       "beginning instead: %s", chunk);
+     chunk = 0;
+   }
+
+   bool seekToEnd;
+   long minEndOffset;
+   if (chunk >= numChunks) {
+     logError("Trying to seek to non-existing chunk, seeking to " ~
+       "end of file instead: %s", chunk);
+     seekToEnd = true;
+     // An empty file has no chunks; numChunks - 1 would wrap around and
+     // yield a bogus seek offset, so stay at the beginning in that case.
+     chunk = (numChunks > 0) ? numChunks - 1 : 0;
+     // this is the min offset to process events till
+     minEndOffset = file_.size();
+   }
+
+   readState_.resetAllValues();
+   currentEvent_ = null;
+
+   try {
+     file_.seek(chunk * chunkSize_);
+     offset_ = chunk * chunkSize_;
+   } catch (Exception e) {
+     throw new TTransportException("Error seeking to chunk", __FILE__,
+       __LINE__, e);
+   }
+
+   if (seekToEnd) {
+     // Never wait on the end of the file for new content, we just want to
+     // find the last one.
+     auto oldReadTimeout = readTimeout_;
+     scope (exit) readTimeout_ = oldReadTimeout;
+     readTimeout_ = dur!"hnsecs"(0);
+
+     // Keep on reading until the last event at point of seekToChunk call.
+     while ((offset_ + readState_.bufferPos_) < minEndOffset) {
+       if (readEvent() is null) {
+         break;
+       }
+     }
+   }
+ }
+
+ /**
+ * Seeks to the end of the file by requesting a chunk index one past the
+ * last chunk, which seekToChunk() handles by seeking to the end of file.
+ */
+ void seekToEnd() {
+ seekToChunk(getNumChunks());
+ }
+
+ /**
+ * The size of the chunks the file is divided into, in bytes.
+ *
+ * Can only be set while the transport is closed, and must be larger than
+ * EventSize.sizeof; otherwise the setter throws a TTransportException.
+ */
+ ulong chunkSize() @property const {
+ return chunkSize_;
+ }
+
+ /// ditto
+ void chunkSize(ulong value) @property {
+ enforce(!isOpen, new TTransportException(
+ "Cannot set chunk size after TFileReaderTransport has been opened."));
+ enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
+ "be large enough to accomodate at least a single byte of payload data."));
+ chunkSize_ = value;
+ }
+
+ /**
+ * If positive, wait the specified duration for new data when arriving at
+ * end of file. If negative, wait forever (tailing mode), waking up to check
+ * in the specified interval (the absolute value of the duration). If zero,
+ * do not wait at all.
+ *
+ * Defaults to 500 ms.
+ */
+ Duration readTimeout() @property const {
+ return readTimeout_;
+ }
+
+ /// ditto
+ void readTimeout(Duration value) @property {
+ readTimeout_ = value;
+ }
+
+ /// ditto
+ enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
+
+ /**
+  * Read buffer size, in bytes.
+  *
+  * The buffer is allocated lazily on the first read. Once it exists, it can
+  * only be enlarged: shrinking it could cut off data that has been read
+  * from the file but not been processed yet.
+  *
+  * Defaults to 1 MiB.
+  *
+  * Throws: Exception when trying to shrink the buffer after it has been
+  *   allocated.
+  */
+ size_t readBufferSize() @property const {
+   return readBufferSize_;
+ }
+
+ /// ditto
+ void readBufferSize(size_t value) @property {
+   if (readBuffer_) {
+     // The buffer is already in use; only growing keeps its contents valid.
+     enforce(value >= readBufferSize_,
+       "Cannot shrink read buffer after first read.");
+     readBuffer_.length = value;
+   }
+   readBufferSize_ = value;
+ }
+
+ /// ditto
+ enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
+
+ /**
+ * Arbitrary event size limit, in bytes. Must not exceed the chunk size
+ * minus EventSize.sizeof, otherwise the setter throws.
+ *
+ * Defaults to zero (no limit).
+ */
+ size_t maxEventSize() @property const {
+ return maxEventSize_;
+ }
+
+ /// ditto
+ void maxEventSize(size_t value) @property {
+ // Checked against the chunk size configured at the time of the call.
+ enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
+ "mutiple chunks, maxEventSize must be smaller than chunk size.");
+ maxEventSize_ = value;
+ }
+
+ /// ditto
+ enum DEFAULT_MAX_EVENT_SIZE = 0;
+
+ /**
+ * The interval at which the thread wakes up to check for the next chunk
+ * in tailing mode, while recovering from a corrupted event in the last
+ * chunk of the file.
+ *
+ * Defaults to one second.
+ */
+ // NOTE(review): unlike the other accessors in this class, this pair is not
+ // marked @property – confirm whether that is intentional.
+ Duration corruptedEventSleepDuration() const {
+ return corruptedEventSleepDuration_;
+ }
+
+ /// ditto
+ void corruptedEventSleepDuration(Duration value) {
+ corruptedEventSleepDuration_ = value;
+ }
+
+ /// ditto
+ enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
+
+ /**
+ * The maximum number of corrupted events tolerated before the whole chunk
+ * is skipped. While the count for the current chunk is below this limit,
+ * recovery re-reads the chunk from its beginning.
+ *
+ * Defaults to zero.
+ */
+ uint maxCorruptedEvents() @property const {
+ return maxCorruptedEvents_;
+ }
+
+ /// ditto
+ void maxCorruptedEvents(uint value) @property {
+ maxCorruptedEvents_ = value;
+ }
+
+ /// ditto
+ enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+private:
+ /**
+ * Reads the next complete event from the file, refilling the read buffer
+ * (allocated on first use) whenever it is exhausted.
+ *
+ * At end of file, the behavior depends on readTimeout_: negative values
+ * retry forever, zero gives up immediately, positive values sleep once and
+ * retry a single time.
+ *
+ * Returns: The payload of the event, or null if the end of the file was
+ * reached and no (more) waiting is to be done.
+ *
+ * Throws: TTransportException if reading from the file fails. Recovery
+ * from a corrupted event (performRecovery()) may throw as well.
+ */
+ ubyte[] readEvent() {
+ if (!readBuffer_) {
+ readBuffer_ = new ubyte[readBufferSize_];
+ }
+
+ // Set once the (positive) read timeout has been waited out.
+ bool timeoutExpired;
+ while (1) {
+ // read from the file if read buffer is exhausted
+ if (readState_.bufferPos_ == readState_.bufferLen_) {
+ // advance the offset pointer
+ offset_ += readState_.bufferLen_;
+
+ try {
+ // Need to clear eof flag before reading, otherwise tailing a file
+ // does not work.
+ file_.clearerr();
+
+ auto usedBuf = file_.rawRead(readBuffer_);
+ readState_.bufferLen_ = usedBuf.length;
+ } catch (Exception e) {
+ readState_.resetAllValues();
+ throw new TTransportException("Error while reading from file",
+ __FILE__, __LINE__, e);
+ }
+
+ readState_.bufferPos_ = 0;
+ readState_.lastDispatchPos_ = 0;
+
+ if (readState_.bufferLen_ == 0) {
+ // Reached end of file.
+ if (readTimeout_ < dur!"hnsecs"(0)) {
+ // Tailing mode, sleep for the specified duration and try again.
+ Thread.sleep(-readTimeout_);
+ continue;
+ } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
+ // Either no timeout set, or it has already expired.
+ readState_.resetState(0);
+ return null;
+ } else {
+ // Timeout mode, sleep for the specified amount of time and retry.
+ Thread.sleep(readTimeout_);
+ timeoutExpired = true;
+ continue;
+ }
+ }
+ }
+
+ // Attempt to read an event from the buffer.
+ while (readState_.bufferPos_ < readState_.bufferLen_) {
+ if (readState_.readingSize_) {
+ if (readState_.eventSizeBuffPos_ == 0) {
+ // A size field is only started where all four of its bytes lie in
+ // the same chunk; bytes where it would straddle a chunk boundary
+ // are skipped one at a time.
+ if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
+ ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
+ {
+ readState_.bufferPos_++;
+ continue;
+ }
+ }
+
+ // Accumulate the size field byte by byte, as it may be split across
+ // two buffer fills.
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+ readBuffer_[readState_.bufferPos_++];
+
+ if (readState_.eventSizeBuffPos_ == 4) {
+ // Reinterpret the four collected bytes as a uint in native byte
+ // order.
+ auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
+
+ if (size == 0) {
+ // This is part of the zero padding between chunks.
+ readState_.resetState(readState_.lastDispatchPos_);
+ continue;
+ }
+
+ // got a valid event
+ readState_.readingSize_ = false;
+ readState_.eventLen_ = size;
+ readState_.eventPos_ = 0;
+
+ // check if the event is corrupted and perform recovery if required
+ if (isEventCorrupted()) {
+ performRecovery();
+ // start from the top
+ break;
+ }
+ }
+ } else {
+ if (!readState_.event_) {
+ readState_.event_ = new ubyte[readState_.eventLen_];
+ }
+
+ // take either the entire event or the remaining bytes in the buffer
+ auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
+ readState_.eventLen_ - readState_.eventPos_);
+
+ // copy data from read buffer into event buffer
+ readState_.event_[
+ readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
+ ] = readBuffer_[
+ readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
+ ];
+
+ // increment position ptrs
+ readState_.eventPos_ += reclaimBuffer;
+ readState_.bufferPos_ += reclaimBuffer;
+
+ // check if the event has been read in full
+ if (readState_.eventPos_ == readState_.eventLen_) {
+ // Reset the read state and return the completed event.
+ auto completeEvent = readState_.event_;
+ readState_.event_ = null;
+ readState_.resetState(readState_.bufferPos_);
+ return completeEvent;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Plausibility check for the size of the event whose size field has just
+  * been read. Every failed check is logged.
+  *
+  * Returns: true if the size exceeds the configured maximum event size (if
+  *   any) or the chunk size, or if it indicates that the event crosses a
+  *   chunk boundary; false otherwise.
+  */
+ bool isEventCorrupted() {
+   auto eventLen = readState_.eventLen_;
+
+   if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
+     // Event size is larger than user-specified max-event size
+     logError("Corrupt event read: Event size (%s) greater than max " ~
+       "event size (%s)", eventLen, maxEventSize_);
+     return true;
+   }
+
+   if (eventLen > chunkSize_) {
+     // Event size is larger than chunk size
+     logError("Corrupt event read: Event size (%s) greater than chunk " ~
+       "size (%s)", eventLen, chunkSize_);
+     return true;
+   }
+
+   // File position just behind the size field of the event.
+   auto filePos = offset_ + readState_.bufferPos_;
+   auto startChunk = (filePos - EventSize.sizeof) / chunkSize_;
+   auto endChunk = (filePos + eventLen - EventSize.sizeof) / chunkSize_;
+   if (startChunk != endChunk) {
+     // Size indicates that event crosses chunk boundary
+     logError("Read corrupt event. Event crosses chunk boundary. " ~
+       "Event size: %s. Offset: %s", eventLen,
+       (filePos + EventSize.sizeof)
+     );
+     return true;
+   }
+
+   return false;
+ }
+
+ /**
+  * Tries to get the reader past a corrupted event.
+  *
+  * Keeps count of the corrupted events seen in the current chunk. While that
+  * count is below maxCorruptedEvents_, the chunk is re-read from its start.
+  * Afterwards the chunk is skipped; if it is the last one, the reader either
+  * waits for a further chunk to appear (negative readTimeout_) or gives up.
+  *
+  * Throws: TTransportException (CORRUPTED_DATA) if the chunk cannot be
+  *   skipped and the transport is not waiting for new data.
+  */
+ void performRecovery() {
+   auto badChunk = getCurChunk();
+
+   // Count how often we have already failed inside this very chunk.
+   if (lastBadChunk_ != badChunk) {
+     lastBadChunk_ = badChunk;
+     numCorruptedEventsInChunk_ = 1;
+   } else {
+     numCorruptedEventsInChunk_++;
+   }
+
+   if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
+     // Maybe there was an error in reading the file from disk, seek to the
+     // beginning of the chunk and try again.
+     seekToChunk(badChunk);
+     return;
+   }
+
+   if (badChunk != (getNumChunks() - 1)) {
+     // Not at the last chunk yet, so just skip ahead to the next one.
+     seekToChunk(badChunk + 1);
+     return;
+   }
+
+   if (readTimeout_ < dur!"hnsecs"(0)) {
+     // We are in tailing mode, wait until there is enough data to start
+     // the next chunk.
+     while (badChunk == (getNumChunks() - 1)) {
+       Thread.sleep(corruptedEventSleepDuration_);
+     }
+     seekToChunk(badChunk + 1);
+     return;
+   }
+
+   // Pretty hosed at this stage, rewind the file back to the last
+   // successful point and punt on the error.
+   readState_.resetState(readState_.lastDispatchPos_);
+   currentEvent_ = null;
+   currentEventPos_ = 0;
+
+   throw new TTransportException("File corrupted at offset: " ~
+     to!string(offset_ + readState_.lastDispatchPos_),
+     TTransportException.Type.CORRUPTED_DATA);
+ }
+
+ string path_;
+ File file_;
+ bool isOpen_;
+ long offset_;
+ ubyte[] currentEvent_;
+ size_t currentEventPos_;
+ ulong chunkSize_;
+ Duration readTimeout_;
+ size_t maxEventSize_;
+
+ // Read buffer -- lazily allocated on the first read().
+ ubyte[] readBuffer_;
+ size_t readBufferSize_;
+
+ /// Bookkeeping for the event currently being assembled from the read buffer.
+ static struct ReadState {
+   // The event being assembled, its total length and the fill position.
+   ubyte[] event_;
+   size_t eventLen_;
+   size_t eventPos_;
+
+   // Raw bytes of the size header, how many of them have been collected so
+   // far, and whether a size header (rather than payload) is expected next.
+   ubyte[4] eventSizeBuff_;
+   ubyte eventSizeBuffPos_;
+   bool readingSize_ = true;
+
+   // Current position in and fill level of the read buffer.
+   size_t bufferPos_;
+   size_t bufferLen_;
+
+   // Buffer position of the last successful dispatch.
+   size_t lastDispatchPos_;
+
+   /**
+    * Prepares for reading the next size header and records the passed
+    * position as last dispatch point. The read buffer positions and the
+    * event fields are left alone.
+    */
+   void resetState(size_t lastDispatchPos) {
+     lastDispatchPos_ = lastDispatchPos;
+     eventSizeBuffPos_ = 0;
+     readingSize_ = true;
+   }
+
+   /**
+    * Like resetState(0), but additionally rewinds the read buffer positions
+    * and drops the reference to the event being assembled.
+    */
+   void resetAllValues() {
+     event_ = null;
+     bufferLen_ = 0;
+     bufferPos_ = 0;
+     resetState(0);
+   }
+ }
+ ReadState readState_;
+
+ ulong lastBadChunk_;
+ uint maxCorruptedEvents_;
+ uint numCorruptedEventsInChunk_;
+ Duration corruptedEventSleepDuration_;
+}
+
+/**
+ * A transport used to write log files. It can never be read from, calling
+ * read() throws.
+ *
+ * Contrary to the C++ design, explicitly opening the transport/file before
+ * using is necessary to allow manually closing the file without relying on the
+ * object lifetime.
+ *
+ * All file I/O happens in a separate writer thread which is spawned by open()
+ * and fed via message passing.
+ */
+final class TFileWriterTransport : TBaseTransport {
+  /**
+   * Creates a new file writer transport.
+   *
+   * Params:
+   *   path = Path of the file to operate on.
+   */
+  this(string path) {
+    path_ = path;
+
+    chunkSize_ = DEFAULT_CHUNK_SIZE;
+    eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
+    // Assign the field directly, like all the other defaults, instead of
+    // going through the property setter.
+    ioErrorSleepDuration_ = DEFAULT_IO_ERROR_SLEEP_DURATION;
+    maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
+    maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
+  }
+
+  override bool isOpen() @property {
+    return isOpen_;
+  }
+
+  /**
+   * A file writer transport can never be read from.
+   */
+  override bool peek() {
+    return false;
+  }
+
+  /**
+   * Spawns the writer thread with the current settings. Does nothing if the
+   * transport is already open.
+   */
+  override void open() {
+    if (isOpen) return;
+
+    writerThread_ = spawn(
+      &writerThread,
+      path_,
+      chunkSize_,
+      maxFlushBytes_,
+      maxFlushInterval_,
+      ioErrorSleepDuration_
+    );
+    setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
+    isOpen_ = true;
+  }
+
+  /**
+   * Closes the transport, i.e. the underlying file and the writer thread.
+   *
+   * Blocks until the writer thread has confirmed the shutdown request. Does
+   * nothing if the transport is not open.
+   */
+  override void close() {
+    if (!isOpen) return;
+
+    prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
+    receive((ShutdownMessage msg, Tid tid){});
+    isOpen_ = false;
+  }
+
+  /**
+   * Enqueues the passed slice of data for writing and immediately returns.
+   * write() only blocks if the event buffer has been exhausted.
+   *
+   * The transport must be open when calling this. Empty slices are logged
+   * and skipped.
+   *
+   * Params:
+   *   buf = Slice of data to write.
+   *
+   * Throws: TTransportException if the transport is not open or if buf is
+   *   larger than chunkSize - EventSize.sizeof.
+   */
+  override void write(in ubyte[] buf) {
+    enforce(isOpen, new TTransportException(
+      "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
+
+    if (buf.empty) {
+      logError("Cannot write empty event, skipping.");
+      return;
+    }
+
+    auto maxSize = chunkSize - EventSize.sizeof;
+    enforce(buf.length <= maxSize, new TTransportException(
+      "Cannot write more than " ~ to!string(maxSize) ~
+      " bytes at once due to chunk size."));
+
+    // The data is copied because it is handed over to another thread.
+    send(writerThread_, buf.idup);
+  }
+
+  /**
+   * Flushes any pending data to be written.
+   *
+   * The transport must be open when calling this. Blocks until the writer
+   * thread has confirmed the flush request.
+   *
+   * Throws: TTransportException if the transport is not open.
+   */
+  override void flush() {
+    enforce(isOpen, new TTransportException(
+      "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
+
+    send(writerThread_, FlushMessage(), thisTid);
+    receive((FlushMessage msg, Tid tid){});
+  }
+
+  /**
+   * The size of the chunks the file is divided into, in bytes.
+   *
+   * A single event (write call) never spans multiple chunks -- this
+   * effectively limits the event size to chunkSize - EventSize.sizeof.
+   *
+   * Can only be set while the transport is not open.
+   */
+  ulong chunkSize() @property {
+    return chunkSize_;
+  }
+
+  /// ditto
+  void chunkSize(ulong value) @property {
+    enforce(!isOpen, new TTransportException(
+      "Cannot set chunk size after TFileWriterTransport has been opened."));
+    chunkSize_ = value;
+  }
+
+  /**
+   * The maximum number of write() calls buffered, or zero for no limit.
+   *
+   * If the buffer is exhausted, write() will block until space becomes
+   * available.
+   */
+  size_t eventBufferSize() @property {
+    return eventBufferSize_;
+  }
+
+  /// ditto
+  void eventBufferSize(size_t value) @property {
+    eventBufferSize_ = value;
+    if (isOpen) {
+      // Use the same crowding policy as open() so that write() keeps blocking
+      // (instead of throwing) when the buffer is full.
+      setMaxMailboxSize(writerThread_, value, OnCrowding.block);
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
+
+  /**
+   * Maximum number of bytes buffered before writing and flushing the file
+   * to disk.
+   *
+   * If set while the transport is open, the new value is forwarded to the
+   * writer thread.
+   */
+  size_t maxFlushBytes() @property {
+    return maxFlushBytes_;
+  }
+
+  /// ditto
+  void maxFlushBytes(size_t value) @property {
+    maxFlushBytes_ = value;
+    if (isOpen) {
+      send(writerThread_, FlushBytesMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
+
+  /**
+   * Maximum interval between flushing the file to disk.
+   *
+   * If set while the transport is open, the new value is forwarded to the
+   * writer thread.
+   */
+  Duration maxFlushInterval() @property {
+    return maxFlushInterval_;
+  }
+
+  /// ditto
+  void maxFlushInterval(Duration value) @property {
+    maxFlushInterval_ = value;
+    if (isOpen) {
+      send(writerThread_, FlushIntervalMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
+
+  /**
+   * When the writer thread encounters an I/O error, it pauses for a short
+   * time before trying to reopen the output file. This controls the sleep
+   * duration.
+   */
+  Duration ioErrorSleepDuration() @property {
+    return ioErrorSleepDuration_;
+  }
+
+  /// ditto
+  void ioErrorSleepDuration(Duration value) @property {
+    ioErrorSleepDuration_ = value;
+    if (isOpen) {
+      // This must be the dedicated message type: a FlushIntervalMessage would
+      // silently change the flush interval of the writer thread instead.
+      send(writerThread_, IoErrorSleepDurationMessage(value));
+    }
+  }
+
+  /// ditto
+  enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
+
+private:
+  string path_;
+  ulong chunkSize_;
+  size_t eventBufferSize_;
+  Duration ioErrorSleepDuration_;
+  size_t maxFlushBytes_;
+  Duration maxFlushInterval_;
+  bool isOpen_;
+  Tid writerThread_;
+}
+
+private {
+ // Signals that the file should be flushed on disk. Sent to the writer
+ // thread and sent back along with the tid for confirmation.
+ struct FlushMessage {}
+
+ // Signals that the writer thread should close the file and shut down. Sent
+ // to the writer thread and sent back along with the tid for confirmation.
+ struct ShutdownMessage {}
+
+ // Carries a new value for the maxFlushBytes setting of the writer thread.
+ struct FlushBytesMessage {
+ size_t value;
+ }
+
+ // Carries a new value for the maxFlushInterval setting of the writer thread.
+ struct FlushIntervalMessage {
+ Duration value;
+ }
+
+ // Carries a new value for the ioErrorSleepDuration setting of the writer
+ // thread.
+ struct IoErrorSleepDurationMessage {
+ Duration value;
+ }
+
+ /**
+  * Entry point of the thread performing the actual file I/O.
+  *
+  * Receives events (immutable(ubyte)[]) and control messages via its
+  * mailbox, appends each event to the file prefixed by its length, and
+  * flushes the file when asked to, on shutdown, once more than
+  * maxFlushBytes bytes are pending, or once maxFlushInterval has elapsed
+  * with data pending.
+  *
+  * Params:
+  *   path = Path of the file to append to.
+  *   chunkSize = Chunk size in bytes; events are padded with zeroes so that
+  *     they never cross a chunk boundary.
+  *   maxFlushBytes = Initial value, can be changed by FlushBytesMessage.
+  *   maxFlushInterval = Initial value, can be changed by
+  *     FlushIntervalMessage.
+  *   ioErrorSleepDuration = Initial value, can be changed by
+  *     IoErrorSleepDurationMessage.
+  */
+ void writerThread(
+   string path,
+   ulong chunkSize,
+   size_t maxFlushBytes,
+   Duration maxFlushInterval,
+   Duration ioErrorSleepDuration
+ ) {
+   bool errorOpening;
+   File file;
+   ulong offset;
+   try {
+     // Open file in appending and binary mode.
+     file = File(path, "ab");
+     offset = file.tell();
+   } catch (Exception e) {
+     logError("Error on opening output file in writer thread: %s", e);
+     errorOpening = true;
+   }
+
+   auto flushTimer = StopWatch(AutoStart.yes);
+   size_t unflushedByteCount;
+
+   Tid shutdownRequestTid;
+   bool shutdownRequested;
+   while (true) {
+     if (shutdownRequested) break;
+
+     bool forceFlush;
+     Tid flushRequestTid;
+     receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
+       (immutable(ubyte)[] data) {
+         while (errorOpening) {
+           // total!"usecs" reports the whole duration, not just its
+           // sub-second fraction.
+           logError("Writer thread going to sleep for %s µs due to IO errors",
+             ioErrorSleepDuration.total!"usecs");
+
+           // Sleep for ioErrorSleepDuration, being ready to be interrupted
+           // by shutdown requests. receiveTimeout() returns true if a
+           // message was received and false if it timed out.
+           bool gotShutdownRequest = receiveTimeout(ioErrorSleepDuration,
+             (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
+           if (gotShutdownRequest) {
+             // We got a shutdown request, just drop all events and exit the
+             // main loop as to not block application shutdown with our tries
+             // which we must assume to fail. In particular, do not fall
+             // through to writing into the file which could not be opened.
+             shutdownRequested = true;
+             return;
+           }
+
+           try {
+             file = File(path, "ab");
+             // Pick up the current end of the file so that the chunk padding
+             // below is computed against the real position.
+             offset = file.tell();
+             unflushedByteCount = 0;
+             errorOpening = false;
+             logError("Output file %s reopened during writer thread error " ~
+               "recovery", path);
+           } catch (Exception e) {
+             logError("Unable to reopen output file %s during writer " ~
+               "thread error recovery", path);
+           }
+         }
+
+         // Make sure the event does not cross the chunk boundary by writing
+         // a padding consisting of zeroes if it would.
+         auto chunk1 = offset / chunkSize;
+         auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;
+
+         if (chunk1 != chunk2) {
+           // TODO: The C++ implementation refetches the offset here to »keep
+           // in sync« -- why would this be needed?
+           auto padding = cast(size_t)
+             ((((offset / chunkSize) + 1) * chunkSize) - offset);
+           auto zeroes = new ubyte[padding];
+           file.rawWrite(zeroes);
+           unflushedByteCount += padding;
+           offset += padding;
+         }
+
+         // TODO: 2 syscalls here, is this a problem performance-wise?
+         // Probably abysmal performance on Windows due to rawWrite
+         // implementation.
+         uint len = cast(uint)data.length;
+         file.rawWrite(cast(ubyte[])(&len)[0..1]);
+         file.rawWrite(data);
+
+         auto bytesWritten = EventSize.sizeof + data.length;
+         unflushedByteCount += bytesWritten;
+         offset += bytesWritten;
+       }, (FlushBytesMessage msg) {
+         maxFlushBytes = msg.value;
+       }, (FlushIntervalMessage msg) {
+         maxFlushInterval = msg.value;
+       }, (IoErrorSleepDurationMessage msg) {
+         ioErrorSleepDuration = msg.value;
+       }, (FlushMessage msg, Tid tid) {
+         forceFlush = true;
+         flushRequestTid = tid;
+       }, (OwnerTerminated msg) {
+         shutdownRequested = true;
+       }, (ShutdownMessage msg, Tid tid) {
+         shutdownRequested = true;
+         shutdownRequestTid = tid;
+       }
+     );
+
+     if (errorOpening) {
+       // There is no open file to flush, but the requester is blocked
+       // waiting for the confirmation, so send it anyway instead of leaving
+       // it hanging forever.
+       if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
+       continue;
+     }
+
+     bool flush;
+     if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
+       flush = true;
+     } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
+       if (unflushedByteCount == 0) {
+         // If the flush timer is due, but no data has been written, don't
+         // needlessly fsync, but do reset the timer.
+         flushTimer.reset();
+       } else {
+         flush = true;
+       }
+     }
+
+     if (flush) {
+       file.flush();
+       flushTimer.reset();
+       unflushedByteCount = 0;
+       if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
+     }
+   }
+
+   file.close();
+
+   // Confirm the shutdown to whoever explicitly requested it (nobody did if
+   // we are exiting because the owner thread terminated).
+   if (shutdownRequestTid != Tid.init) {
+     send(shutdownRequestTid, ShutdownMessage(), thisTid);
+   }
+ }
+}
+
+version (unittest) {
+ import core.memory : GC;
+ import std.file;
+}
+
+// Tests for TFileWriterTransport/TFileReaderTransport: a basic round trip,
+// the on-disk chunk layout, and the latency of close().
+unittest {
+  void tryRemove(string fileName) {
+    try {
+      remove(fileName);
+    } catch (Exception) {}
+  }
+
+  immutable fileName = "unittest.dat.tmp";
+  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
+    " already exists.");
+
+  /*
+   * Check the most basic reading/writing operations.
+   */
+  {
+    scope (exit) tryRemove(fileName);
+
+    auto writer = new TFileWriterTransport(fileName);
+    writer.open();
+    scope (exit) writer.close();
+
+    writer.write([1, 2]);
+    writer.write([3, 4]);
+    writer.write([5, 6, 7]);
+    writer.flush();
+
+    auto reader = new TFileReaderTransport(fileName);
+    reader.open();
+    scope (exit) reader.close();
+
+    auto buf = new ubyte[7];
+    reader.readAll(buf);
+    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
+  }
+
+  /*
+   * Check that chunking works as expected.
+   */
+  {
+    scope (exit) tryRemove(fileName);
+
+    static assert(EventSize.sizeof == 4);
+    enum CHUNK_SIZE = 10;
+
+    // Write some contents to the file.
+    {
+      auto writer = new TFileWriterTransport(fileName);
+      writer.chunkSize = CHUNK_SIZE;
+      writer.open();
+      scope (exit) writer.close();
+
+      writer.write([0xde]);
+      writer.write([0xad]);
+      // Chunk boundary here.
+      writer.write([0xbe]);
+      // The next write doesn't fit in the five bytes remaining, so we expect
+      // padding zero bytes to be written.
+      writer.write([0xef, 0x12]);
+
+      try {
+        writer.write(new ubyte[CHUNK_SIZE]);
+        enforce(false, "Could write event not fitting in a single chunk.");
+      } catch (TTransportException e) {}
+
+      writer.flush();
+    }
+
+    // Check the raw contents of the file to see if chunk padding was written
+    // as expected.
+    auto file = File(fileName, "r");
+    enforce(file.size == 26);
+    auto written = new ubyte[26];
+    file.rawRead(written);
+    enforce(written == [
+      1, 0, 0, 0, 0xde,
+      1, 0, 0, 0, 0xad,
+      1, 0, 0, 0, 0xbe,
+      0, 0, 0, 0, 0,
+      2, 0, 0, 0, 0xef, 0x12
+    ]);
+
+    // Read the data back in, getting all the events at once.
+    {
+      auto reader = new TFileReaderTransport(fileName);
+      reader.chunkSize = CHUNK_SIZE;
+      reader.open();
+      scope (exit) reader.close();
+
+      auto buf = new ubyte[5];
+      reader.readAll(buf);
+      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
+    }
+  }
+
+  /*
+   * Make sure that close() exits "quickly", i.e. that there is no problem
+   * with the worker thread waking up.
+   */
+  {
+    import std.conv : text;
+    enum NUM_ITERATIONS = 1000;
+
+    uint numOver = 0;
+    foreach (n; 0 .. NUM_ITERATIONS) {
+      scope (exit) tryRemove(fileName);
+
+      auto transport = new TFileWriterTransport(fileName);
+      transport.open();
+
+      // Write something so that the writer thread gets started.
+      transport.write(cast(ubyte[])"foo");
+
+      // Every other iteration, also call flush(), just in case that potentially
+      // has any effect on how the writer thread wakes up.
+      if (n & 0x1) {
+        transport.flush();
+      }
+
+      // Time the call to close().
+      auto sw = StopWatch(AutoStart.yes);
+      transport.close();
+      sw.stop();
+
+      // If any attempt takes more than 500ms, treat that as a fatal failure to
+      // avoid looping over a potentially very slow operation.
+      enforce(sw.peek().msecs < 500,
+        text("close() took ", sw.peek().msecs, "ms."));
+
+      // Normally, it takes less than 5ms on my dev box.
+      // However, if the box is heavily loaded, some of the test runs can take
+      // longer. Additionally, on a Windows Server 2008 instance running in
+      // a VirtualBox VM, it has been observed that about a quarter of the runs
+      // takes (217 ± 1) ms, for reasons not yet known.
+      if (sw.peek().msecs > 5) {
+        ++numOver;
+      }
+
+      // Force garbage collection runs every now and then to make sure we
+      // don't run out of OS thread handles.
+      if (!(n % 100)) GC.collect();
+    }
+
+    // Make sure fewer than a third of the runs took longer than 5ms. The
+    // failure message states the threshold that is actually checked above.
+    enforce(numOver < NUM_ITERATIONS / 3,
+      text(numOver, " iterations took more than 5 ms."));
+  }
+}
Added: thrift/trunk/lib/d/src/thrift/transport/framed.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/framed.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/framed.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/framed.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+module thrift.transport.framed;
+
+import core.bitop : bswap;
+import std.algorithm : min;
+import std.array : empty;
+import std.exception : enforce;
+import thrift.transport.base;
+
+/**
+ * Framed transport.
+ *
+ * Written data is collected in an in-memory buffer. On flush, the length of
+ * the buffered data is sent to the wrapped transport as a four byte header,
+ * followed by the data itself. On the reading side, a whole frame is fetched
+ * from the wrapped transport with a single »fixed-length« read and then
+ * handed out piece by piece.
+ */
+final class TFramedTransport : TBaseTransport {
+  /**
+   * Creates a framed transport on top of another transport.
+   *
+   * Params:
+   *   transport = The underlying transport to wrap.
+   */
+  this(TTransport transport) {
+    transport_ = transport;
+  }
+
+  /**
+   * The transport this instance wraps.
+   */
+  TTransport underlyingTransport() @property {
+    return transport_;
+  }
+
+  override bool isOpen() @property {
+    return transport_.isOpen;
+  }
+
+  /// True if frame data is still buffered or the wrapped transport has more.
+  override bool peek() {
+    return rBuf_.length > 0 || transport_.peek();
+  }
+
+  override void open() {
+    transport_.open();
+  }
+
+  /// Flushes pending writes, then closes the wrapped transport.
+  override void close() {
+    flush();
+    transport_.close();
+  }
+
+  /**
+   * Copies data from the current frame into the given buffer, fetching a new
+   * frame first if none is buffered. Stops when the buffer is full or the end
+   * of the frame is reached.
+   *
+   * TODO: Contrary to the C++ implementation, this never does cross-frame
+   * reads -- is there actually a valid use case for that?
+   *
+   * Params:
+   *   buf = Slice to use as buffer.
+   *
+   * Returns: How many bytes were actually read.
+   *
+   * Throws: TTransportException if an error occurs.
+   */
+  override size_t read(ubyte[] buf) {
+    // Nothing buffered: get the next frame off the wire, if there is one.
+    if (rBuf_.empty && !readFrame()) return 0;
+
+    auto count = min(rBuf_.length, buf.length);
+    buf[0 .. count] = rBuf_[0 .. count];
+    rBuf_ = rBuf_[count .. $];
+    return count;
+  }
+
+  /// Appends the data to the write buffer; nothing is sent before flush().
+  override void write(in ubyte[] buf) {
+    wBuf_ ~= buf;
+  }
+
+  /**
+   * Sends the buffered data as a single frame and flushes the wrapped
+   * transport. Does nothing if no data has been written.
+   */
+  override void flush() {
+    if (wBuf_.empty) return;
+
+    // Empty the write buffer on the way out no matter what, so that it is
+    // properly reset even if one of the operations below goes wrong.
+    scope (exit) {
+      wBuf_.length = 0;
+      wBuf_.assumeSafeAppend();
+    }
+
+    int frameHeader = bswap(cast(int)wBuf_.length);
+    transport_.write(cast(ubyte[])(&frameHeader)[0..1]);
+    transport_.write(wBuf_);
+    transport_.flush();
+  }
+
+  /**
+   * Returns the buffered remainder of the current frame if it holds at least
+   * len bytes, null otherwise.
+   */
+  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
+    // Don't try attempting cross-frame borrows, trying that does not make
+    // much sense anyway.
+    if (len > rBuf_.length) return null;
+
+    return rBuf_;
+  }
+
+  /**
+   * Drops the first len bytes of the buffered frame data.
+   *
+   * Throws: TTransportException (BAD_ARGS) if fewer than len bytes are
+   *   buffered.
+   */
+  override void consume(size_t len) {
+    if (len > rBuf_.length) {
+      throw new TTransportException(
+        "Invalid consume length", TTransportException.Type.BAD_ARGS);
+    }
+    rBuf_ = rBuf_[len .. $];
+  }
+
+private:
+  /**
+   * Reads the next frame from the wrapped transport into rBuf_.
+   *
+   * Returns: false if the wrapped transport had no data at all, true once a
+   *   frame has been read.
+   *
+   * Throws: TTransportException if the data ends in the middle of the frame
+   *   header or if the frame size is negative.
+   */
+  bool readFrame() {
+    // Read the size of the next frame. We can't use readAll() since that
+    // always throws an exception on EOF, but want to throw an exception only
+    // if EOF occurs after partial size data.
+    int frameSize;
+    auto header = (cast(ubyte*)&frameSize)[0 .. frameSize.sizeof];
+    size_t headerPos;
+    while (headerPos < header.length) {
+      auto got = transport_.read(header[headerPos .. $]);
+      if (got == 0) {
+        // EOF before any data was read.
+        if (headerPos == 0) return false;
+
+        // EOF after a partial frame header -- illegal.
+        throw new TTransportException(
+          "No more data to read after partial frame header",
+          TTransportException.Type.END_OF_FILE
+        );
+      }
+      headerPos += got;
+    }
+
+    frameSize = bswap(frameSize);
+    if (frameSize < 0) {
+      throw new TTransportException("Frame size has negative value",
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    // TODO: Benchmark this.
+    rBuf_.length = frameSize;
+    rBuf_.assumeSafeAppend();
+
+    transport_.readAll(rBuf_);
+    return true;
+  }
+
+  TTransport transport_;
+  ubyte[] rBuf_;
+  ubyte[] wBuf_;
+}
+
+/**
+ * Wraps given transports into TFramedTransports.
+ */
+alias TWrapperTransportFactory!TFramedTransport TFramedTransportFactory;
+
+version (unittest) {
+ import std.random : Mt19937, uniform;
+ import thrift.transport.memory;
+}
+
+// Some basic random testing, always starting with the same seed for
+// deterministic unit test results -- more tests in transport_test.
+//
+// Note that the order in which random numbers are drawn from randGen
+// determines the test data, so statements must not be reordered.
+unittest {
+ auto randGen = Mt19937(42);
+
+ // 32 kiB of data to work with.
+ auto data = new ubyte[1 << 15];
+ foreach (ref b; data) {
+ b = uniform!"[]"(cast(ubyte)0, cast(ubyte)255, randGen);
+ }
+
+ // Generate a list of chunk sizes to split the data into. A uniform
+ // distribution is not quite realistic, but std.random doesn't have anything
+ // else yet.
+ enum MAX_FRAME_LENGTH = 512;
+ auto chunkSizesList = new size_t[][2];
+ foreach (ref chunkSizes; chunkSizesList) {
+ size_t sum;
+ while (true) {
+ auto curLen = uniform(0, MAX_FRAME_LENGTH, randGen);
+ sum += curLen;
+ // Stop before the chunks would exceed the available data.
+ if (sum > data.length) break;
+ chunkSizes ~= curLen;
+ }
+ }
+ chunkSizesList ~= [data.length]; // Also test whole chunk at once.
+
+ // Test writing data.
+ {
+ foreach (chunkSizes; chunkSizesList) {
+ auto buf = new TMemoryBuffer;
+ auto framed = new TFramedTransport(buf);
+
+ // Write the data chunk by chunk, everything ends up in a single frame.
+ auto remainingData = data;
+ foreach (chunkSize; chunkSizes) {
+ framed.write(remainingData[0..chunkSize]);
+ remainingData = remainingData[chunkSize..$];
+ }
+ framed.flush();
+
+ auto writtenData = data[0..($ - remainingData.length)];
+ auto actualData = buf.getContents();
+
+ // Check frame size.
+ int frameSize = bswap((cast(int[])(actualData[0..int.sizeof]))[0]);
+ enforce(frameSize == writtenData.length);
+
+ // Check actual data.
+ enforce(actualData[int.sizeof..$] == writtenData);
+ }
+ }
+
+ // Test reading data.
+ {
+ foreach (chunkSizes; chunkSizesList) {
+ auto buf = new TMemoryBuffer;
+
+ // Manually put one frame containing all of the data into the buffer.
+ auto size = bswap(cast(int)data.length);
+ buf.write(cast(ubyte[])(&size)[0..1]);
+ buf.write(data);
+
+ auto framed = new TFramedTransport(buf);
+ ubyte[] readData;
+ readData.reserve(data.length);
+ foreach (chunkSize; chunkSizes) {
+ // This should work with read because we have one huge frame.
+ auto oldReadLen = readData.length;
+ readData.length += chunkSize;
+ framed.read(readData[oldReadLen..$]);
+ }
+
+ enforce(readData == data[0..readData.length]);
+ }
+ }
+
+ // Test combined reading/writing of multiple frames.
+ foreach (flushProbability; [1, 2, 4, 8, 16, 32]) {
+ foreach (chunkSizes; chunkSizesList) {
+ auto buf = new TMemoryBuffer;
+ auto framed = new TFramedTransport(buf);
+
+ // Sizes of the frames written, in order.
+ size_t[] frameSizes;
+
+ // Write the data.
+ size_t frameSize;
+ auto remainingData = data;
+ foreach (chunkSize; chunkSizes) {
+ framed.write(remainingData[0..chunkSize]);
+ remainingData = remainingData[chunkSize..$];
+
+ frameSize += chunkSize;
+ // Randomly end the current frame, with a chance of
+ // 1 in flushProbability.
+ if (frameSize > 0 && uniform(0, flushProbability, randGen) == 0) {
+ frameSizes ~= frameSize;
+ frameSize = 0;
+ framed.flush();
+ }
+ }
+ // Flush whatever is left over as the last frame.
+ if (frameSize > 0) {
+ frameSizes ~= frameSize;
+ frameSize = 0;
+ framed.flush();
+ }
+
+ // Read it back.
+ auto readData = new ubyte[data.length - remainingData.length];
+ auto remainToRead = readData;
+ foreach (fSize; frameSizes) {
+ // We are exploiting an implementation detail of TFramedTransport:
+ // The read buffer starts empty and it will never return more than one
+ // frame per read, so by just requesting all of the data, we should
+ // always get exactly one frame.
+ auto got = framed.read(remainToRead);
+ enforce(got == fSize);
+ remainToRead = remainToRead[fSize..$];
+ }
+
+ enforce(remainToRead.empty);
+ enforce(readData == data[0..readData.length]);
+ }
+ }
+}
+
+// Flushing while the write buffer is empty must not emit anything, in
+// particular no empty frames.
+unittest {
+  auto sink = new TMemoryBuffer();
+  auto transport = new TFramedTransport(sink);
+  immutable frameA = [0, 0, 0, 1, 'a'];
+  immutable framesABC = [0, 0, 0, 1, 'a', 0, 0, 0, 2, 'b', 'c'];
+
+  // Two flushes in a row, neither of which has anything to send.
+  void flushTwice() {
+    transport.flush();
+    transport.flush();
+  }
+
+  // Nothing has been written yet.
+  transport.flush();
+  enforce(sink.getContents() == []);
+  flushTwice();
+  enforce(sink.getContents() == []);
+
+  // A write only shows up in the underlying buffer after a flush.
+  transport.write(cast(ubyte[])"a");
+  enforce(sink.getContents() == []);
+  transport.flush();
+  enforce(sink.getContents() == frameA);
+  flushTwice();
+  enforce(sink.getContents() == frameA);
+
+  // Same for a second frame.
+  transport.write(cast(ubyte[])"bc");
+  enforce(sink.getContents() == frameA);
+  transport.flush();
+  enforce(sink.getContents() == framesABC);
+  flushTwice();
+  enforce(sink.getContents() == framesABC);
+}
Added: thrift/trunk/lib/d/src/thrift/transport/http.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/http.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/http.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/http.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * HTTP transport implementation, modelled after the C++ one.
+ *
+ * Unfortunately, libcurl is quite heavyweight and supports only client-side
+ * applications. This is an implementation of the basic HTTP/1.1 parts
+ * supporting HTTP 100 Continue, chunked transfer encoding, keepalive, etc.
+ */
+module thrift.transport.http;
+
+import std.algorithm : canFind, countUntil, endsWith, findSplit, min, startsWith;
+import std.ascii : toLower;
+import std.array : empty;
+import std.conv : parse, to;
+import std.datetime : Clock, UTC;
+import std.string : stripLeft;
+import thrift.base : VERSION;
+import thrift.transport.base;
+import thrift.transport.memory;
+import thrift.transport.socket;
+
+/**
+ * Base class for both client- and server-side HTTP transports.
+ *
+ * Written data is buffered and sent as the body of a single HTTP message on
+ * flush(), preceded by the header returned by getHeader(). On the reading
+ * side, the HTTP headers are parsed and the message body (plain or chunked)
+ * is made available through read().
+ */
+abstract class THttpTransport : TBaseTransport {
+  /**
+   * Params:
+   *   transport = The underlying transport used for the actual I/O.
+   */
+  this(TTransport transport) {
+    transport_ = transport;
+    readHeaders_ = true;
+    httpBuf_ = new ubyte[HTTP_BUFFER_SIZE];
+    httpBufRemaining_ = httpBuf_[0 .. 0];
+    readBuffer_ = new TMemoryBuffer;
+    writeBuffer_ = new TMemoryBuffer;
+  }
+
+  override bool isOpen() {
+    return transport_.isOpen();
+  }
+
+  override bool peek() {
+    return transport_.peek();
+  }
+
+  override void open() {
+    transport_.open();
+  }
+
+  override void close() {
+    transport_.close();
+  }
+
+  /**
+   * Reads body data into the passed buffer. If no body data is buffered,
+   * more data is fetched from the underlying transport first, parsing the
+   * HTTP headers if they are due.
+   *
+   * Returns: The number of bytes read, zero if no data could be obtained.
+   */
+  override size_t read(ubyte[] buf) {
+    if (!readBuffer_.peek()) {
+      readBuffer_.reset();
+
+      if (!refill()) return 0;
+
+      if (readHeaders_) {
+        readHeaders();
+      }
+
+      size_t got;
+      if (chunked_) {
+        got = readChunked();
+      } else {
+        got = readContent(contentLength_);
+      }
+      readHeaders_ = true;
+
+      if (got == 0) return 0;
+    }
+    return readBuffer_.read(buf);
+  }
+
+  override size_t readEnd() {
+    // Read any pending chunked data (footers etc.)
+    if (chunked_) {
+      while (!chunkedDone_) {
+        readChunked();
+      }
+    }
+    return 0;
+  }
+
+  /// Appends the data to the write buffer; nothing is sent before flush().
+  override void write(in ubyte[] buf) {
+    writeBuffer_.write(buf);
+  }
+
+  /**
+   * Sends the header returned by getHeader() followed by the buffered data
+   * to the underlying transport and flushes it.
+   */
+  override void flush() {
+    auto data = writeBuffer_.getContents();
+    string header = getHeader(data.length);
+
+    transport_.write(cast(const(ubyte)[]) header);
+    transport_.write(data);
+    transport_.flush();
+
+    // Reset the buffer and header variables.
+    writeBuffer_.reset();
+    readHeaders_ = true;
+  }
+
+  /**
+   * The size of the buffer to read HTTP requests into, in bytes. Will expand
+   * as required.
+   */
+  enum HTTP_BUFFER_SIZE = 1024;
+
+protected:
+  /// Returns the HTTP header to send in front of dataLength bytes of body.
+  abstract string getHeader(size_t dataLength);
+
+  /**
+   * Parses the first line of an HTTP message.
+   *
+   * Returns: Whether the headers following the line are the final ones, as
+   *   opposed to an interim response (HTTP 100) after which another status
+   *   line is expected.
+   */
+  abstract bool parseStatusLine(const(ubyte)[] status);
+
+  /**
+   * Parses a single header line, picking up the Transfer-Encoding and
+   * Content-Length headers. Lines without a colon and all other headers are
+   * ignored.
+   *
+   * An unparsable Content-Length value makes std.conv.parse throw; that
+   * exception is not caught here.
+   */
+  void parseHeader(const(ubyte)[] header) {
+    auto split = findSplit(header, [':']);
+    if (split[1].empty) {
+      // No colon found.
+      return;
+    }
+
+    // HTTP header names are case-insensitive. Both sides are lowercased so
+    // that the comparison does not depend on which argument receives the
+    // header byte and which the byte of the (lowercase) name matched against.
+    static bool compToLower(ubyte a, ubyte b) {
+      return toLower(cast(char)a) == toLower(cast(char)b);
+    }
+
+    if (startsWith!compToLower(split[0], cast(ubyte[])"transfer-encoding")) {
+      if (endsWith!compToLower(split[2], cast(ubyte[])"chunked")) {
+        chunked_ = true;
+      }
+    } else if (startsWith!compToLower(split[0], cast(ubyte[])"content-length")) {
+      chunked_ = false;
+      auto lengthString = stripLeft(cast(const(char)[])split[2]);
+      contentLength_ = parse!size_t(lengthString);
+    }
+  }
+
+private:
+  /**
+   * Returns the next line from the HTTP buffer, without the terminating
+   * CRLF, fetching more data from the underlying transport as needed.
+   *
+   * If the data ends before a CRLF is found, whatever is left is returned as
+   * the last line.
+   *
+   * Throws: TTransportException (END_OF_FILE) if no data at all is left.
+   */
+  ubyte[] readLine() {
+    while (true) {
+      auto split = findSplit(httpBufRemaining_, cast(ubyte[])"\r\n");
+
+      if (split[1].empty) {
+        // No CRLF yet, move whatever we have now to front and refill.
+        if (httpBufRemaining_.empty) {
+          httpBufRemaining_ = httpBuf_[0 .. 0];
+        } else if (httpBufRemaining_.ptr !is httpBuf_.ptr) {
+          // Source and destination can overlap, which is not allowed for
+          // slice assignment. As the destination always lies in front of
+          // the source, copying element by element from the start is safe.
+          immutable pending = httpBufRemaining_.length;
+          foreach (i; 0 .. pending) {
+            httpBuf_[i] = httpBufRemaining_[i];
+          }
+          httpBufRemaining_ = httpBuf_[0 .. pending];
+        }
+
+        if (!refill()) {
+          // Without this check, callers looping until they see a certain
+          // line would never terminate once the data has run out.
+          if (httpBufRemaining_.empty) {
+            throw new TTransportException(
+              "No more data to read while expecting an HTTP line",
+              TTransportException.Type.END_OF_FILE);
+          }
+
+          // Hand out the unterminated rest and mark the buffer as consumed.
+          auto buf = httpBufRemaining_;
+          httpBufRemaining_ = httpBufRemaining_[$ .. $];
+          return buf;
+        }
+      } else {
+        // Set the remaining buffer to the part after \r\n and return the part
+        // (line) before it.
+        httpBufRemaining_ = split[2];
+        return split[0];
+      }
+    }
+  }
+
+  /**
+   * Reads the status line and the headers of the next HTTP message, up to
+   * and including the empty line terminating them.
+   */
+  void readHeaders() {
+    // Initialize headers state variables
+    contentLength_ = 0;
+    chunked_ = false;
+    chunkedDone_ = false;
+    chunkSize_ = 0;
+
+    // Control state flow
+    bool statusLine = true;
+    bool finished;
+
+    // Loop until headers are finished
+    while (true) {
+      auto line = readLine();
+
+      if (line.length == 0) {
+        if (finished) {
+          readHeaders_ = false;
+          return;
+        } else {
+          // Must have been an HTTP 100, keep going for another status line
+          statusLine = true;
+        }
+      } else {
+        if (statusLine) {
+          statusLine = false;
+          finished = parseStatusLine(line);
+        } else {
+          parseHeader(line);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads a single chunk of a chunked message into the read buffer. A chunk
+   * of size zero marks the end of the message, in which case the footers
+   * are read instead.
+   *
+   * Returns: The number of body bytes read.
+   *
+   * Throws: TTransportException (CORRUPTED_DATA) if the chunk size cannot
+   *   be parsed.
+   */
+  size_t readChunked() {
+    size_t length;
+
+    auto line = readLine();
+    size_t chunkSize;
+    try {
+      auto charLine = cast(char[])line;
+      chunkSize = parse!size_t(charLine, 16);
+    } catch (Exception e) {
+      throw new TTransportException("Invalid chunk size: " ~ to!string(line),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    if (chunkSize == 0) {
+      readChunkedFooters();
+    } else {
+      // Read data content
+      length += readContent(chunkSize);
+      // Read trailing CRLF after content
+      readLine();
+    }
+    return length;
+  }
+
+  /// Skips lines up to and including the empty line ending a chunked message.
+  void readChunkedFooters() {
+    while (true) {
+      auto line = readLine();
+      if (line.length == 0) {
+        chunkedDone_ = true;
+        break;
+      }
+    }
+  }
+
+  /**
+   * Moves up to size bytes of body data from the HTTP buffer to the read
+   * buffer, fetching more data from the underlying transport as needed.
+   *
+   * Returns: The number of bytes moved, which is less than size only if the
+   *   underlying transport ran out of data.
+   */
+  size_t readContent(size_t size) {
+    auto need = size;
+    while (need > 0) {
+      if (httpBufRemaining_.length == 0) {
+        // We have given all the data, reset position to head of the buffer.
+        httpBufRemaining_ = httpBuf_[0 .. 0];
+        if (!refill()) return size - need;
+      }
+
+      auto give = min(httpBufRemaining_.length, need);
+      readBuffer_.write(cast(ubyte[])httpBufRemaining_[0 .. give]);
+      httpBufRemaining_ = httpBufRemaining_[give .. $];
+      need -= give;
+    }
+    return size;
+  }
+
+  /**
+   * Reads more data from the underlying transport, appending it to the
+   * unconsumed part of the HTTP buffer. The buffer is doubled in size if a
+   * quarter of it or less is free.
+   *
+   * Returns: false if the underlying transport did not return any data.
+   */
+  bool refill() {
+    // Is there a nicer way to do this?
+    // Remember the window as indices, since growing httpBuf_ below might
+    // move it to a different memory location.
+    auto indexBegin = httpBufRemaining_.ptr - httpBuf_.ptr;
+    auto indexEnd = indexBegin + httpBufRemaining_.length;
+
+    if (httpBuf_.length - indexEnd <= (httpBuf_.length / 4)) {
+      httpBuf_.length *= 2;
+    }
+
+    // Read more data.
+    auto got = transport_.read(cast(ubyte[])httpBuf_[indexEnd .. $]);
+    if (got == 0) return false;
+    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd + got];
+    return true;
+  }
+
+  TTransport transport_;
+
+  TMemoryBuffer writeBuffer_;
+  TMemoryBuffer readBuffer_;
+
+  bool readHeaders_;
+  bool chunked_;
+  bool chunkedDone_;
+  size_t chunkSize_;
+  size_t contentLength_;
+
+  // Buffer for raw data from the underlying transport, and the part of it
+  // which has not been consumed yet.
+  ubyte[] httpBuf_;
+  ubyte[] httpBufRemaining_;
+}
+
+/**
+ * HTTP client transport.
+ *
+ * Messages are sent as HTTP/1.1 POST requests. Responses with status code
+ * 200 are accepted, status code 100 is skipped over, and every other status
+ * code is reported as an error.
+ */
+final class TClientHttpTransport : THttpTransport {
+  /**
+   * Constructs a client http transport operating on the passed underlying
+   * transport.
+   *
+   * Params:
+   *   transport = The underlying transport used for the actual I/O.
+   *   host = The HTTP host string.
+   *   path = The HTTP path string.
+   */
+  this(TTransport transport, string host, string path) {
+    super(transport);
+    host_ = host;
+    path_ = path;
+  }
+
+  /**
+   * Convenience overload for constructing a client HTTP transport using a
+   * TSocket connecting to the specified host and port.
+   *
+   * Params:
+   *   host = The server to connect to, also used as HTTP host string.
+   *   port = The port to connect to.
+   *   path = The HTTP path string.
+   */
+  this(string host, ushort port, string path) {
+    this(new TSocket(host, port), host, path);
+  }
+
+protected:
+  /**
+   * Builds the request line and headers (including the terminating empty
+   * line) of a POST request carrying dataLength bytes of content.
+   */
+  override string getHeader(size_t dataLength) {
+    // Every piece is joined explicitly with ~; implicit concatenation of
+    // adjacent string literals is rejected by newer compilers.
+    return "POST " ~ path_ ~ " HTTP/1.1\r\n" ~
+      "Host: " ~ host_ ~ "\r\n" ~
+      "Content-Type: application/x-thrift\r\n" ~
+      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
+      "Accept: application/x-thrift\r\n" ~
+      "User-Agent: Thrift/" ~ VERSION ~ " (D/TClientHttpTransport)\r\n" ~
+      "\r\n";
+  }
+
+  /**
+   * Parses the status line of a response.
+   *
+   * Returns: true for status code 200 (the actual response follows), false
+   *   for status code 100 (another status line is to be expected).
+   *
+   * Throws: TTransportException (CORRUPTED_DATA) if the line is malformed or
+   *   carries any other status code.
+   */
+  override bool parseStatusLine(const(ubyte)[] status) {
+    // Builds the exception for a rejected line, showing the line as text.
+    TTransportException badStatus(string prefix) {
+      return new TTransportException(
+        prefix ~ to!string(cast(const(char[]))status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    // HTTP-Version SP Status-Code SP Reason-Phrase CRLF
+    auto firstSplit = findSplit(status, [' ']);
+    if (firstSplit[1].empty) {
+      throw badStatus("Bad status: ");
+    }
+
+    // Skip additional spaces in front of the status code. countUntil()
+    // yields -1 if nothing but spaces (or nothing at all) follows, which
+    // must not be used as a slice bound.
+    auto codeStart = countUntil!"a != b"(firstSplit[2], ' ');
+    if (codeStart < 0) {
+      throw badStatus("Bad status: ");
+    }
+
+    auto codeReason = firstSplit[2][codeStart .. $];
+    auto secondSplit = findSplit(codeReason, [' ']);
+    if (secondSplit[1].empty) {
+      throw badStatus("Bad status: ");
+    }
+
+    if (secondSplit[0] == "200") {
+      // HTTP 200 = OK, we got the response
+      return true;
+    } else if (secondSplit[0] == "100") {
+      // HTTP 100 = continue, just keep reading
+      return false;
+    }
+
+    throw badStatus("Bad status (unhandled status code): ");
+  }
+
+private:
+  string host_;
+  string path_;
+}
+
+/**
+ * HTTP server transport.
+ *
+ * Accepts POST requests and sends messages as HTTP/1.1 200 responses.
+ */
+final class TServerHttpTransport : THttpTransport {
+  /**
+   * Constructs a new instance.
+   *
+   * Params:
+   *   transport = The underlying transport used for the actual I/O.
+   */
+  this(TTransport transport) {
+    super(transport);
+  }
+
+protected:
+  /**
+   * Builds the status line and headers (including the terminating empty
+   * line) of a 200 response carrying dataLength bytes of content.
+   */
+  override string getHeader(size_t dataLength) {
+    return "HTTP/1.1 200 OK\r\n" ~
+      "Date: " ~ getRFC1123Time() ~ "\r\n" ~
+      "Server: Thrift/" ~ VERSION ~ "\r\n" ~
+      "Content-Type: application/x-thrift\r\n" ~
+      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
+      "Connection: Keep-Alive\r\n" ~
+      "\r\n";
+  }
+
+  /**
+   * Parses the request line of an incoming request.
+   *
+   * Returns: true if the request uses the POST method.
+   *
+   * Throws: TTransportException (CORRUPTED_DATA) if the line is malformed or
+   *   uses any other method.
+   */
+  override bool parseStatusLine(const(ubyte)[] status) {
+    // Builds the exception for a rejected line, showing the line as text
+    // instead of as a list of byte values.
+    TTransportException badStatus(string prefix) {
+      return new TTransportException(
+        prefix ~ to!string(cast(const(char[]))status),
+        TTransportException.Type.CORRUPTED_DATA);
+    }
+
+    // Method SP Request-URI SP HTTP-Version CRLF.
+    auto split = findSplit(status, [' ']);
+    if (split[1].empty) {
+      throw badStatus("Bad status: ");
+    }
+
+    // Skip additional spaces in front of the URI. countUntil() yields -1 if
+    // nothing but spaces (or nothing at all) follows the method, which must
+    // not be used as a slice bound.
+    auto uriStart = countUntil!"a != b"(split[2], ' ');
+    if (uriStart < 0) {
+      throw badStatus("Bad status: ");
+    }
+
+    auto uriVersion = split[2][uriStart .. $];
+    if (!canFind(uriVersion, ' ')) {
+      throw badStatus("Bad status: ");
+    }
+
+    if (split[0] == "POST") {
+      // POST method ok, looking for content.
+      return true;
+    }
+
+    throw badStatus("Bad status (unsupported method): ");
+  }
+}
+
+/**
+ * Wraps a transport into an HTTP server transport.
+ */
+alias TWrapperTransportFactory!TServerHttpTransport TServerHttpTransportFactory;
+
+private {
+ import std.string : format;
+ /**
+  * Formats the current UTC time as an HTTP date in the RFC 1123 layout,
+  * e.g. "Sun, 06 Nov 1994 08:49:37 GMT".
+  */
+ string getRFC1123Time() {
+   auto sysTime = Clock.currTime(UTC());
+
+   auto dayName = capMemberName(sysTime.dayOfWeek);
+   auto monthName = capMemberName(sysTime.month);
+
+   // The format requires fixed-width, zero-padded numeric fields; plain %s
+   // would emit e.g. "6" and "8:49:37" for single-digit values.
+   return format("%s, %02d %s %04d %02d:%02d:%02d GMT", dayName, sysTime.day,
+     monthName, sysTime.year, sysTime.hour, sysTime.minute, sysTime.second);
+ }
+
+ import std.ascii : toUpper;
+ import std.traits : EnumMembers;
+ /**
+  * Returns the name of the enum member equal to val, with its first
+  * character converted to upper case and the rest left as declared.
+  *
+  * Throws: Exception if val does not equal any member of T.
+  */
+ string capMemberName(T)(T val) if (is(T == enum)) {
+   foreach (idx, member; EnumMembers!T) {
+     if (val == member) {
+       // Both the lookup of the name and the capitalization happen at
+       // compile time.
+       enum memberName = __traits(derivedMembers, T)[idx];
+       enum capitalized = cast(char) toUpper(memberName[0]) ~ memberName[1 .. $];
+       return capitalized;
+     }
+   }
+
+   throw new Exception("Not a member of " ~ T.stringof ~ ": " ~ to!string(val));
+ }
+
+ unittest { // Checks that capMemberName() upper-cases the first letter only.
+ enum Foo {
+ bar,
+ bAZ // Mixed case: the letters after the first must come back unchanged.
+ }
+
+ import std.exception;
+ enforce(capMemberName(Foo.bar) == "Bar");
+ enforce(capMemberName(Foo.bAZ) == "BAZ");
+ }
+}
Added: thrift/trunk/lib/d/src/thrift/transport/memory.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/memory.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/memory.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/memory.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.memory;
+
+import core.exception : onOutOfMemoryError;
+import core.stdc.stdlib : free, realloc;
+import std.algorithm : min;
+import std.conv : text;
+import thrift.transport.base;
+
+/**
+ * A transport that simply reads from and writes to an in-memory buffer. Every
+ * time you call write on it, the data is simply placed into a buffer, and
+ * every time you call read, data is consumed from that buffer.
+ *
+ * Currently, the storage for written data is never reclaimed, even if the
+ * buffer contents have already been read out again.
+ */
+final class TMemoryBuffer : TBaseTransport {
+ /**
+ * Constructs a new memory transport with an empty internal buffer.
+ */
+ this() {}
+
+ /**
+ * Constructs a new memory transport with an empty internal buffer,
+ * reserving space for capacity bytes in advance.
+ *
+ * If the amount of data which will be written to the buffer is already
+ * known on construction, this can improve performance over the default
+ * constructor because reallocations can be avoided.
+ *
+ * If the preallocated buffer is exhausted, data can still be written to the
+ * transport, but reallocations will happen.
+ *
+ * Params:
+ * capacity = Size of the initially reserved buffer (in bytes).
+ */
+ this(size_t capacity) {
+ reset(capacity);
+ }
+
+ /**
+ * Constructs a new memory transport initially containing the passed data.
+ *
+ * For now, the passed buffer is not intelligently used, the data is just
+ * copied to the internal buffer.
+ *
+ * Params:
+ * contents = Initial contents available to be read.
+ */
+ this(in ubyte[] contents) {
+ auto size = contents.length;
+ reset(size);
+ buffer_[0 .. size] = contents[];
+ writeOffset_ = size;
+ }
+
+ /**
+ * Destructor, frees the internally allocated buffer.
+ */
+ ~this() {
+ free(buffer_);
+ }
+
+ /**
+ * Returns a read-only view of the current buffer contents, i.e. of the
+ * bytes which have been written but not yet read.
+ *
+ * Note: For performance reasons, the returned slice is only valid for the
+ * life of this object, and may be invalidated on the next write() call at
+ * will — you might want to immediately .dup it if you intend to keep it
+ * around.
+ */
+ const(ubyte)[] getContents() {
+ return buffer_[readOffset_ .. writeOffset_];
+ }
+
+ /**
+ * A memory transport is always open.
+ */
+ override bool isOpen() @property {
+ return true;
+ }
+
+ override bool peek() { // True while written but not yet read bytes remain.
+ return writeOffset_ - readOffset_ > 0;
+ }
+
+ /**
+ * Opening is a no-op for a memory buffer.
+ */
+ override void open() {}
+
+ /**
+ * Closing is a no-op for a memory buffer, it is always open.
+ */
+ override void close() {}
+
+ override size_t read(ubyte[] buf) { // Copies out at most buf.length unread bytes and returns how many.
+ auto size = min(buf.length, writeOffset_ - readOffset_);
+ buf[0 .. size] = buffer_[readOffset_ .. readOffset_ + size];
+ readOffset_ += size;
+ return size;
+ }
+
+ /**
+ * Shortcut version of readAll() — using this over TBaseTransport.readAll()
+ * can give us a nice speed increase because it is typically a very hot path
+ * during deserialization.
+ *
+ * Throws: TTransportException (END_OF_FILE) if fewer than buf.length bytes
+ * are available; nothing is consumed in that case.
+ */
+ override void readAll(ubyte[] buf) {
+ auto available = writeOffset_ - readOffset_;
+ if (buf.length > available) {
+ throw new TTransportException(text("Cannot readAll() ", buf.length,
+ " bytes of data because only ", available, " bytes are available."),
+ TTransportException.Type.END_OF_FILE);
+ }
+
+ buf[] = buffer_[readOffset_ .. readOffset_ + buf.length];
+ readOffset_ += buf.length;
+ }
+
+ override void write(in ubyte[] buf) { // Appends buf, enlarging the storage first if it does not fit.
+ auto need = buf.length;
+ if (bufferLen_ - writeOffset_ < need) {
+ // Exponential growth: start at one more than the current size and double until the data fits.
+ auto newLen = bufferLen_ + 1;
+ while (newLen - writeOffset_ < need) newLen *= 2;
+ cRealloc(buffer_, newLen);
+ bufferLen_ = newLen;
+ }
+
+ buffer_[writeOffset_ .. writeOffset_ + need] = buf[];
+ writeOffset_ += need;
+ }
+
+ override const(ubyte)[] borrow(ubyte* buf, size_t len) { // buf is not used by this implementation.
+ if (len <= writeOffset_ - readOffset_) {
+ return buffer_[readOffset_ .. writeOffset_]; // All unread bytes, which may be more than len.
+ } else {
+ return null;
+ }
+ }
+
+ override void consume(size_t len) { // Advances the read position by len; no bounds check is made here.
+ readOffset_ += len;
+ }
+
+ void reset() { // Discards all contents; the allocated storage is kept.
+ readOffset_ = 0;
+ writeOffset_ = 0;
+ }
+
+ void reset(size_t capacity) { // As reset(), additionally growing the storage to at least capacity bytes.
+ readOffset_ = 0;
+ writeOffset_ = 0;
+ if (bufferLen_ < capacity) {
+ cRealloc(buffer_, capacity);
+ bufferLen_ = capacity;
+ }
+ }
+
+private:
+ ubyte* buffer_; // Storage obtained via cRealloc(), released in the destructor.
+ size_t bufferLen_; // Allocated size of buffer_ in bytes.
+ size_t readOffset_; // Offset of the next byte to be read.
+ size_t writeOffset_; // Offset at which the next write is placed.
+}
+
+private {
+  /**
+   * Resizes the C heap block data points to (or allocates a new one if data
+   * is null) to newSize bytes via realloc() and stores the resulting pointer
+   * back into data.
+   *
+   * Calls onOutOfMemoryError() if realloc() returns null.
+   */
+  void cRealloc(ref ubyte* data, size_t newSize) {
+    auto resized = cast(ubyte*)realloc(data, newSize);
+    if (resized is null) {
+      onOutOfMemoryError();
+    }
+    data = resized;
+  }
+}
+
+version (unittest) {
+ import std.exception;
+}
+
+unittest {
+ auto a = new TMemoryBuffer(5); // Storage for 5 bytes is reserved up front.
+ immutable(ubyte[]) testData = [1, 2, 3, 4];
+ auto buf = new ubyte[testData.length];
+ enforce(a.isOpen);
+
+ // a should be empty: nothing to peek at, read() yields 0 bytes, readAll() throws.
+ enforce(!a.peek());
+ enforce(a.read(buf) == 0);
+ assertThrown!TTransportException(a.readAll(buf));
+
+ // Write some data and read it back again.
+ a.write(testData);
+ enforce(a.peek());
+ enforce(a.getContents() == testData);
+ enforce(a.read(buf) == testData.length);
+ enforce(buf == testData);
+
+ // a should be empty again now that everything written has been read.
+ enforce(!a.peek());
+ enforce(a.read(buf) == 0);
+ assertThrown!TTransportException(a.readAll(buf));
+
+ // Test the constructor which directly accepts initial data.
+ auto b = new TMemoryBuffer(testData);
+ enforce(b.isOpen);
+ enforce(b.peek());
+ enforce(b.getContents() == testData);
+
+ // Test borrow() and consume(): borrowing alone must not consume any data.
+ auto borrowed = b.borrow(null, testData.length);
+ enforce(borrowed == testData);
+ enforce(b.peek());
+ b.consume(testData.length);
+ enforce(!b.peek());
+}
Added: thrift/trunk/lib/d/src/thrift/transport/piped.d
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/d/src/thrift/transport/piped.d?rev=1304085&view=auto
==============================================================================
--- thrift/trunk/lib/d/src/thrift/transport/piped.d (added)
+++ thrift/trunk/lib/d/src/thrift/transport/piped.d Thu Mar 22 21:49:10 2012
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.transport.piped;
+
+import thrift.transport.base;
+import thrift.transport.memory;
+
+/**
+ * Pipes data requested from one transport to another when readEnd()
+ * or writeEnd() is called.
+ *
+ * A typical use case would be to log requests on e.g. a socket to
+ * disk (i.e. pipe them to a TFileWriterTransport).
+ *
+ * The implementation keeps an internal buffer which expands to
+ * hold the whole amount of data read/written until the corresponding *End()
+ * method is called.
+ *
+ * Contrary to the C++ implementation, this doesn't introduce yet another layer
+ * of input/output buffering, all calls are passed to the underlying source
+ * transport verbatim.
+ */
+final class TPipedTransport(Source = TTransport) if (
+ isTTransport!Source
+) : TBaseTransport {
+ /// The default initial buffer size if not explicitly specified, in bytes.
+ enum DEFAULT_INITIAL_BUFFER_SIZE = 512;
+
+ /**
+ * Constructs a new instance.
+ *
+ * By default, only reads are piped (pipeReads = true, pipeWrites = false).
+ *
+ * Params:
+ * srcTrans = The transport to which all requests are forwarded.
+ * dstTrans = The transport the read/written data is copied to.
+ * initialBufferSize = The initial capacity of each of the read and write
+ * buffers, for performance tuning.
+ */
+ this(Source srcTrans, TTransport dstTrans,
+ size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE
+ ) {
+ srcTrans_ = srcTrans;
+ dstTrans_ = dstTrans;
+
+ readBuffer_ = new TMemoryBuffer(initialBufferSize);
+ writeBuffer_ = new TMemoryBuffer(initialBufferSize);
+
+ pipeReads_ = true;
+ pipeWrites_ = false;
+ }
+
+ bool pipeReads() @property const { // Whether data read is copied to the destination transport on readEnd().
+ return pipeReads_;
+ }
+
+ void pipeReads(bool value) @property {
+ if (!value) {
+ readBuffer_.reset(); // Drop read data collected so far, it will not be piped.
+ }
+ pipeReads_ = value;
+ }
+
+ bool pipeWrites() @property const { // Whether data written is copied to the destination transport on writeEnd().
+ return pipeWrites_;
+ }
+
+ void pipeWrites(bool value) @property {
+ if (!value) {
+ writeBuffer_.reset(); // Drop written data collected so far, it will not be piped.
+ }
+ pipeWrites_ = value;
+ }
+
+ override bool isOpen() { // Forwarded to the source transport, as are peek(), open(), close() and flush().
+ return srcTrans_.isOpen();
+ }
+
+ override bool peek() {
+ return srcTrans_.peek();
+ }
+
+ override void open() {
+ srcTrans_.open();
+ }
+
+ override void close() {
+ srcTrans_.close();
+ }
+
+ override size_t read(ubyte[] buf) { // Reads from the source transport, recording the data if reads are piped.
+ auto bytesRead = srcTrans_.read(buf);
+
+ if (pipeReads_) {
+ readBuffer_.write(buf[0 .. bytesRead]);
+ }
+
+ return bytesRead;
+ }
+
+ override size_t readEnd() { // Writes the recorded read data to the destination transport and flushes it.
+ if (pipeReads_) {
+ auto data = readBuffer_.getContents();
+ dstTrans_.write(data);
+ dstTrans_.flush();
+ readBuffer_.reset();
+
+ srcTrans_.readEnd();
+
+ // Return data.length instead of the readEnd() result of the source
+ // transports because it might not be available from it.
+ return data.length;
+ }
+
+ return srcTrans_.readEnd();
+ }
+
+ override void write(in ubyte[] buf) { // Records the data if writes are piped, then writes it to the source transport.
+ if (pipeWrites_) {
+ writeBuffer_.write(buf);
+ }
+
+ srcTrans_.write(buf);
+ }
+
+ override size_t writeEnd() { // Writes the recorded written data to the destination transport and flushes it.
+ if (pipeWrites_) {
+ auto data = writeBuffer_.getContents();
+ dstTrans_.write(data);
+ dstTrans_.flush();
+ writeBuffer_.reset();
+
+ srcTrans_.writeEnd();
+
+ // Return data.length instead of the writeEnd() result of the source
+ // transports because it might not be available from it.
+ return data.length;
+ }
+
+ return srcTrans_.writeEnd();
+ }
+
+ override void flush() {
+ srcTrans_.flush();
+ }
+
+private:
+ Source srcTrans_; // Transport all calls are forwarded to.
+ TTransport dstTrans_; // Transport the recorded data is copied to.
+
+ TMemoryBuffer readBuffer_; // Data read since the last readEnd() (only filled while pipeReads_ is set).
+ TMemoryBuffer writeBuffer_; // Data written since the last writeEnd() (only filled while pipeWrites_ is set).
+
+ bool pipeReads_;
+ bool pipeWrites_;
+}
+
+/**
+ * Helper for constructing a TPipedTransport without having to spell out
+ * the source transport type, which is inferred from the srcTrans argument
+ * instead (constructors cannot be called using IFTI, see
+ * $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
+ *
+ * The buffers of the returned transport have the default initial size.
+ */
+TPipedTransport!Source tPipedTransport(Source)(
+  Source srcTrans, TTransport dstTrans
+) if (isTTransport!Source) {
+  auto piped = new TPipedTransport!Source(srcTrans, dstTrans);
+  return piped;
+}
+
+version (unittest) {
+ // DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved
+ // into unittest block.
+ import std.array;
+ import std.exception : enforce;
+}
+
+unittest {
+ auto underlying = new TMemoryBuffer;
+ auto pipeTarget = new TMemoryBuffer;
+ auto trans = tPipedTransport(underlying, pipeTarget); // Pipes reads only, which is the default.
+
+ underlying.write(cast(ubyte[])"abcd");
+
+ ubyte[4] buffer;
+ trans.readAll(buffer[0 .. 2]);
+ enforce(buffer[0 .. 2] == "ab");
+ enforce(pipeTarget.getContents().empty); // Nothing is piped before readEnd().
+
+ trans.readEnd();
+ enforce(pipeTarget.getContents() == "ab"); // Only the data actually read is piped.
+ pipeTarget.reset();
+
+ underlying.write(cast(ubyte[])"ef");
+ trans.readAll(buffer[0 .. 2]);
+ enforce(buffer[0 .. 2] == "cd");
+ enforce(pipeTarget.getContents().empty);
+
+ trans.readAll(buffer[0 .. 2]);
+ enforce(buffer[0 .. 2] == "ef");
+ enforce(pipeTarget.getContents().empty);
+
+ trans.readEnd();
+ enforce(pipeTarget.getContents() == "cdef"); // Data of several reads is piped at once.
+}