Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [4/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using log4net;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Fetches broker info such as ID, host, port and the number of partitions from ZooKeeper.
+    /// </summary>
+    /// <remarks>
+    /// Used when ZooKeeper-based automatic partition discovery is enabled.
+    /// </remarks>
+    internal class ZKBrokerPartitionInfo : IBrokerPartitionInfo, IZooKeeperStateListener
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);   
+        private readonly Action<int, string, int> callback;
+        private IDictionary<int, Broker> brokers;
+        private IDictionary<string, SortedSet<Partition>> topicBrokerPartitions;
+        private readonly IZooKeeperClient zkclient;
+        private readonly BrokerTopicsListener brokerTopicsListener;
+        private volatile bool disposed;
+        private readonly object shuttingDownLock = new object();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class.
+        /// </summary>
+        /// <param name="zkclient">The wrapper around the ZooKeeper client.</param>
+        public ZKBrokerPartitionInfo(IZooKeeperClient zkclient)
+        {
+            this.zkclient = zkclient;
+            this.zkclient.Connect();
+            this.InitializeBrokers();
+            this.InitializeTopicBrokerPartitions();
+            this.brokerTopicsListener = new BrokerTopicsListener(this.zkclient, this.topicBrokerPartitions, this.brokers, this.callback);
+            this.RegisterListeners();
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class.
+        /// </summary>
+        /// <param name="config">The config.</param>
+        /// <param name="callback">The callback invoked when a new broker is added.</param>
+        public ZKBrokerPartitionInfo(ZKConfig config, Action<int, string, int> callback)
+            : this(new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+        {
+            this.callback = callback;
+        }
+
+        /// <summary>
+        /// Gets a mapping from broker ID to the host and port for all brokers
+        /// </summary>
+        /// <returns>
+        /// Mapping from broker ID to the host and port for all brokers
+        /// </returns>
+        public IDictionary<int, Broker> GetAllBrokerInfo()
+        {
+            this.EnsuresNotDisposed();
+            return this.brokers;
+        }
+
+        /// <summary>
+        /// Gets a mapping from broker ID to partition IDs
+        /// </summary>
+        /// <param name="topic">The topic for which this information is to be returned</param>
+        /// <returns>
+        /// Mapping from broker ID to partition IDs
+        /// </returns>
+        public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+
+            this.EnsuresNotDisposed();
+            SortedSet<Partition> brokerPartitions = null;
+            if (this.topicBrokerPartitions.ContainsKey(topic))
+            {
+                brokerPartitions = this.topicBrokerPartitions[topic];
+            }
+
+            if (brokerPartitions == null || brokerPartitions.Count == 0)
+            {
+                var numBrokerPartitions = this.BootstrapWithExistingBrokers(topic);
+                this.topicBrokerPartitions.Add(topic, numBrokerPartitions);
+                return numBrokerPartitions;
+            }
+
+            return brokerPartitions;
+        }
+
+        /// <summary>
+        /// Gets the host and port information for the broker identified by the given broker ID
+        /// </summary>
+        /// <param name="brokerId">The broker ID.</param>
+        /// <returns>
+        /// Host and port of broker
+        /// </returns>
+        public Broker GetBrokerInfo(int brokerId)
+        {
+            this.EnsuresNotDisposed();
+            return this.brokers.ContainsKey(brokerId) ? this.brokers[brokerId] : null;
+        }
+
+        /// <summary>
+        /// Closes underlying connection to ZooKeeper
+        /// </summary>
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+
+            try
+            {
+                if (this.zkclient != null)
+                {
+                    this.zkclient.Dispose();
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Ignoring unexpected errors on closing", exc);
+            }
+        }
+
+        /// <summary>
+        /// Initializes the list of brokers.
+        /// </summary>
+        private void InitializeBrokers()
+        {
+            if (this.brokers != null)
+            {
+                return;
+            }
+
+            this.brokers = new Dictionary<int, Broker>();
+            IList<string> brokerIds = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
+            foreach (var brokerId in brokerIds)
+            {
+                string path = ZooKeeperClient.DefaultBrokerIdsPath + "/" + brokerId;
+                int id = int.Parse(brokerId, CultureInfo.InvariantCulture);
+                var info = this.zkclient.ReadData<string>(path, null);
+                string[] parts = info.Split(':');
+                int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
+                this.brokers.Add(id, new Broker(id, parts[0], parts[1], port));
+            }
+        }
+
+        /// <summary>
+        /// Initializes the topic to broker-partitions mapping.
+        /// </summary>
+        private void InitializeTopicBrokerPartitions()
+        {
+            if (this.topicBrokerPartitions != null)
+            {
+                return;
+            }
+
+            this.topicBrokerPartitions = new Dictionary<string, SortedSet<Partition>>();
+            this.zkclient.MakeSurePersistentPathExists(ZooKeeperClient.DefaultBrokerTopicsPath);
+            IList<string> topics = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerTopicsPath);
+            foreach (string topic in topics)
+            {
+                string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+                IList<string> brokersPerTopic = this.zkclient.GetChildrenParentMayNotExist(brokerTopicPath);
+                var brokerPartitions = new SortedDictionary<int, int>();
+                foreach (string brokerId in brokersPerTopic)
+                {
+                    string path = brokerTopicPath + "/" + brokerId;
+                    var numPartitionsPerBrokerAndTopic = this.zkclient.ReadData<string>(path);
+                    brokerPartitions.Add(int.Parse(brokerId, CultureInfo.InvariantCulture), int.Parse(numPartitionsPerBrokerAndTopic, CultureInfo.CurrentCulture));
+                }              
+
+                var brokerParts = new SortedSet<Partition>();
+                foreach (var brokerPartition in brokerPartitions)
+                {
+                    for (int i = 0; i < brokerPartition.Value; i++)
+                    {
+                        var bidPid = new Partition(brokerPartition.Key, i);
+                        brokerParts.Add(bidPid);
+                    }
+                }
+
+                this.topicBrokerPartitions.Add(topic, brokerParts);
+            }
+        }
+
+        /// <summary>
+        /// Adds all available brokers, each with a default single partition, for the new topic, so that all of the brokers
+        /// participate in hosting this topic
+        /// </summary>
+        /// <param name="topic">The new topic.</param>
+        /// <returns>Default partitions for the new topic</returns>
+        /// <remarks>
+        /// Since we do not have information about the number of partitions on these brokers, just assume a single partition
+        /// and pick partition 0 from each broker as a candidate
+        /// </remarks>
+        private SortedSet<Partition> BootstrapWithExistingBrokers(string topic)
+        {
+            Logger.Debug("Currently, no brokers are registered under topic: " + topic);
+            Logger.Debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default "
+                + "number of partitions = 1");
+            var numBrokerPartitions = new SortedSet<Partition>();
+            var allBrokers = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
+            Logger.Debug("List of all brokers currently registered in zookeeper -> " + string.Join(", ", allBrokers));
+            foreach (var broker in allBrokers)
+            {
+                numBrokerPartitions.Add(new Partition(int.Parse(broker, CultureInfo.InvariantCulture), 0));
+            }
+
+            Logger.Debug("Adding following broker id, partition id for NEW topic: " + topic + " -> " + string.Join(", ", numBrokerPartitions));
+            return numBrokerPartitions;
+        }
+
+        /// <summary>
+        /// Registers the listeners under several paths in ZooKeeper
+        /// to keep related data structures updated.
+        /// </summary>
+        /// <remarks>
+        /// Watches the following paths:
+        /// /broker/topics
+        /// /broker/topics/[topic]
+        /// /broker/ids
+        /// </remarks>
+        private void RegisterListeners()
+        {
+            this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, this.brokerTopicsListener);
+            Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerTopicsPath);
+            foreach (string topic in this.topicBrokerPartitions.Keys)
+            {
+                string path = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+                this.zkclient.Subscribe(path, this.brokerTopicsListener);
+                Logger.Debug("Registering listener on path: " + path);
+            }
+
+            this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, this.brokerTopicsListener);
+            Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerIdsPath);
+
+            this.zkclient.Subscribe(this);
+            Logger.Debug("Registering listener on state changed event");
+        }
+
+        /// <summary>
+        /// Resets the related data structures
+        /// </summary>
+        private void Reset()
+        {
+            this.topicBrokerPartitions = null;
+            this.brokers = null;
+            this.InitializeBrokers();
+            this.InitializeTopicBrokerPartitions();
+        }
+
+        /// <summary>
+        /// Ensures that the object has not been disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+
+        /// <summary>
+        /// Called when the ZooKeeper connection state has changed.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+        /// <remarks>
+        /// Do nothing, since zkclient will do reconnect for us.
+        /// </remarks>
+        public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+        {
+            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
+
+            this.EnsuresNotDisposed();
+            Logger.Debug("Handle state change: do nothing, since zkclient will do reconnect for us.");
+        }
+
+        /// <summary>
+        /// Called after the ZooKeeper session has expired and a new session has been created.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+        /// <remarks>
+        /// We would have to re-create any ephemeral nodes here.
+        /// </remarks>
+        public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+        {
+            Guard.Assert<ArgumentNullException>(() => args != null);
+
+            this.EnsuresNotDisposed();
+            Logger.Debug("ZK expired; release old list of broker partitions for topics ");
+            this.Reset();
+            this.brokerTopicsListener.ResetState();
+            foreach (var topic in this.topicBrokerPartitions.Keys)
+            {
+                this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic, this.brokerTopicsListener);   
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Producers.Partitioning;
+    using Kafka.Client.Serialization;
+
+    /// <summary>
+    /// High-level Producer API that exposes all the producer functionality to the client 
+    /// using <see cref="System.String" /> as the key type and <see cref="Message" /> as the data type
+    /// </summary>
+    public class Producer : Producer<string, Message>
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;String&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="producerPool">Pool of producers, one per broker.</param>
+        /// <param name="populateProducerPool">if set to <c>true</c>, producers should be populated.</param>
+        /// <remarks>
+        /// Should be used for testing purposes only.
+        /// </remarks>
+        internal Producer(ProducerConfig config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
+            : base(config, partitioner, producerPool, populateProducerPool)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <remarks>
+        /// Can be used when all config parameters are specified through the config object
+        /// and dependencies are instantiated via reflection
+        /// </remarks>
+        public Producer(ProducerConfig config)
+            : base(config)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;String&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;Message&gt;" /></param>
+        /// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used 
+        /// to supply callback invoked when sending asynchronous request is completed.</param>
+        /// <remarks>
+        /// Can be used to provide pre-instantiated objects for all config parameters
+        /// that would otherwise be instantiated via reflection.
+        /// </remarks>
+        public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
+            : base(config, partitioner, encoder, callbackHandler)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;Message&gt;" /> 
+        /// </param>
+        /// <remarks>
+        /// Can be used to provide pre-instantiated objects for all config parameters
+        /// that would otherwise be instantiated via reflection.
+        /// </remarks>
+        public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
+            : base(config, partitioner, encoder)
+        {
+        }
+    }
+}
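
A minimal usage sketch of this string-keyed producer, assuming only the constructors and Send overloads shown in this commit; the topic name, key and payload are illustrative, and the Message(byte[]) constructor and the producer's IDisposable implementation are assumptions based on how the types are used elsewhere in this change.

    using System.Text;
    using Kafka.Client.Cfg;
    using Kafka.Client.Messages;
    using Kafka.Client.Producers;

    public static class StringProducerSketch
    {
        public static void Run(ProducerConfig config)
        {
            // Assumes Message exposes a byte[] payload constructor and that the
            // producer is disposable (it overrides Dispose(bool) in its base class).
            using (var producer = new Producer(config))
            {
                var message = new Message(Encoding.UTF8.GetBytes("hello"));
                producer.Send(new ProducerData<string, Message>("test-topic", "key-1", new[] { message }));
            }
        }
    }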

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Producers.Partitioning;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+    using log4net;
+
+    /// <summary>
+    /// High-level Producer API that exposes all the producer functionality to the client
+    /// </summary>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    /// <remarks>
+    /// Provides serialization of data through a user-specified encoder, ZooKeeper-based automatic broker discovery,
+    /// and software load balancing through an optionally user-specified partitioner
+    /// </remarks>
+    public class Producer<TKey, TData> : ZooKeeperAwareKafkaClientBase, IProducer<TKey, TData>
+        where TKey : class 
+        where TData : class 
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);       
+        private static readonly Random Randomizer = new Random();
+        private readonly ProducerConfig config;
+        private readonly IProducerPool<TData> producerPool;
+        private readonly IPartitioner<TKey> partitioner;
+        private readonly bool populateProducerPool;
+        private readonly IBrokerPartitionInfo brokerPartitionInfo;
+        private volatile bool disposed;
+        private readonly object shuttingDownLock = new object();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="producerPool">Pool of producers, one per broker.</param>
+        /// <param name="populateProducerPool">if set to <c>true</c>, producers should be populated.</param>
+        /// <remarks>
+        /// Should be used for testing purposes only.
+        /// </remarks>
+        internal Producer(
+            ProducerConfig config,
+            IPartitioner<TKey> partitioner,
+            IProducerPool<TData> producerPool,
+            bool populateProducerPool = true)
+            : base(config)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.Assert<ArgumentNullException>(() => producerPool != null);
+            this.config = config;
+            this.partitioner = partitioner ?? new DefaultPartitioner<TKey>();
+            this.populateProducerPool = populateProducerPool;
+            this.producerPool = producerPool;
+            if (this.IsZooKeeperEnabled)
+            {
+                this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback);
+            }
+            else
+            {
+                this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(this.config);   
+            }
+
+            if (this.populateProducerPool)
+            {
+                IDictionary<int, Broker> allBrokers = this.brokerPartitionInfo.GetAllBrokerInfo();
+                foreach (var broker in allBrokers)
+                {
+                    this.producerPool.AddProducer(
+                        new Broker(broker.Key, broker.Value.Host, broker.Value.Host, broker.Value.Port));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <remarks>
+        /// Can be used when all config parameters are specified through the config object
+        /// and dependencies are instantiated via reflection
+        /// </remarks>
+        public Producer(ProducerConfig config)
+            : this(
+                config, 
+                ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass),
+                ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass)))
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;TData&gt;" /> 
+        /// used to convert an object of type TData to <see cref="Message" />.</param>
+        /// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used 
+        /// to supply callback invoked when sending asynchronous request is completed.</param>
+        /// <remarks>
+        /// Can be used to provide pre-instantiated objects for all config parameters
+        /// that would otherwise be instantiated via reflection.
+        /// </remarks>
+        public Producer(
+            ProducerConfig config,
+            IPartitioner<TKey> partitioner,
+            IEncoder<TData> encoder,
+            ICallbackHandler callbackHandler)
+            : this(
+                config, 
+                partitioner,
+                ProducerPool<TData>.CreatePool(config, encoder, callbackHandler))
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.Assert<ArgumentNullException>(() => encoder != null);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">The config object.</param>
+        /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" /> 
+        /// used to supply a custom partitioning strategy based on the message key.</param>
+        /// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;TData&gt;" /> 
+        /// used to convert an object of type TData to <see cref="Message" />.</param>
+        /// <remarks>
+        /// Can be used to provide pre-instantiated objects for all config parameters
+        /// that would otherwise be instantiated via reflection.
+        /// </remarks>
+        public Producer(
+            ProducerConfig config,
+            IPartitioner<TKey> partitioner,
+            IEncoder<TData> encoder)
+            : this(
+                config, 
+                partitioner,
+                ProducerPool<TData>.CreatePool(config, encoder, null))
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.Assert<ArgumentNullException>(() => encoder != null);
+        }
+
+        /// <summary>
+        /// Sends the data to multiple topics, partitioned by key, using either the
+        /// synchronous or the asynchronous producer.
+        /// </summary>
+        /// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
+        public void Send(IEnumerable<ProducerData<TKey, TData>> data)
+        {
+            Guard.Assert<ArgumentNullException>(() => data != null);
+            Guard.Assert<ArgumentException>(() => data.Count() > 0);
+            this.EnsuresNotDisposed();
+            var poolRequests = new List<ProducerPoolData<TData>>();
+            foreach (var dataItem in data)
+            {
+                Partition partition = this.GetPartition(dataItem);
+                var poolRequest = new ProducerPoolData<TData>(dataItem.Topic, partition, dataItem.Data);
+                poolRequests.Add(poolRequest);
+            }
+
+            this.producerPool.Send(poolRequests);
+        }
+
+        /// <summary>
+        /// Sends the data to a single topic, partitioned by key, using either the
+        /// synchronous or the asynchronous producer.
+        /// </summary>
+        /// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
+        public void Send(ProducerData<TKey, TData> data)
+        {
+            Guard.Assert<ArgumentNullException>(() => data != null);
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(data.Topic));
+            Guard.Assert<ArgumentNullException>(() => data.Data != null);
+            Guard.Assert<ArgumentException>(() => data.Data.Count() > 0);
+            this.EnsuresNotDisposed();
+            this.Send(new[] { data });
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+
+            try
+            {
+                if (this.brokerPartitionInfo != null)
+                {
+                    this.brokerPartitionInfo.Dispose();
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Ignoring unexpected errors on closing", exc);
+            }
+        }
+
+        /// <summary>
+        /// Callback to add a new producer to the producer pool.
+        /// Used by <see cref="ZKBrokerPartitionInfo" /> on registration of a new broker in ZooKeeper
+        /// </summary>
+        /// <param name="bid">The broker Id.</param>
+        /// <param name="host">The broker host address.</param>
+        /// <param name="port">The broker port.</param>
+        private void Callback(int bid, string host, int port)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(host));
+            Guard.Assert<ArgumentOutOfRangeException>(() => port > 0);
+
+            if (this.populateProducerPool)
+            {
+                this.producerPool.AddProducer(new Broker(bid, host, host, port));
+            }
+            else
+            {
+                Logger.Debug("Skipping the callback since populating producers is off");
+            }
+        }
+
+        /// <summary>
+        /// Retrieves the partition id based on the key using the given partitioner, or selects a random partition if the key is null
+        /// </summary>
+        /// <param name="key">The partition key.</param>
+        /// <param name="numPartitions">The total number of available partitions.</param>
+        /// <returns>Partition Id</returns>
+        private int GetPartitionId(TKey key, int numPartitions)
+        {
+            Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+            return key == null 
+                ? Randomizer.Next(numPartitions) 
+                : this.partitioner.Partition(key, numPartitions);
+        }
+
+        /// <summary>
+        /// Gets the partition for topic.
+        /// </summary>
+        /// <param name="dataItem">The producer data object that encapsulates the topic, key and message data.</param>
+        /// <returns>Partition for topic</returns>
+        private Partition GetPartition(ProducerData<TKey, TData> dataItem)
+        {
+            Logger.DebugFormat(
+                CultureInfo.CurrentCulture,
+                "Getting the number of broker partitions registered for topic: {0}",
+                dataItem.Topic);
+            SortedSet<Partition> brokerPartitions = this.brokerPartitionInfo.GetBrokerPartitionInfo(dataItem.Topic);
+            int totalNumPartitions = brokerPartitions.Count;
+            Logger.DebugFormat(
+                CultureInfo.CurrentCulture,
+                "Broker partitions registered for topic: {0} = {1}",
+                dataItem.Topic,
+                totalNumPartitions);
+            int partitionId = this.GetPartitionId(dataItem.Key, totalNumPartitions);
+            Partition brokerIdPartition = brokerPartitions.ToList()[partitionId];
+            Broker brokerInfo = this.brokerPartitionInfo.GetBrokerInfo(brokerIdPartition.BrokerId);
+            if (this.IsZooKeeperEnabled)
+            {
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture,
+                    "Sending message to broker {0}:{1} on partition {2}",
+                    brokerInfo.Host,
+                    brokerInfo.Port,
+                    brokerIdPartition.PartId);
+                return new Partition(brokerIdPartition.BrokerId, brokerIdPartition.PartId);
+            }
+
+            Logger.DebugFormat(
+                CultureInfo.CurrentCulture,
+                "Sending message to broker {0}:{1} on a randomly chosen partition",
+                brokerInfo.Host,
+                brokerInfo.Port);
+            return new Partition(brokerIdPartition.BrokerId, ProducerRequest.RandomPartition);
+        }
+
+        /// <summary>
+        /// Ensures that the object has not been disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}
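
GetPartitionId above routes keyed messages through IPartitioner&lt;TKey&gt;.Partition(key, numPartitions) and falls back to a random partition for null keys. Below is a hedged sketch of a custom partitioner built against only that signature; the hash-modulo strategy is illustrative and is not the committed DefaultPartitioner, and the interface's namespace is assumed from the using directives above.

    using Kafka.Client.Producers.Partitioning;

    // Illustrative partitioner: spreads keys across partitions by hash.
    // Only the Partition(key, numPartitions) signature is taken from this commit.
    public class HashPartitionerSketch : IPartitioner<string>
    {
        public int Partition(string key, int numPartitions)
        {
            // Mask the sign bit so the modulo result is never negative.
            return (key.GetHashCode() & 0x7fffffff) % numPartitions;
        }
    }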

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Encapsulates data to be sent on a topic
+    /// </summary>
+    /// <typeparam name="TKey">
+    /// Type of partitioning key
+    /// </typeparam>
+    /// <typeparam name="TData">
+    /// Type of data
+    /// </typeparam>
+    public class ProducerData<TKey, TData>
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="key">
+        /// The partitioning key.
+        /// </param>
+        /// <param name="data">
+        /// The list of data to send on the same topic.
+        /// </param>
+        public ProducerData(string topic, TKey key, IEnumerable<TData> data)
+            : this(topic, data)
+        {
+            this.Key = key;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="data">
+        /// The list of data to send on the same topic.
+        /// </param>
+        public ProducerData(string topic, IEnumerable<TData> data)
+        {
+            this.Topic = topic;
+            this.Data = data;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="data">
+        /// The data to send on the topic.
+        /// </param>
+        public ProducerData(string topic, TData data)
+            : this(topic, new[] { data })
+        {
+        }
+
+        /// <summary>
+        /// Gets topic.
+        /// </summary>
+        public string Topic { get; private set; }
+
+        /// <summary>
+        /// Gets the partitioning key.
+        /// </summary>
+        public TKey Key { get; private set; }
+
+        /// <summary>
+        /// Gets the data.
+        /// </summary>
+        public IEnumerable<TData> Data { get; private set; }
+    }
+}
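
The three constructors above cover a single message, an unkeyed batch and a keyed batch. A short sketch of how each is built; the Message(byte[]) constructor and the payload values are illustrative assumptions.

    using System.Collections.Generic;
    using System.Text;
    using Kafka.Client.Messages;
    using Kafka.Client.Producers;

    public static class ProducerDataSketch
    {
        public static IEnumerable<ProducerData<string, Message>> Build()
        {
            var messages = new[] { new Message(Encoding.UTF8.GetBytes("payload")) };
            yield return new ProducerData<string, Message>("topic-a", messages[0]);         // single message
            yield return new ProducerData<string, Message>("topic-a", messages);            // unkeyed batch
            yield return new ProducerData<string, Message>("topic-a", "user-42", messages); // keyed batch
        }
    }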

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Producers.Sync;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// The base for all classes that represent a pool of producers used by the high-level API
+    /// </summary>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    internal abstract class ProducerPool<TData> : IProducerPool<TData>
+        where TData : class 
+    {
+        /// <summary>
+        /// Factory method used to instantiate either a
+        /// synchronous or an asynchronous producer pool based on configuration.
+        /// </summary>
+        /// <param name="config">
+        /// The producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <returns>
+        /// An instantiated synchronous or asynchronous producer pool
+        /// </returns>
+        public static ProducerPool<TData> CreatePool(ProducerConfig config, IEncoder<TData> serializer)
+        {
+            if (config.ProducerType == ProducerTypes.Async)
+            {
+                return AsyncProducerPool<TData>.CreateAsyncPool(config, serializer);
+            }
+
+            if (config.ProducerType == ProducerTypes.Sync)
+            {
+                return SyncProducerPool<TData>.CreateSyncPool(config, serializer);
+            }
+
+            throw new InvalidOperationException("Not supported producer type " + config.ProducerType);
+        }
+
+        /// <summary>
+        /// Factory method used to instantiating either, 
+        /// synchronous or asynchronous, producer pool based on configuration.
+        /// </summary>
+        /// <param name="config">
+        /// The producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after new broker is added.
+        /// </param>
+        /// <returns>
+        /// An instantiated synchronous or asynchronous producer pool
+        /// </returns>
+        public static ProducerPool<TData> CreatePool(
+            ProducerConfig config,
+            IEncoder<TData> serializer,
+            ICallbackHandler cbkHandler)
+        {
+            if (config.ProducerType == ProducerTypes.Async)
+            {
+                return AsyncProducerPool<TData>.CreateAsyncPool(config, serializer, cbkHandler);
+            }
+
+            if (config.ProducerType == ProducerTypes.Sync)
+            {
+                return SyncProducerPool<TData>.CreateSyncPool(config, serializer, cbkHandler);
+            }
+
+            throw new InvalidOperationException("Not supported producer type " + config.ProducerType);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerPool&lt;TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">The config.</param>
+        /// <param name="serializer">The serializer.</param>
+        /// <remarks>
+        /// Should be used for testing purposes only
+        /// </remarks>
+        protected ProducerPool(
+            ProducerConfig config,
+            IEncoder<TData> serializer)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.Assert<ArgumentNullException>(() => serializer != null);
+
+            this.Config = config;
+            this.Serializer = serializer;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerPool&lt;TData&gt;"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The config.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="callbackHandler">
+        /// The callback invoked after new broker is added.
+        /// </param>
+        protected ProducerPool(
+            ProducerConfig config,
+            IEncoder<TData> serializer,
+            ICallbackHandler callbackHandler)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.Assert<ArgumentNullException>(() => serializer != null);
+
+            this.Config = config;
+            this.Serializer = serializer;
+            this.CallbackHandler = callbackHandler;
+        }
+
+        protected ProducerConfig Config { get; private set; }
+
+        protected IEncoder<TData> Serializer { get; private set; }
+
+        protected ICallbackHandler CallbackHandler { get; private set; }
+
+        /// <summary>
+        /// Add a new producer, either synchronous or asynchronous, to the pool
+        /// </summary>
+        /// <param name="broker">The broker information.</param>
+        public abstract void AddProducer(Broker broker);
+
+        /// <summary>
+        /// Selects either a synchronous or an asynchronous producer, for
+        /// the specified broker id and calls the send API on the selected
+        /// producer to publish the data to the specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for single-topic request
+        /// </remarks>
+        public void Send(ProducerPoolData<TData> poolData)
+        {
+            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            this.Send(new[] { poolData });
+        }
+
+        /// <summary>
+        /// Selects either a synchronous or an asynchronous producer, for
+        /// the specified broker id and calls the send API on the selected
+        /// producer to publish the data to the specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for multi-topic request
+        /// </remarks>
+        public abstract void Send(IEnumerable<ProducerPoolData<TData>> poolData);
+    }
+}
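
The CreatePool factories above dispatch on ProducerConfig.ProducerType; note that ProducerTypes (added later in this commit) defaults to Unknow = 0, so an unset producer type falls through both branches and raises the InvalidOperationException. A hedged sketch of the same dispatch written as a switch, using only members visible in this change and assuming the pool types' namespaces from the using directives above:

    using System;
    using Kafka.Client.Cfg;
    using Kafka.Client.Producers;
    using Kafka.Client.Producers.Async;
    using Kafka.Client.Producers.Sync;
    using Kafka.Client.Serialization;

    // Illustrative mirror of the CreatePool dispatch; not part of the committed code.
    internal static class ProducerPoolDispatchSketch
    {
        public static ProducerPool<TData> Create<TData>(ProducerConfig config, IEncoder<TData> serializer)
            where TData : class
        {
            switch (config.ProducerType)
            {
                case ProducerTypes.Sync:
                    return SyncProducerPool<TData>.CreateSyncPool(config, serializer);
                case ProducerTypes.Async:
                    return AsyncProducerPool<TData>.CreateAsyncPool(config, serializer);
                default:
                    // ProducerTypes.Unknow (the 0 default) ends up here.
                    throw new InvalidOperationException("Not supported producer type " + config.ProducerType);
            }
        }
    }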

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Cluster;
+
+    /// <summary>
+    /// Encapsulates data to be sent on the chosen partition
+    /// </summary>
+    /// <typeparam name="TData">
+    /// Type of data
+    /// </typeparam>
+    internal class ProducerPoolData<TData>
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerPoolData{TData}"/> class.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="bidPid">
+        /// The chosen partition.
+        /// </param>
+        /// <param name="data">
+        /// The data.
+        /// </param>
+        public ProducerPoolData(string topic, Partition bidPid, IEnumerable<TData> data)
+        {
+            this.Topic = topic;
+            this.BidPid = bidPid;
+            this.Data = data;
+        }
+
+        /// <summary>
+        /// Gets the topic.
+        /// </summary>
+        public string Topic { get; private set; }
+
+        /// <summary>
+        /// Gets the chosen partition.
+        /// </summary>
+        public Partition BidPid { get; private set; }
+
+        /// <summary>
+        /// Gets the data.
+        /// </summary>
+        public IEnumerable<TData> Data { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    /// <summary>
+    /// Type of producer
+    /// </summary>
+    public enum ProducerTypes
+    {
+        Unknow = 0,
+        Sync = 1,
+        Async = 2
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// Sends messages encapsulated in a request to the Kafka server synchronously
+    /// </summary>
+    public interface ISyncProducer
+    {
+        /// <summary>
+        /// Constructs a producer request and sends it to the given broker partition synchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages.
+        /// </param>
+        void Send(string topic, int partition, IEnumerable<Message> messages);
+
+        /// <summary>
+        /// Sends a request to the Kafka server synchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        void Send(ProducerRequest request);
+
+        /// <summary>
+        /// Sends the data to multiple topics on the Kafka server synchronously
+        /// </summary>
+        /// <param name="requests">
+        /// The requests.
+        /// </param>
+        void MultiSend(IEnumerable<ProducerRequest> requests);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Sends messages encapsulated in a request to the Kafka server synchronously
+    /// </summary>
+    public class SyncProducer : ISyncProducer
+    {
+        private readonly SyncProducerConfig config;
+
+        /// <summary>
+        /// Gets producer config
+        /// </summary>
+        public SyncProducerConfig Config
+        {
+            get { return config; }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducer"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The producer config.
+        /// </param>
+        public SyncProducer(SyncProducerConfig config)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            this.config = config;
+        }
+
+        /// <summary>
+        /// Constructs a producer request and sends it to the given broker partition synchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages.
+        /// </param>
+        public void Send(string topic, int partition, IEnumerable<Message> messages)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+            Guard.Assert<ArgumentNullException>(() => messages != null);
+            Guard.Assert<ArgumentNullException>(
+                () => messages.All(x => x != null));
+            Guard.Assert<ArgumentOutOfRangeException>(
+                () => messages.All(
+                    x => x.PayloadSize <= this.Config.MaxMessageSize));
+            
+            this.Send(new ProducerRequest(topic, partition, messages));
+        }
+
+        /// <summary>
+        /// Sends a request to the Kafka server synchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        public void Send(ProducerRequest request)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+            {
+                conn.Write(request);
+            }
+        }
+
+        /// <summary>
+        /// Sends the data to multiple topics on the Kafka server synchronously
+        /// </summary>
+        /// <param name="requests">
+        /// The requests.
+        /// </param>
+        public void MultiSend(IEnumerable<ProducerRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+            Guard.Assert<ArgumentNullException>(
+                () => requests.All(
+                    x => x != null && x.MessageSet != null && x.MessageSet.Messages != null));
+            Guard.Assert<ArgumentNullException>(
+                () => requests.All(
+                    x => x.MessageSet.Messages.All(
+                        y => y != null && y.PayloadSize <= this.Config.MaxMessageSize)));
+
+            var multiRequest = new MultiProducerRequest(requests);
+            using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+            {
+                conn.Write(multiRequest);
+            }
+        }
+    }
+}
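
For illustration, a minimal sketch of driving this producer directly. The broker
endpoint is a placeholder, and the Message constructor taking raw payload bytes is
an assumption not shown in this diff:

    using System.Collections.Generic;
    using System.Text;
    using Kafka.Client.Cfg;
    using Kafka.Client.Messages;
    using Kafka.Client.Producers.Sync;

    internal static class SyncProducerSketch
    {
        internal static void Main()
        {
            // Placeholder broker endpoint; other SyncProducerConfig settings keep their defaults.
            var config = new SyncProducerConfig { Host = "localhost", Port = 9092 };
            var producer = new SyncProducer(config);

            // Assumption: Message exposes a constructor taking the payload bytes.
            var messages = new List<Message>
            {
                new Message(Encoding.UTF8.GetBytes("hello")),
                new Message(Encoding.UTF8.GetBytes("world"))
            };

            // Both messages go to partition 0 of "test" in a single ProducerRequest.
            producer.Send("test", 0, messages);
        }
    }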

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+    using log4net;
+
+    /// <summary>
+    /// Pool of synchronous producers used by the high-level API
+    /// </summary>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    internal class SyncProducerPool<TData> : ProducerPool<TData>
+        where TData : class 
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private readonly IDictionary<int, ISyncProducer> syncProducers;
+
+        /// <summary>
+        /// Factory method used to instantiate a synchronous producer pool
+        /// </summary>
+        /// <param name="config">
+        /// The synchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <returns>
+        /// Instantiated synchronous producer pool
+        /// </returns>
+        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer)
+        {
+            return new SyncProducerPool<TData>(config, serializer);
+        }
+
+        /// <summary>
+        /// Factory method used to instantiate a synchronous producer pool
+        /// </summary>
+        /// <param name="config">
+        /// The synchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="callbackHandler">
+        /// The callback invoked after a new broker is added.
+        /// </param>
+        /// <returns>
+        /// Instantiated synchronous producer pool
+        /// </returns>
+        public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
+        {
+            return new SyncProducerPool<TData>(config, serializer, callbackHandler);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The synchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="syncProducers">
+        /// The list of synchronous producers.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after a new broker is added.
+        /// </param>
+        /// <remarks>
+        /// Should be used for testing purposes only
+        /// </remarks>
+        private SyncProducerPool(
+            ProducerConfig config, 
+            IEncoder<TData> serializer,
+            IDictionary<int, ISyncProducer> syncProducers,
+            ICallbackHandler cbkHandler)
+            : base(config, serializer, cbkHandler)
+        {
+            this.syncProducers = syncProducers;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The synchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after a new broker is added.
+        /// </param>
+        /// <remarks>
+        /// Should be used for testing purposes only
+        /// </remarks>
+        private SyncProducerPool(
+            ProducerConfig config,
+            IEncoder<TData> serializer,
+            ICallbackHandler cbkHandler)
+            : this(config, serializer, new Dictionary<int, ISyncProducer>(), cbkHandler)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The synchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        private SyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+            : this(
+                config,
+                serializer,
+                new Dictionary<int, ISyncProducer>(),
+                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+        {
+        }
+
+        /// <summary>
+        /// Selects a synchronous producer for the specified broker id and calls
+        /// the send API on the selected producer to publish the data to the
+        /// specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for multi-topic requests
+        /// </remarks>
+        public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
+        {
+            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
+                x => x.BidPid.BrokerId, x => x)
+                .ToDictionary(x => x.Key, x => x.ToList());
+            foreach (var broker in distinctBrokers)
+            {
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "Fetching sync producer for broker id: {0}", broker.Key);
+                ISyncProducer producer = this.syncProducers[broker.Key];
+                IEnumerable<ProducerRequest> requests = broker.Value.Select(x => new ProducerRequest(
+                    x.Topic,
+                    x.BidPid.PartId,
+                    new BufferedMessageSet(x.Data.Select(y => this.Serializer.ToMessage(y)))));
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "Sending message to broker {0}", broker.Key);
+                if (requests.Count() > 1)
+                {
+                    producer.MultiSend(requests);
+                }
+                else
+                {
+                    producer.Send(requests.First());
+                }
+            }
+        }
+
+        /// <summary>
+        /// Adds a new synchronous producer to the pool
+        /// </summary>
+        /// <param name="broker">The broker information.</param>
+        public override void AddProducer(Broker broker)
+        {
+            Guard.Assert<ArgumentNullException>(() => broker != null);
+            var syncConfig = new SyncProducerConfig
+            {
+                Host = broker.Host,
+                Port = broker.Port,
+                BufferSize = this.Config.BufferSize,
+                ConnectTimeout = this.Config.ConnectTimeout,
+                ReconnectInterval = this.Config.ReconnectInterval
+            };
+            var syncProducer = new SyncProducer(syncConfig);
+            Logger.InfoFormat(
+                CultureInfo.CurrentCulture,
+                "Creating sync producer for broker id = {0} at {1}:{2}",
+                broker.Id,
+                broker.Host,
+                broker.Port);
+            this.syncProducers.Add(broker.Id, syncProducer);
+        }
+    }
+}
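
The Send override above groups the incoming pool data by broker id and then
chooses between Send and MultiSend per broker. A standalone sketch of that
grouping step, using a hypothetical stand-in type since ProducerPoolData lives
elsewhere in the client:

    using System.Collections.Generic;
    using System.Linq;

    internal static class BrokerGroupingSketch
    {
        // Hypothetical stand-in for ProducerPoolData<TData>: broker id plus payload.
        private sealed class PoolItem
        {
            public int BrokerId { get; set; }
            public string Payload { get; set; }
        }

        internal static void Main()
        {
            var poolData = new[]
            {
                new PoolItem { BrokerId = 0, Payload = "a" },
                new PoolItem { BrokerId = 1, Payload = "b" },
                new PoolItem { BrokerId = 0, Payload = "c" }
            };

            // Same shape as the GroupBy/ToDictionary call in Send: one entry per broker,
            // each holding the items destined for that broker.
            Dictionary<int, List<PoolItem>> byBroker = poolData
                .GroupBy(x => x.BrokerId)
                .ToDictionary(g => g.Key, g => g.ToList());

            // byBroker[0] holds "a" and "c" (sent via MultiSend); byBroker[1] holds "b" (sent via Send).
        }
    }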

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,18 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following 
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyTitle("Kafka.Client")]
-[assembly: AssemblyDescription("")]
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Microsoft")]
-[assembly: AssemblyProduct("Kafka.Client")]
-[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
-[assembly: AssemblyTrademark("")]
-[assembly: AssemblyCulture("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible 
-// to COM components.  If you need to access a type in this assembly from 
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("93d702e5-9998-49a8-8c16-5b04b3ba55c1")]
-
-// Version information for an assembly consists of the following four values:
-//
-//      Major Version
-//      Minor Version 
-//      Build Number
-//      Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers 
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+using System;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+[assembly: AssemblyTitle("Kafka.Client")]
+[assembly: AssemblyDescription(".NET Client for Kafka")]
+[assembly: AssemblyCompany("ExactTarget")]
+[assembly: AssemblyProduct("Kafka.Client")]
+[assembly: AssemblyCopyright("Copyright © ExactTarget 2011")]
+
+[assembly: ComVisible(false)]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
+[assembly: InternalsVisibleTo("Kafka.Client.Tests")]
+[assembly: InternalsVisibleTo("Kafka.Client.IntegrationTests")]
+[assembly: CLSCompliant(true)]
+

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,53 @@
-using System.Net.Sockets;
-
-namespace Kafka.Client
-{
-    /// <summary>
-    /// The context of a request made to Kafka.
-    /// </summary>
-    /// <typeparam name="T">
-    /// Must be of type <see cref="AbstractRequest"/> and represents the type of request
-    /// sent to Kafka.
-    /// </typeparam>
-    public class RequestContext<T> where T : AbstractRequest
-    {
-        /// <summary>
-        /// Initializes a new instance of the RequestContext class.
-        /// </summary>
-        /// <param name="networkStream">The network stream that sent the message.</param>
-        /// <param name="request">The request sent over the stream.</param>
-        public RequestContext(NetworkStream networkStream, T request)
-        {
-            NetworkStream = networkStream;
-            Request = request;
-        }
-
-        /// <summary>
-        /// Gets the <see cref="NetworkStream"/> instance of the request.
-        /// </summary>
-        public NetworkStream NetworkStream { get; private set; }
-
-        /// <summary>
-        /// Gets the <see cref="FetchRequest"/> or <see cref="ProducerRequest"/> object
-        /// associated with the <see cref="RequestContext"/>.
-        /// </summary>
-        public T Request { get; private set; }
-    }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+    using System.Net.Sockets;
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// The context of a request made to Kafka.
+    /// </summary>
+    /// <typeparam name="T">
+    /// Must be of type <see cref="AbstractRequest"/> and represents the type of request
+    /// sent to Kafka.
+    /// </typeparam>
+    public class RequestContext<T> where T : AbstractRequest
+    {
+        /// <summary>
+        /// Initializes a new instance of the RequestContext class.
+        /// </summary>
+        /// <param name="networkStream">The network stream that sent the message.</param>
+        /// <param name="request">The request sent over the stream.</param>
+        public RequestContext(NetworkStream networkStream, T request)
+        {
+            NetworkStream = networkStream;
+            Request = request;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="NetworkStream"/> instance of the request.
+        /// </summary>
+        public NetworkStream NetworkStream { get; private set; }
+
+        /// <summary>
+        /// Gets the <see cref="FetchRequest"/> or <see cref="ProducerRequest"/> object
+        /// associated with the <see cref="RequestContext"/>.
+        /// </summary>
+        public T Request { get; private set; }
+    }
+}
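
A rough sketch of what RequestContext pairs together: the accepted connection's
stream and the request associated with it. The endpoint, topic and offset below are
placeholders, and the request is constructed locally rather than parsed off the wire:

    using System.Net;
    using System.Net.Sockets;
    using Kafka.Client;
    using Kafka.Client.Requests;

    internal static class RequestContextSketch
    {
        internal static void Main()
        {
            var listener = new TcpListener(IPAddress.Loopback, 9092);
            listener.Start();

            // Blocks until a client connects; sketch only, no error handling.
            using (TcpClient client = listener.AcceptTcpClient())
            {
                NetworkStream stream = client.GetStream();
                var request = new FetchRequest("test", 0, 0L);
                var context = new RequestContext<FetchRequest>(stream, request);

                // Handlers can now read from context.NetworkStream and inspect context.Request.
            }

            listener.Stop();
        }
    }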

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System.IO;
+    using System.Text;
+
+    /// <summary>
+    /// Base request to make to Kafka.
+    /// </summary>
+    public abstract class AbstractRequest
+    {
+        public const string DefaultEncoding = "UTF-8";
+        public const byte DefaultRequestSizeSize = 4;
+        public const byte DefaultRequestIdSize = 2;
+        public const short DefaultTopicLengthIfNonePresent = 2;
+
+        /// <summary>
+        /// Gets or sets the topic this request applies to.
+        /// </summary>
+        public string Topic { get; set; }
+
+        /// <summary>
+        /// Gets or sets the partition this request applies to.
+        /// </summary>
+        public int Partition { get; set; }
+
+        public MemoryStream RequestBuffer { get; protected set; }
+
+        public abstract RequestTypes RequestType { get; }
+
+        protected short RequestTypeId
+        {
+            get
+            {
+                return (short)this.RequestType;
+            }
+        }
+
+        protected static short GetTopicLength(string topic, string encoding = DefaultEncoding)
+        {
+            Encoding encoder = Encoding.GetEncoding(encoding);
+            return string.IsNullOrEmpty(topic) ? DefaultTopicLengthIfNonePresent : (short)encoder.GetByteCount(topic);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System;
+    using System.Globalization;
+    using System.IO;
+    using System.Text;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Constructs a fetch request to send to Kafka.
+    /// </summary>
+    public class FetchRequest : AbstractRequest, IWritable
+    {
+        /// <summary>
+        /// Maximum size.
+        /// </summary>
+        private static readonly int DefaultMaxSize = 1048576;
+        public const byte DefaultTopicSizeSize = 2;
+        public const byte DefaultPartitionSize = 4;
+        public const byte DefaultOffsetSize = 8;
+        public const byte DefaultMaxSizeSize = 4;
+        public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultOffsetSize + DefaultMaxSizeSize;
+        public const byte DefaultHeaderAsPartOfMultirequestSize = DefaultTopicSizeSize + DefaultPartitionSize + DefaultOffsetSize + DefaultMaxSizeSize;
+
+        public static int GetRequestLength(string topic, string encoding = DefaultEncoding)
+        {
+            short topicLength = GetTopicLength(topic, encoding);
+            return topicLength + DefaultHeaderSize;
+        }
+
+        public static int GetRequestAsPartOfMultirequestLength(string topic, string encoding = DefaultEncoding)
+        {
+            short topicLength = GetTopicLength(topic, encoding);
+            return topicLength + DefaultHeaderAsPartOfMultirequestSize;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        public FetchRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to fetch from.</param>
+        /// <param name="partition">The partition to fetch from.</param>
+        /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+        public FetchRequest(string topic, int partition, long offset)
+            : this(topic, partition, offset, DefaultMaxSize)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to fetch from.</param>
+        /// <param name="partition">The partition to fetch from.</param>
+        /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+        /// <param name="maxSize">The maximum size.</param>
+        public FetchRequest(string topic, int partition, long offset, int maxSize)
+        {
+            Topic = topic;
+            Partition = partition;
+            Offset = offset;
+            MaxSize = maxSize;
+
+            int length = GetRequestLength(topic, DefaultEncoding);
+            this.RequestBuffer = new BoundedBuffer(length);
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets or sets the offset to request.
+        /// </summary>
+        public long Offset { get; set; }
+
+        /// <summary>
+        /// Gets or sets the maximum size, in bytes, to fetch in the request.
+        /// </summary>
+        public int MaxSize { get; set; }
+
+        public override RequestTypes RequestType
+        {
+            get
+            {
+                return RequestTypes.Fetch;
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                writer.Write(this.RequestTypeId);
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.WriteTopic(this.Topic, DefaultEncoding);
+            writer.Write(this.Partition);
+            writer.Write(this.Offset);
+            writer.Write(this.MaxSize);
+        }
+
+        public override string ToString()
+        {
+            return String.Format(
+                CultureInfo.CurrentCulture,
+                "topic: {0}, part: {1}, offset: {2}, maxSize: {3}",
+                this.Topic,
+                this.Partition,
+                this.Offset,
+                this.MaxSize);
+        }
+    }
+}
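
Per the constructor and WriteTo above, a fetch request serializes as: the total
request size, the request type id, the topic (with its 2-byte length), the
partition, the offset and the max size. A small sketch with a placeholder topic
and offset:

    using System;
    using Kafka.Client.Requests;

    internal static class FetchRequestSketch
    {
        internal static void Main()
        {
            // Fetch up to the default 1 MB from partition 0 of "test", starting at offset 0.
            var request = new FetchRequest("test", 0, 0L);

            // GetRequestLength = length-prefixed topic plus the fixed header fields;
            // the constructor sizes RequestBuffer with it and serializes the request into it.
            int wireLength = FetchRequest.GetRequestLength("test");
            Console.WriteLine("{0} -> {1} bytes on the wire", request, wireLength);
        }
    }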

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Constructs a multi-fetch request to send to Kafka.
+    /// </summary>
+    public class MultiFetchRequest : AbstractRequest, IWritable
+    {
+        public const byte DefaultNumberOfRequestsSize = 2;
+
+        public const byte DefaultHeaderSize =
+            DefaultRequestSizeSize + DefaultRequestIdSize + DefaultNumberOfRequestsSize;
+
+        public static int GetRequestLength(IList<FetchRequest> requests, string encoding = DefaultEncoding)
+        {
+            int requestsLength = 0;
+            foreach (var request in requests)
+            {
+                requestsLength += FetchRequest.GetRequestAsPartOfMultirequestLength(request.Topic, encoding);
+            }
+
+            return requestsLength + DefaultHeaderSize;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the MultiFetchRequest class.
+        /// </summary>
+        /// <param name="requests">Requests to package up and batch.</param>
+        public MultiFetchRequest(IList<FetchRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+            ConsumerRequests = requests;
+            int length = GetRequestLength(requests, DefaultEncoding);
+            this.RequestBuffer = new BoundedBuffer(length);
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets or sets the consumer requests to be batched into this multi-request.
+        /// </summary>
+        public IList<FetchRequest> ConsumerRequests { get; set; }
+
+        public override RequestTypes RequestType
+        {
+            get
+            {
+                return RequestTypes.MultiFetch;
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                writer.Write(this.RequestTypeId);
+                writer.Write((short)this.ConsumerRequests.Count);
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            foreach (var consumerRequest in ConsumerRequests)
+            {
+                consumerRequest.WriteTo(writer);
+            }
+        }
+    }
+}
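
And a sketch of batching several fetch requests into one multi-fetch request;
topics, partitions and offsets are placeholders:

    using System;
    using System.Collections.Generic;
    using Kafka.Client.Requests;

    internal static class MultiFetchSketch
    {
        internal static void Main()
        {
            // One fetch per topic/partition pair.
            IList<FetchRequest> fetches = new List<FetchRequest>
            {
                new FetchRequest("topic-a", 0, 0L),
                new FetchRequest("topic-b", 1, 0L)
            };

            // The multi-request writes its own header (size, request id, request count)
            // and then each fetch request in its compact multi-request form.
            var multi = new MultiFetchRequest(fetches);
            Console.WriteLine("Batched {0} fetch requests", multi.ConsumerRequests.Count);
        }
    }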