You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC

svn commit: r1185772 [2/4] - in /incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/ Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/ Kafka.Client/Messages/ Kafka.Client/Producers/ Kafka.Client/Producers/Async/ Kafka.Client/...

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -39,6 +39,7 @@ namespace Kafka.Client.Producers.Async
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
         private readonly IDictionary<int, IAsyncProducer> asyncProducers;
+        private volatile bool disposed;
         
         /// <summary>
         /// Factory method used to instantiating asynchronous producer pool
@@ -56,7 +57,7 @@ namespace Kafka.Client.Producers.Async
         /// Instantiated asynchronous producer pool
         /// </returns>
         public static AsyncProducerPool<TData> CreateAsyncPool(
-            ProducerConfig config, 
+            ProducerConfiguration config, 
             IEncoder<TData> serializer, 
             ICallbackHandler cbkHandler)
         {
@@ -76,7 +77,7 @@ namespace Kafka.Client.Producers.Async
         /// Instantiated asynchronous producer pool
         /// </returns>
         public static AsyncProducerPool<TData> CreateAsyncPool(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IEncoder<TData> serializer)
         {
             return new AsyncProducerPool<TData>(config, serializer);
@@ -101,7 +102,7 @@ namespace Kafka.Client.Producers.Async
         /// Should be used for testing purpose only
         /// </remarks>
         private AsyncProducerPool(
-            ProducerConfig config, 
+            ProducerConfiguration config, 
             IEncoder<TData> serializer, 
             IDictionary<int, IAsyncProducer> asyncProducers, 
             ICallbackHandler cbkHandler)
@@ -123,7 +124,7 @@ namespace Kafka.Client.Producers.Async
         /// The callback invoked after new broker is added.
         /// </param>
         private AsyncProducerPool(
-            ProducerConfig config, 
+            ProducerConfiguration config, 
             IEncoder<TData> serializer, 
             ICallbackHandler cbkHandler)
             : this(config, serializer, new Dictionary<int, IAsyncProducer>(), cbkHandler)
@@ -139,12 +140,12 @@ namespace Kafka.Client.Producers.Async
         /// <param name="serializer">
         /// The serializer.
         /// </param>
-        private AsyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+        private AsyncProducerPool(ProducerConfiguration config, IEncoder<TData> serializer)
             : this(
             config,
             serializer,
             new Dictionary<int, IAsyncProducer>(),
-            ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+            ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
         {
         }
 
@@ -159,7 +160,8 @@ namespace Kafka.Client.Producers.Async
         /// </remarks>
         public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
         {
-            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNull(poolData, "poolData");
             Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
                 x => x.BidPid.BrokerId, x => x)
                 .ToDictionary(x => x.Key, x => x.ToList());
@@ -184,14 +186,10 @@ namespace Kafka.Client.Producers.Async
         /// <param name="broker">The broker informations.</param>
         public override void AddProducer(Broker broker)
         {
-            Guard.Assert<ArgumentNullException>(() => broker != null);
-            var asyncConfig = new AsyncProducerConfig
+            this.EnsuresNotDisposed();
+            Guard.NotNull(broker, "broker");
+            var asyncConfig = new AsyncProducerConfiguration(this.Config, broker.Id, broker.Host, broker.Port)
                 {
-                    Host = broker.Host,
-                    Port = broker.Port,
-                    QueueTime = this.Config.QueueTime,
-                    QueueSize = this.Config.QueueSize,
-                    BatchSize = this.Config.BatchSize,
                     SerializerClass = this.Config.SerializerClass
                 };
             var asyncProducer = new AsyncProducer(asyncConfig, this.CallbackHandler);
@@ -203,5 +201,24 @@ namespace Kafka.Client.Producers.Async
                 broker.Port);
             this.asyncProducers.Add(broker.Id, asyncProducer);
         }
+
+        /// <summary>
+        /// Disposes every pooled asynchronous producer; does nothing when <paramref name="disposing"/> is false or the pool was already disposed
+        /// </summary>
+        protected override void Dispose(bool disposing)
+        {
+            if (!disposing || this.disposed)
+            {
+                return;
+            }
+
+            // Also set the inherited Disposed property: EnsuresNotDisposed() tests it, not this field.
+            this.disposed = true;
+            this.Disposed = true;
+            foreach (var asyncProducer in this.asyncProducers.Values)
+            {
+                asyncProducer.Dispose();
+            }
+        }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
 
 namespace Kafka.Client.Producers.Async
 {
+    using System;
     using System.Collections.Generic;
     using Kafka.Client.Messages;
     using Kafka.Client.Requests;
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers.Async
     /// <summary>
     /// Sends messages encapsulated in request to Kafka server asynchronously
     /// </summary>
-    public interface IAsyncProducer
+    public interface IAsyncProducer : IDisposable
     {
         /// <summary>
         /// Sends request to Kafka server asynchronously

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
 
 namespace Kafka.Client.Producers
 {
+    using System;
     using System.Collections.Generic;
     using Kafka.Client.Cluster;
 
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers
     /// Pool of producers used by producer high-level API
     /// </summary>
     /// <typeparam name="TData">The type of the data.</typeparam>
-    internal interface IProducerPool<TData>
+    internal interface IProducerPool<TData> : IDisposable
     {
         /// <summary>
         /// Selects either a synchronous or an asynchronous producer, for

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs Tue Oct 18 17:52:13 2011
@@ -32,16 +32,16 @@ namespace Kafka.Client.Producers.Partiti
     /// </remarks>
     internal class ConfigBrokerPartitionInfo : IBrokerPartitionInfo
     {
-        private readonly ProducerConfig config;
+        private readonly ProducerConfiguration config;
         private IDictionary<int, Broker> brokers;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="ConfigBrokerPartitionInfo"/> class.
         /// </summary>
         /// <param name="config">The config.</param>
-        public ConfigBrokerPartitionInfo(ProducerConfig config)
+        public ConfigBrokerPartitionInfo(ProducerConfiguration config)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.NotNull(config, "config");
             this.config = config;
             this.InitializeBrokers();
         }
@@ -67,7 +67,7 @@ namespace Kafka.Client.Producers.Partiti
         /// <remarks>Partition ID would be allways 0</remarks>
         public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+            Guard.NotNullNorEmpty(topic, "topic");
             var partitions = new SortedSet<Partition>();
             foreach (var item in this.brokers)
             {
@@ -108,13 +108,11 @@ namespace Kafka.Client.Producers.Partiti
             }
 
             this.brokers = new Dictionary<int, Broker>();
-            string[] brokersInfoList = this.config.BrokerPartitionInfo.Split(',');
-            foreach (string item in brokersInfoList)
+            foreach (var item in this.config.Brokers)
             {
-                var parts = item.Split(':');
-                int id = int.Parse(parts[0], CultureInfo.InvariantCulture);
-                int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
-                this.brokers.Add(id, new Broker(id, parts[1], parts[1], port));
+                this.brokers.Add(
+                    item.BrokerId, 
+                    new Broker(item.BrokerId, item.Host, item.Host, item.Port));
             }
         }
     }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs Tue Oct 18 17:52:13 2011
@@ -41,7 +41,7 @@ namespace Kafka.Client.Producers.Partiti
         /// </remarks>
         public int Partition(TKey key, int numPartitions)
         {
-            Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+            Guard.Greater(numPartitions, 0, "numPartitions");
             return key == null 
                 ? Randomizer.Next(numPartitions) 
                 : key.GetHashCode() % numPartitions;

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs Tue Oct 18 17:52:13 2011
@@ -18,6 +18,7 @@
 namespace Kafka.Client.Producers.Partitioning
 {
     using System;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Globalization;
     using System.Reflection;
@@ -66,8 +67,8 @@ namespace Kafka.Client.Producers.Partiti
         /// </summary>
         /// <param name="config">The config.</param>
         /// <param name="callback">The callback invoked when new broker is added.</param>
-        public ZKBrokerPartitionInfo(ZKConfig config, Action<int, string, int> callback)
-            : this(new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+        public ZKBrokerPartitionInfo(ProducerConfiguration config, Action<int, string, int> callback)
+            : this(new ZooKeeperClient(config.ZooKeeper.ZkConnect, config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
         {
             this.callback = callback;
         }
@@ -93,7 +94,7 @@ namespace Kafka.Client.Producers.Partiti
         /// </returns>
         public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+            Guard.NotNullNorEmpty(topic, "topic");
 
             this.EnsuresNotDisposed();
             SortedSet<Partition> brokerPartitions = null;
@@ -101,11 +102,15 @@ namespace Kafka.Client.Producers.Partiti
             {
                 brokerPartitions = this.topicBrokerPartitions[topic];
             }
+            else
+            {
+                this.topicBrokerPartitions.Add(topic, null);
+            }
 
             if (brokerPartitions == null || brokerPartitions.Count == 0)
             {
                 var numBrokerPartitions = this.BootstrapWithExistingBrokers(topic);
-                this.topicBrokerPartitions.Add(topic, numBrokerPartitions);
+                this.topicBrokerPartitions[topic] = numBrokerPartitions;
                 return numBrokerPartitions;
             }
 
@@ -168,7 +173,7 @@ namespace Kafka.Client.Producers.Partiti
                 return;
             }
 
-            this.brokers = new Dictionary<int, Broker>();
+            this.brokers = new ConcurrentDictionary<int, Broker>();
             IList<string> brokerIds = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
             foreach (var brokerId in brokerIds)
             {
@@ -191,7 +196,7 @@ namespace Kafka.Client.Producers.Partiti
                 return;
             }
 
-            this.topicBrokerPartitions = new Dictionary<string, SortedSet<Partition>>();
+            this.topicBrokerPartitions = new ConcurrentDictionary<string, SortedSet<Partition>>();
             this.zkclient.MakeSurePersistentPathExists(ZooKeeperClient.DefaultBrokerTopicsPath);
             IList<string> topics = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerTopicsPath);
             foreach (string topic in topics)
@@ -306,7 +311,7 @@ namespace Kafka.Client.Producers.Partiti
         /// </remarks>
         public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
         {
-            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.NotNull(args, "args");
             Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
 
             this.EnsuresNotDisposed();
@@ -322,7 +327,7 @@ namespace Kafka.Client.Producers.Partiti
         /// </remarks>
         public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
         {
-            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.NotNull(args, "args");
 
             this.EnsuresNotDisposed();
             Logger.Debug("ZK expired; release old list of broker partitions for topics ");

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs Tue Oct 18 17:52:13 2011
@@ -40,7 +40,7 @@ namespace Kafka.Client.Producers
         /// <remarks>
         /// Should be used for testing purpose only.
         /// </remarks>
-        internal Producer(ProducerConfig config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
+        internal Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
             : base(config, partitioner, producerPool, populateProducerPool)
         {
         }
@@ -53,7 +53,7 @@ namespace Kafka.Client.Producers
         /// Can be used when all config parameters will be specified through the config object
         /// and will be instantiated via reflection
         /// </remarks>
-        public Producer(ProducerConfig config)
+        public Producer(ProducerConfiguration config)
             : base(config)
         {
         }
@@ -71,7 +71,7 @@ namespace Kafka.Client.Producers
         /// Can be used to provide pre-instantiated objects for all config parameters
         /// that would otherwise be instantiated via reflection.
         /// </remarks>
-        public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
+        public Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
             : base(config, partitioner, encoder, callbackHandler)
         {
         }
@@ -88,7 +88,7 @@ namespace Kafka.Client.Producers
         /// Can be used to provide pre-instantiated objects for all config parameters
         /// that would otherwise be instantiated via reflection.
         /// </remarks>
-        public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
+        public Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
             : base(config, partitioner, encoder)
         {
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs Tue Oct 18 17:52:13 2011
@@ -41,13 +41,13 @@ namespace Kafka.Client.Producers
     /// Provides serialization of data through a user-specified encoder, zookeeper based automatic broker discovery
     /// and software load balancing through an optionally user-specified partitioner
     /// </remarks>
-    public class Producer<TKey, TData> : ZooKeeperAwareKafkaClientBase, IProducer<TKey, TData>
+    public class Producer<TKey, TData> : KafkaClientBase, IProducer<TKey, TData>
         where TKey : class 
         where TData : class 
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);       
         private static readonly Random Randomizer = new Random();
-        private readonly ProducerConfig config;
+        private readonly ProducerConfiguration config;
         private readonly IProducerPool<TData> producerPool;
         private readonly IPartitioner<TKey> partitioner;
         private readonly bool populateProducerPool;
@@ -67,19 +67,19 @@ namespace Kafka.Client.Producers
         /// Should be used for testing purpose only.
         /// </remarks>
         internal Producer(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IPartitioner<TKey> partitioner,
             IProducerPool<TData> producerPool,
             bool populateProducerPool = true)
-            : base(config)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            Guard.Assert<ArgumentNullException>(() => producerPool != null);
+            Guard.NotNull(config, "config");
+            Guard.NotNull(producerPool, "producerPool");
+
             this.config = config;
             this.partitioner = partitioner ?? new DefaultPartitioner<TKey>();
             this.populateProducerPool = populateProducerPool;
             this.producerPool = producerPool;
-            if (this.IsZooKeeperEnabled)
+            if (this.config.IsZooKeeperEnabled)
             {
                 this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback);
             }
@@ -107,13 +107,13 @@ namespace Kafka.Client.Producers
         /// Can be used when all config parameters will be specified through the config object
         /// and will be instantiated via reflection
         /// </remarks>
-        public Producer(ProducerConfig config)
+        public Producer(ProducerConfiguration config)
             : this(
                 config, 
                 ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass),
                 ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass)))
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.NotNull(config, "config");
         }
 
         /// <summary>
@@ -131,7 +131,7 @@ namespace Kafka.Client.Producers
         /// that would otherwise be instantiated via reflection.
         /// </remarks>
         public Producer(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IPartitioner<TKey> partitioner,
             IEncoder<TData> encoder,
             ICallbackHandler callbackHandler)
@@ -140,8 +140,8 @@ namespace Kafka.Client.Producers
                 partitioner,
                 ProducerPool<TData>.CreatePool(config, encoder, callbackHandler))
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            Guard.Assert<ArgumentNullException>(() => encoder != null);
+            Guard.NotNull(config, "config");
+            Guard.NotNull(encoder, "encoder");
         }
 
         /// <summary>
@@ -157,7 +157,7 @@ namespace Kafka.Client.Producers
         /// that would otherwise be instantiated via reflection.
         /// </remarks>
         public Producer(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IPartitioner<TKey> partitioner,
             IEncoder<TData> encoder)
             : this(
@@ -165,8 +165,8 @@ namespace Kafka.Client.Producers
                 partitioner,
                 ProducerPool<TData>.CreatePool(config, encoder, null))
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            Guard.Assert<ArgumentNullException>(() => encoder != null);
+            Guard.NotNull(config, "config");
+            Guard.NotNull(encoder, "encoder");
         }
 
         /// <summary>
@@ -176,8 +176,9 @@ namespace Kafka.Client.Producers
         /// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
         public void Send(IEnumerable<ProducerData<TKey, TData>> data)
         {
-            Guard.Assert<ArgumentNullException>(() => data != null);
-            Guard.Assert<ArgumentException>(() => data.Count() > 0);
+            Guard.NotNull(data, "data");
+            Guard.Greater(data.Count(), 0, "data");
+
             this.EnsuresNotDisposed();
             var poolRequests = new List<ProducerPoolData<TData>>();
             foreach (var dataItem in data)
@@ -197,10 +198,11 @@ namespace Kafka.Client.Producers
         /// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
         public void Send(ProducerData<TKey, TData> data)
         {
-            Guard.Assert<ArgumentNullException>(() => data != null);
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(data.Topic));
-            Guard.Assert<ArgumentNullException>(() => data.Data != null);
-            Guard.Assert<ArgumentException>(() => data.Data.Count() > 0);
+            Guard.NotNull(data, "data");
+            Guard.NotNullNorEmpty(data.Topic, "data.Topic");
+            Guard.NotNull(data.Data, "data.Data");
+            Guard.Greater(data.Data.Count(), 0, "data.Data");
+
             this.EnsuresNotDisposed();
             this.Send(new[] { data });
         }
@@ -233,6 +235,11 @@ namespace Kafka.Client.Producers
                 {
                     this.brokerPartitionInfo.Dispose();
                 }
+
+                if (this.producerPool != null)
+                {
+                    this.producerPool.Dispose();
+                }
             }
             catch (Exception exc)
             {
@@ -249,8 +256,8 @@ namespace Kafka.Client.Producers
         /// <param name="port">The broker port.</param>
         private void Callback(int bid, string host, int port)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(host));
-            Guard.Assert<ArgumentOutOfRangeException>(() => port > 0);
+            Guard.NotNullNorEmpty(host, "host");
+            Guard.Greater(port, 0, "port");
 
             if (this.populateProducerPool)
             {
@@ -270,7 +277,7 @@ namespace Kafka.Client.Producers
         /// <returns>Partition Id</returns>
         private int GetPartitionId(TKey key, int numPartitions)
         {
-            Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+            Guard.Greater(numPartitions, 0, "numPartitions");
             return key == null 
                 ? Randomizer.Next(numPartitions) 
                 : this.partitioner.Partition(key, numPartitions);
@@ -297,7 +304,7 @@ namespace Kafka.Client.Producers
             int partitionId = this.GetPartitionId(dataItem.Key, totalNumPartitions);
             Partition brokerIdPartition = brokerPartitions.ToList()[partitionId];
             Broker brokerInfo = this.brokerPartitionInfo.GetBrokerInfo(brokerIdPartition.BrokerId);
-            if (this.IsZooKeeperEnabled)
+            if (this.config.IsZooKeeperEnabled)
             {
                 Logger.DebugFormat(
                     CultureInfo.CurrentCulture,

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -33,6 +33,8 @@ namespace Kafka.Client.Producers
     internal abstract class ProducerPool<TData> : IProducerPool<TData>
         where TData : class 
     {
+        protected bool Disposed { get; set; }
+
         /// <summary>
         /// Factory method used to instantiating either, 
         /// synchronous or asynchronous, producer pool based on configuration.
@@ -46,7 +48,7 @@ namespace Kafka.Client.Producers
         /// <returns>
         /// Instantiated either, synchronous or asynchronous, producer pool
         /// </returns>
-        public static ProducerPool<TData> CreatePool(ProducerConfig config, IEncoder<TData> serializer)
+        public static ProducerPool<TData> CreatePool(ProducerConfiguration config, IEncoder<TData> serializer)
         {
             if (config.ProducerType == ProducerTypes.Async)
             {
@@ -78,7 +80,7 @@ namespace Kafka.Client.Producers
         /// Instantiated either, synchronous or asynchronous, producer pool
         /// </returns>
         public static ProducerPool<TData> CreatePool(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IEncoder<TData> serializer,
             ICallbackHandler cbkHandler)
         {
@@ -104,11 +106,11 @@ namespace Kafka.Client.Producers
         /// Should be used for testing purpose only
         /// </remarks>
         protected ProducerPool(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IEncoder<TData> serializer)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            Guard.Assert<ArgumentNullException>(() => serializer != null);
+            Guard.NotNull(config, "config");
+            Guard.NotNull(serializer, "serializer");
 
             this.Config = config;
             this.Serializer = serializer;
@@ -127,19 +129,19 @@ namespace Kafka.Client.Producers
         /// The callback invoked after new broker is added.
         /// </param>
         protected ProducerPool(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IEncoder<TData> serializer,
             ICallbackHandler callbackHandler)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            Guard.Assert<ArgumentNullException>(() => serializer != null);
+            Guard.NotNull(config, "config");
+            Guard.NotNull(serializer, "serializer");
 
             this.Config = config;
             this.Serializer = serializer;
             this.CallbackHandler = callbackHandler;
         }
 
-        protected ProducerConfig Config { get; private set; }
+        protected ProducerConfiguration Config { get; private set; }
 
         protected IEncoder<TData> Serializer { get; private set; }
 
@@ -162,7 +164,9 @@ namespace Kafka.Client.Producers
         /// </remarks>
         public void Send(ProducerPoolData<TData> poolData)
         {
-            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNull(poolData, "poolData");
+
             this.Send(new[] { poolData });
         }
 
@@ -176,5 +180,27 @@ namespace Kafka.Client.Producers
         /// Used for multi-topic request
         /// </remarks>
         public abstract void Send(IEnumerable<ProducerPoolData<TData>> poolData);
+
+        /// <summary>
+        /// Disposes the pool via <c>Dispose(true)</c> and suppresses finalization of this instance
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected abstract void Dispose(bool disposing);
+
+        /// <summary>
+        /// Throws <see cref="ObjectDisposedException"/> when the <c>Disposed</c> property is set
+        /// </summary>
+        protected void EnsuresNotDisposed()
+        {
+            if (this.Disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
 
 namespace Kafka.Client.Producers.Sync
 {
+    using System;
     using System.Collections.Generic;
     using Kafka.Client.Messages;
     using Kafka.Client.Requests;
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers.Sync
     /// <summary>
     /// Sends messages encapsulated in request to Kafka server synchronously
     /// </summary>
-    public interface ISyncProducer
+    public interface ISyncProducer : IDisposable
     {
         /// <summary>
         /// Constructs producer request and sends it to given broker partition synchronously

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -30,15 +30,14 @@ namespace Kafka.Client.Producers.Sync
     /// </summary>
     public class SyncProducer : ISyncProducer
     {
-        private readonly SyncProducerConfig config;
+        private readonly KafkaConnection connection;
+
+        private volatile bool disposed;
 
         /// <summary>
         /// Gets producer config
         /// </summary>
-        public SyncProducerConfig Config
-        {
-            get { return config; }
-        }
+        public SyncProducerConfiguration Config { get; private set; }
 
         /// <summary>
         /// Initializes a new instance of the <see cref="SyncProducer"/> class.
@@ -46,10 +45,15 @@ namespace Kafka.Client.Producers.Sync
         /// <param name="config">
         /// The producer config.
         /// </param>
-        public SyncProducer(SyncProducerConfig config)
+        public SyncProducer(SyncProducerConfiguration config)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
-            this.config = config;
+            Guard.NotNull(config, "config");
+            this.Config = config;
+            this.connection = new KafkaConnection(
+                this.Config.Host, 
+                this.Config.Port,
+                config.BufferSize,
+                config.SocketTimeout);
         }
 
         /// <summary>
@@ -66,14 +70,13 @@ namespace Kafka.Client.Producers.Sync
         /// </param>
         public void Send(string topic, int partition, IEnumerable<Message> messages)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
-            Guard.Assert<ArgumentNullException>(() => messages != null);
-            Guard.Assert<ArgumentNullException>(
-                () => messages.All(x => x != null));
+            Guard.NotNullNorEmpty(topic, "topic");
+            Guard.NotNull(messages, "messages");
+            Guard.AllNotNull(messages, "messages.items");
             Guard.Assert<ArgumentOutOfRangeException>(
                 () => messages.All(
                     x => x.PayloadSize <= this.Config.MaxMessageSize));
-            
+            this.EnsuresNotDisposed();
             this.Send(new ProducerRequest(topic, partition, messages));
         }
 
@@ -85,11 +88,8 @@ namespace Kafka.Client.Producers.Sync
         /// </param>
         public void Send(ProducerRequest request)
         {
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
-            {
-                conn.Write(request);
-            }
+            this.EnsuresNotDisposed();
+            this.connection.Write(request);
         }
 
         /// <summary>
@@ -100,7 +100,7 @@ namespace Kafka.Client.Producers.Sync
         /// </param>
         public void MultiSend(IEnumerable<ProducerRequest> requests)
         {
-            Guard.Assert<ArgumentNullException>(() => requests != null);
+            Guard.NotNull(requests, "requests");
             Guard.Assert<ArgumentNullException>(
                 () => requests.All(
                     x => x != null && x.MessageSet != null && x.MessageSet.Messages != null));
@@ -108,11 +108,47 @@ namespace Kafka.Client.Producers.Sync
                 () => requests.All(
                     x => x.MessageSet.Messages.All(
                         y => y != null && y.PayloadSize <= this.Config.MaxMessageSize)));
-
+            this.EnsuresNotDisposed();
             var multiRequest = new MultiProducerRequest(requests);
-            using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+            this.connection.Write(multiRequest);
+        }
+
+        /// <summary>
+        /// Releases all unmanaged and managed resources
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            if (this.disposed)
+            {
+                return;
+            }
+
+            this.disposed = true;
+            if (this.connection != null)
+            {
+                this.connection.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Ensures that object was not disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
             {
-                conn.Write(multiRequest);
+                throw new ObjectDisposedException(this.GetType().Name);
             }
         }
     }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -40,6 +40,7 @@ namespace Kafka.Client.Producers.Sync
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
         private readonly IDictionary<int, ISyncProducer> syncProducers;
+        private volatile bool disposed;
 
         /// <summary>
         /// Factory method used to instantiating synchronous producer pool
@@ -53,7 +54,7 @@ namespace Kafka.Client.Producers.Sync
         /// <returns>
         /// Instantiated synchronous producer pool
         /// </returns>
-        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer)
+        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfiguration config, IEncoder<TData> serializer)
         {
             return new SyncProducerPool<TData>(config, serializer);
         }
@@ -73,7 +74,7 @@ namespace Kafka.Client.Producers.Sync
         /// <returns>
         /// Instantiated synchronous producer pool
         /// </returns>
-        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
+        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfiguration config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
         {
             return new SyncProducerPool<TData>(config, serializer, callbackHandler);
         }
@@ -97,7 +98,7 @@ namespace Kafka.Client.Producers.Sync
         /// Should be used for testing purpose only
         /// </remarks>
         private SyncProducerPool(
-            ProducerConfig config, 
+            ProducerConfiguration config, 
             IEncoder<TData> serializer,
             IDictionary<int, ISyncProducer> syncProducers,
             ICallbackHandler cbkHandler)
@@ -122,7 +123,7 @@ namespace Kafka.Client.Producers.Sync
         /// Should be used for testing purpose only
         /// </remarks>
         private SyncProducerPool(
-            ProducerConfig config,
+            ProducerConfiguration config,
             IEncoder<TData> serializer,
             ICallbackHandler cbkHandler)
             : this(config, serializer, new Dictionary<int, ISyncProducer>(), cbkHandler)
@@ -138,12 +139,12 @@ namespace Kafka.Client.Producers.Sync
         /// <param name="serializer">
         /// The serializer.
         /// </param>
-        private SyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+        private SyncProducerPool(ProducerConfiguration config, IEncoder<TData> serializer)
             : this(
                 config,
                 serializer,
                 new Dictionary<int, ISyncProducer>(),
-                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
         {
         }
 
@@ -158,7 +159,8 @@ namespace Kafka.Client.Producers.Sync
         /// </remarks>
         public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
         {
-            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNull(poolData, "poolData");
             Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
                 x => x.BidPid.BrokerId, x => x)
                 .ToDictionary(x => x.Key, x => x.ToList());
@@ -188,15 +190,10 @@ namespace Kafka.Client.Producers.Sync
         /// <param name="broker">The broker informations.</param>
         public override void AddProducer(Broker broker)
         {
-            Guard.Assert<ArgumentNullException>(() => broker != null);
-            var syncConfig = new SyncProducerConfig
-            {
-                Host = broker.Host,
-                Port = broker.Port,
-                BufferSize = this.Config.BufferSize,
-                ConnectTimeout = this.Config.ConnectTimeout,
-                ReconnectInterval = this.Config.ReconnectInterval
-            };
+            this.EnsuresNotDisposed();
+            Guard.NotNull(broker, "broker");
+
+            var syncConfig = new SyncProducerConfiguration(this.Config, broker.Id, broker.Host, broker.Port);
             var syncProducer = new SyncProducer(syncConfig);
             Logger.InfoFormat(
                 CultureInfo.CurrentCulture,
@@ -206,5 +203,24 @@ namespace Kafka.Client.Producers.Sync
                 broker.Port);
             this.syncProducers.Add(broker.Id, syncProducer);
         }
+
+        protected override void Dispose(bool disposing) // disposes every pooled sync producer once; does nothing when disposing is false
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            if (this.disposed)
+            {
+                return;
+            }
+
+            this.disposed = true; // NOTE(review): the EnsuresNotDisposed() shown for the pool base class tests this.Disposed, not this private field - confirm both flags stay in sync
+            foreach (var syncProducer in this.syncProducers.Values)
+            {
+                syncProducer.Dispose();
+            }
+        }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs Tue Oct 18 17:52:13 2011
@@ -116,7 +116,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
 
             using (var writer = new KafkaBinaryWriter(output))
             {
@@ -134,7 +134,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
 
             writer.WriteTopic(this.Topic, DefaultEncoding);
             writer.Write(this.Partition);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs Tue Oct 18 17:52:13 2011
@@ -51,7 +51,7 @@ namespace Kafka.Client.Requests
         /// <param name="requests">Requests to package up and batch.</param>
         public MultiFetchRequest(IList<FetchRequest> requests)
         {
-            Guard.Assert<ArgumentNullException>(() => requests != null);
+            Guard.NotNull(requests, "requests");
             ConsumerRequests = requests;
             int length = GetRequestLength(requests, DefaultEncoding);
             this.RequestBuffer = new BoundedBuffer(length);
@@ -79,7 +79,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
 
             using (var writer = new KafkaBinaryWriter(output))
             {
@@ -98,7 +98,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
 
             foreach (var consumerRequest in ConsumerRequests)
             {

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs Tue Oct 18 17:52:13 2011
@@ -35,7 +35,7 @@ namespace Kafka.Client.Requests
 
         public static int GetBufferLength(IEnumerable<ProducerRequest> requests)
         {
-            Guard.Assert<ArgumentNullException>(() => requests != null);
+            Guard.NotNull(requests, "requests");
 
             return DefaultRequestSizeSize 
                 + DefaultRequestIdSize 
@@ -51,7 +51,8 @@ namespace Kafka.Client.Requests
         /// </param>
         public MultiProducerRequest(IEnumerable<ProducerRequest> requests)
         {
-            Guard.Assert<ArgumentNullException>(() => requests != null);
+            Guard.NotNull(requests, "requests");
+
             int length = GetBufferLength(requests);
             ProducerRequests = requests;
             this.RequestBuffer = new BoundedBuffer(length);
@@ -79,7 +80,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
 
             using (var writer = new KafkaBinaryWriter(output))
             {
@@ -97,7 +98,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
 
             writer.Write((short)this.ProducerRequests.Count());
             foreach (var request in ProducerRequests)
@@ -108,33 +109,21 @@ namespace Kafka.Client.Requests
 
         public override string ToString()
         {
-            using (var reader = new KafkaBinaryReader(this.RequestBuffer))
-            {
-                return ParseFrom(reader, (int)this.RequestBuffer.Length);
-            }
-        }
-
-        public static string ParseFrom(KafkaBinaryReader reader, int count)
-        {
-            Guard.Assert<ArgumentNullException>(() => reader != null);
-
             var sb = new StringBuilder();
             sb.Append("Request size: ");
-            sb.Append(reader.ReadInt32());
+            sb.Append(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
             sb.Append(", RequestId: ");
-            short reqId = reader.ReadInt16();
-            sb.Append(reqId);
+            sb.Append(this.RequestTypeId);
             sb.Append("(");
-            sb.Append((RequestTypes)reqId);
+            sb.Append((RequestTypes)this.RequestTypeId);
             sb.Append("), Single Requests: {");
             int i = 1;
-            while (reader.BaseStream.Position != reader.BaseStream.Length)
+            foreach (var request in ProducerRequests)
             {
                 sb.Append("Request ");
                 sb.Append(i);
                 sb.Append(" {");
-                int msgSize = 0;
-                sb.Append(ProducerRequest.ParseFrom(reader, msgSize));
+                sb.Append(request.ToString());
                 sb.AppendLine("} ");
                 i++;
             }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs Tue Oct 18 17:52:13 2011
@@ -107,7 +107,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(System.IO.MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
 
             using (var writer = new KafkaBinaryWriter(output))
             {
@@ -125,7 +125,7 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
 
             writer.WriteTopic(this.Topic, DefaultEncoding);
             writer.Write(this.Partition);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs Tue Oct 18 17:52:13 2011
@@ -35,7 +35,6 @@ namespace Kafka.Client.Requests
         public const byte DefaultPartitionSize = 4;
         public const byte DefaultSetSizeSize = 4;
         public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultSetSizeSize;
-        public const short DefaultTopicLengthIfNonePresent = 2;
 
         public static int GetRequestLength(string topic, int messegesSize, string encoding = DefaultEncoding)
         {
@@ -45,7 +44,8 @@ namespace Kafka.Client.Requests
 
         public ProducerRequest(string topic, int partition, BufferedMessageSet messages)
         {
-            Guard.Assert<ArgumentNullException>(() => messages != null);
+            Guard.NotNull(messages, "messages");
+
             int length = GetRequestLength(topic, messages.SetSize);
             this.RequestBuffer = new BoundedBuffer(length);
             this.Topic = topic;
@@ -91,7 +91,8 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
+
             using (var writer = new KafkaBinaryWriter(output))
             {
                 writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
@@ -108,7 +109,8 @@ namespace Kafka.Client.Requests
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
+
             writer.WriteTopic(this.Topic, DefaultEncoding);
             writer.Write(this.Partition);
             writer.Write(this.MessageSet.SetSize);
@@ -117,39 +119,22 @@ namespace Kafka.Client.Requests
 
         public override string ToString()
         {
-            using (var reader = new KafkaBinaryReader(this.RequestBuffer))
-            {
-                return ParseFrom(reader, this.TotalSize);
-            }
-        }
-
-        public static string ParseFrom(KafkaBinaryReader reader, int count, bool skipReqInfo = false)
-        {
-            Guard.Assert<ArgumentNullException>(() => reader != null);
             var sb = new StringBuilder();
-
-            if (!skipReqInfo)
-            {
-                sb.Append("Request size: ");
-                sb.Append(reader.ReadInt32());
-                sb.Append(", RequestId: ");
-                short reqId = reader.ReadInt16();
-                sb.Append(reqId);
-                sb.Append("(");
-                sb.Append((RequestTypes)reqId);
-                sb.Append(")");
-            }
-
+            sb.Append("Request size: ");
+            sb.Append(this.TotalSize);
+            sb.Append(", RequestId: ");
+            sb.Append(this.RequestTypeId);
+            sb.Append("(");
+            sb.Append((RequestTypes)this.RequestTypeId);
+            sb.Append(")");
             sb.Append(", Topic: ");
-            string topic = reader.ReadTopic(DefaultEncoding);
-            sb.Append(topic);
+            sb.Append(this.Topic);
             sb.Append(", Partition: ");
-            sb.Append(reader.ReadInt32());
+            sb.Append(this.Partition);
             sb.Append(", Set size: ");
-            sb.Append(reader.ReadInt32());
-            int size = count - DefaultHeaderSize - GetTopicLength(topic);
+            sb.Append(this.MessageSet.SetSize);
             sb.Append(", Set {");
-            sb.Append(BufferedMessageSet.ParseFrom(reader, size));
+            sb.Append(this.MessageSet.ToString());
             sb.Append("}");
             return sb.ToString();
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs Tue Oct 18 17:52:13 2011
@@ -20,6 +20,7 @@ namespace Kafka.Client.Serialization
     using System.IO;
     using System.Net;
     using System.Text;
+    using System.Net.Sockets;
 
     /// <summary>
     /// Reads data from underlying stream using big endian bytes order for primitive types
@@ -47,7 +48,10 @@ namespace Kafka.Client.Serialization
         /// </param>
         protected override void Dispose(bool disposing)
         {
-            this.BaseStream.Position = 0;
+            if (this.BaseStream.CanSeek)
+            {
+                this.BaseStream.Position = 0;
+            }
         }
 
         /// <summary>
@@ -127,5 +131,18 @@ namespace Kafka.Client.Serialization
             Encoding encoder = Encoding.GetEncoding(encoding);
             return encoder.GetString(bytes);
         }
+
+        public bool DataAvailabe
+        {
+            get
+            {
+                var networkStream = this.BaseStream as NetworkStream; // null unless the underlying stream is a NetworkStream
+                if (networkStream != null)
+                {
+                    return networkStream.DataAvailable;
+                }
+                return this.BaseStream.Length != this.BaseStream.Position; // other streams: true while position differs from length
+            }
+        }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs Tue Oct 18 17:52:13 2011
@@ -18,29 +18,53 @@
 namespace Kafka.Client.Utils
 {
     using System;
+    using System.Collections;
     using System.Collections.Generic;
     using System.Linq.Expressions;
     using System.Text.RegularExpressions;
 
     internal static class Guard
     {
-        /// <summary>
-        /// Checks whether given expression is true. Throws <see cref="InvalidOperationException" /> if not.
-        /// </summary>
-        /// <param name="assertion">
-        /// The assertion.
-        /// </param>
-        /// <exception cref="InvalidOperationException">
-        /// Thrown when condition is not met.
-        /// </exception>
-        public static void Assert(Expression<Func<bool>> assertion)
+        public static void NotNull(object parameter, string paramName)
         {
-            var compiled = assertion.Compile();
-            var evaluatedValue = compiled();
-            if (!evaluatedValue)
+            if (parameter == null)
+            {
+                throw new ArgumentNullException(paramName);
+            }
+        }
+
+        public static void Count(ICollection parameter, int length, string paramName)
+        {
+            if (parameter.Count != length)
+            {
+                throw new ArgumentOutOfRangeException(paramName, parameter.Count, string.Empty);
+            }
+        }
+
+        public static void Greater(int parameter, int expected, string paramName)
+        {
+            if (parameter <= expected)
+            {
+                throw new ArgumentOutOfRangeException(paramName, parameter, string.Empty);
+            }
+        }
+
+        public static void NotNullNorEmpty(string parameter, string paramName)
+        {
+            if (string.IsNullOrEmpty(parameter))
+            {
+                throw new ArgumentException("Given string is empty", paramName);
+            }
+        }
+
+        public static void AllNotNull(IEnumerable parameter, string paramName)
+        {
+            foreach (var par in parameter)
             {
-                throw new InvalidOperationException(
-                    string.Format("'{0}' is not met.", Normalize(assertion.ToString())));
+                if (par == null)
+                {
+                    throw new ArgumentNullException(paramName);
+                }
             }
         }
 

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs Tue Oct 18 17:52:13 2011
@@ -25,23 +25,21 @@ namespace Kafka.Client.Utils
         public static T Instantiate<T>(string className)
             where T : class
         {
-            Type t1;
             object o1;
             if (string.IsNullOrEmpty(className))
             {
                 return default(T);
             }
 
-            if (className.Contains("`1"))
+            Type t1 = Type.GetType(className, true); // throwOnError = true: an unresolvable name fails here instead of yielding null
+            if (t1.IsGenericTypeDefinition) // only an open generic can be closed over T's arguments; MakeGenericType throws on an already-constructed type
             {
-                t1 = Type.GetType(className);
                 var t2 = typeof(T).GetGenericArguments();
                 var t3 = t1.MakeGenericType(t2);
                 o1 = Activator.CreateInstance(t3);
                 return o1 as T;
             }
 
-            t1 = Type.GetType(className, true);
             o1 = Activator.CreateInstance(t1);
             return o1 as T;
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs Tue Oct 18 17:52:13 2011
@@ -28,7 +28,7 @@ namespace Kafka.Client
         /// Initializes a new instance of the <see cref="ZooKeeperAwareKafkaClientBase"/> class.
         /// </summary>
         /// <param name="config">The config.</param>
-        protected ZooKeeperAwareKafkaClientBase(ZKConfig config)
+        protected ZooKeeperAwareKafkaClientBase(ZooKeeperConfiguration config)
         {
             this.IsZooKeeperEnabled = config != null && !string.IsNullOrEmpty(config.ZkConnect);
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs Tue Oct 18 17:52:13 2011
@@ -34,7 +34,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// <remarks>
         /// Used for testing purpose
         /// </remarks>
-        int IdleTime { get; }
+        int? IdleTime { get; }
 
         /// <summary>
         /// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs Tue Oct 18 17:52:13 2011
@@ -77,9 +77,9 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void HandleChildChange(ZooKeeperChildChangedEventArgs e)
         {
-            Guard.Assert<ArgumentNullException>(() => e != null);
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(e.Path));
-            Guard.Assert<ArgumentNullException>(() => e.Children != null);
+            Guard.NotNull(e, "e");
+            Guard.NotNullNorEmpty(e.Path, "e.Path");
+            Guard.NotNull(e.Children, "e.Children");
 
             lock (this.syncLock)
             {

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs Tue Oct 18 17:52:13 2011
@@ -49,7 +49,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
 
         private readonly object syncLock;
 
-        private readonly ConsumerConfig config;
+        private readonly ConsumerConfiguration config;
 
         private readonly IZooKeeperClient zkClient;
 
@@ -60,7 +60,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         private readonly ZookeeperConsumerConnector zkConsumerConnector;
 
         internal ZKRebalancerListener(
-            ConsumerConfig config,
+            ConsumerConfiguration config,
             string consumerIdString,
             IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry,
             IZooKeeperClient zkClient,
@@ -106,7 +106,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
                     //// release all partitions, reset state and retry
                     this.ReleasePartitionOwnership();
                     this.ResetState();
-                    Thread.Sleep(config.ZkSyncTimeMs);
+                    Thread.Sleep(config.ZooKeeper.ZkSyncTimeMs);
                 }
             }
 
@@ -124,9 +124,9 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
         {
-            Guard.Assert<ArgumentNullException>(() => args != null);
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(args.Path));
-            Guard.Assert<ArgumentNullException>(() => args.Children != null);
+            Guard.NotNull(args, "args");
+            Guard.NotNullNorEmpty(args.Path, "args.Path");
+            Guard.NotNull(args.Children, "args.Children");
 
             SyncedRebalance();
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs Tue Oct 18 17:52:13 2011
@@ -58,7 +58,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
         {
-            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.NotNull(args, "args");
             Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
         }
 
@@ -72,7 +72,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
         {
-            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.NotNull(args, "args");
 
             Logger.InfoFormat(
                 CultureInfo.CurrentCulture,

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs Tue Oct 18 17:52:13 2011
@@ -209,7 +209,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public bool WaitUntilConnected(int connectionTimeout)
         {
-            Guard.Assert<ArgumentOutOfRangeException>(() => connectionTimeout > 0);
+            Guard.Greater(connectionTimeout, 0, "connectionTimeout");
+
             this.EnsuresNotDisposed();
             if (this.eventWorker != null && this.eventWorker == Thread.CurrentThread)
             {
@@ -250,7 +251,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public T RetryUntilConnected<T>(Func<T> callback)
         {
-            Guard.Assert<ArgumentNullException>(() => callback != null);
+            Guard.NotNull(callback, "callback");
+
             this.EnsuresNotDisposed();
             if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
             {
@@ -290,7 +292,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public bool Exists(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             bool hasListeners = this.HasListeners(path);
@@ -311,7 +313,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public bool Exists(string path, bool watch)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.RetryUntilConnected(
@@ -332,7 +334,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public IList<string> GetChildren(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             bool hasListeners = this.HasListeners(path);
@@ -353,7 +355,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public IList<string> GetChildren(string path, bool watch)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.RetryUntilConnected(
@@ -375,7 +377,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public int CountChildren(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             try
@@ -413,7 +415,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         public T ReadData<T>(string path, Stat stats, bool watch)
             where T : class 
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             byte[] bytes = this.RetryUntilConnected(
@@ -443,7 +445,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public T ReadData<T>(string path, Stat stats) where T : class
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             bool hasListeners = this.HasListeners(path);
@@ -461,7 +463,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void WriteData(string path, object data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             this.WriteData(path, data, -1);
@@ -484,7 +486,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void WriteData(string path, object data, int expectedVersion)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             byte[] bytes = this.serializer.Serialize(data);
@@ -507,7 +509,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public bool Delete(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.RetryUntilConnected(
@@ -536,7 +538,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public bool DeleteRecursive(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             IList<string> children;
@@ -568,7 +570,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void MakeSurePersistentPathExists(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             if (!this.Exists(path))
@@ -588,7 +590,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public IList<string> GetChildrenParentMayNotExist(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             try
@@ -616,7 +618,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         public T ReadData<T>(string path)
             where T : class
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.ReadData<T>(path, false);
@@ -671,8 +673,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void CreatePersistent(string path, bool createParents)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
             this.EnsuresNotDisposed();
             try
             {
@@ -710,8 +711,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void CreatePersistent(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             this.CreatePersistent(path, false);
         }
 
@@ -730,8 +731,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void CreatePersistent(string path, object data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             this.Create(path, data, CreateMode.Persistent);
         }
 
@@ -753,8 +754,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public string CreatePersistentSequential(string path, object data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             return this.Create(path, data, CreateMode.PersistentSequential);
         }
 
@@ -796,8 +797,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void CreateEphemeral(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             this.Create(path, null, CreateMode.Ephemeral);
         }
 
@@ -815,8 +816,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </remarks>
         public void CreateEphemeral(string path, object data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             this.Create(path, data, CreateMode.Ephemeral);
         }
 
@@ -837,8 +838,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public string CreateEphemeralSequential(string path, object data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             return this.Create(path, data, CreateMode.EphemeralSequential);
         }
 
@@ -861,8 +862,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         public T ReadData<T>(string path, bool returnNullIfPathNotExists)
             where T : class 
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+            Guard.NotNullNorEmpty(path, "path");
+            this.EnsuresNotDisposed();
             try
             {
                 return this.ReadData<T>(path, null);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs Tue Oct 18 17:52:13 2011
@@ -130,7 +130,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void Delete(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             this.Client.Delete(path, -1);
@@ -150,7 +150,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public bool Exists(string path, bool watch)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.Client.Exists(path, true) != null;
@@ -173,7 +173,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public string Create(string path, byte[] data, CreateMode mode)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.Client.Create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
@@ -193,7 +193,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public IList<string> GetChildren(string path, bool watch)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.Client.GetChildren(path, watch);
@@ -216,7 +216,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public byte[] ReadData(string path, Stat stats, bool watch)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             return this.Client.GetData(path, watch, stats);
@@ -233,7 +233,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void WriteData(string path, byte[] data)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             this.WriteData(path, data, -1);
@@ -253,7 +253,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </param>
         public void WriteData(string path, byte[] data, int version)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             this.Client.SetData(path, data, version);
@@ -270,7 +270,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public long GetCreateTime(string path)
         {
-            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.NotNullNorEmpty(path, "path");
 
             this.EnsuresNotDisposed();
             Stat stats = this.Client.Exists(path, false);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs Tue Oct 18 17:52:13 2011
@@ -48,7 +48,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public byte[] Serialize(object obj)
         {
-            Guard.Assert<ArgumentNullException>(() => obj != null);
+            Guard.NotNull(obj, "obj");
             return Encoding.UTF8.GetBytes(obj.ToString());
         }
 
@@ -63,8 +63,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
         /// </returns>
         public object Deserialize(byte[] bytes)
         {
-            Guard.Assert<ArgumentNullException>(() => bytes != null);
-            Guard.Assert<ArgumentException>(() => bytes.Count() > 0);
+            Guard.NotNull(bytes, "bytes");
+            Guard.Greater(bytes.Count(), 0, "bytes");
 
             return bytes == null ? null : Encoding.UTF8.GetString(bytes);
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config Tue Oct 18 17:52:13 2011
@@ -26,7 +26,7 @@
   </configSections>
   <kafkaClientConfiguration>
     <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
-    <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="10000" fetchSize="307200" backOffIncrementMs="2000"/>
+    <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
     <brokerPartitionInfos>
       <add id="0" address="192.168.1.39" port="9092" />
       <add id="1" address="192.168.1.39" port="9101" />
@@ -34,4 +34,4 @@
     </brokerPartitionInfos>
     <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
   </kafkaClientConfiguration>
-</configuration>
\ No newline at end of file
+</configuration>