Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [2/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Reflection;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Messages;
+    using log4net;
+
+    /// <summary>
+    /// An iterator that blocks until a value can be read from the supplied queue.
+    /// </summary>
+    /// <remarks>
+    /// The iterator takes a shutdown command object which can be added to the queue to trigger a shutdown.
+    /// </remarks>
+    internal class ConsumerIterator : IEnumerator<Message>
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private readonly BlockingCollection<FetchedDataChunk> channel;
+        private readonly int consumerTimeoutMs;
+        private PartitionTopicInfo currentTopicInfo;
+        private ConsumerIteratorState state = ConsumerIteratorState.NotReady;
+        private IEnumerator<Message> current;
+        private Message nextItem;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ConsumerIterator"/> class.
+        /// </summary>
+        /// <param name="channel">
+        /// The queue containing fetched data chunks to iterate over.
+        /// </param>
+        /// <param name="consumerTimeoutMs">
+        /// The consumer timeout in ms.
+        /// </param>
+        public ConsumerIterator(BlockingCollection<FetchedDataChunk> channel, int consumerTimeoutMs)
+        {
+            this.channel = channel;
+            this.consumerTimeoutMs = consumerTimeoutMs;
+        }
+
+        /// <summary>
+        /// Gets the element in the collection at the current position of the enumerator.
+        /// </summary>
+        /// <returns>
+        /// The element in the collection at the current position of the enumerator.
+        /// </returns>
+        public Message Current
+        {
+            get
+            {
+                if (!MoveNext())
+                {
+                    throw new Exception("No element");
+                }
+
+                state = ConsumerIteratorState.NotReady;
+                if (nextItem != null)
+                {
+                    currentTopicInfo.Consumed(MessageSet.GetEntrySize(nextItem));
+                    return nextItem;
+                }
+
+                throw new Exception("Expected item but none found.");
+            }
+        }
+
+        /// <summary>
+        /// Gets the current element in the collection.
+        /// </summary>
+        /// <returns>
+        /// The current element in the collection.
+        /// </returns>
+        object IEnumerator.Current
+        {
+            get { return this.Current; }
+        }
+
+        /// <summary>
+        /// Advances the enumerator to the next element of the collection.
+        /// </summary>
+        /// <returns>
+        /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
+        /// </returns>
+        public bool MoveNext()
+        {
+            if (state == ConsumerIteratorState.Failed)
+            {
+                throw new Exception("Iterator is in failed state");
+            }
+            
+            switch (state)
+            {
+                case ConsumerIteratorState.Done:
+                    return false;
+                case ConsumerIteratorState.Ready:
+                    return true;
+                default:
+                    return MaybeComputeNext();
+            }
+        }
+
+        /// <summary>
+        /// Resets the enumerator's state to NotReady.
+        /// </summary>
+        public void Reset()
+        {
+            state = ConsumerIteratorState.NotReady;
+        }
+
+        public void Dispose()
+        {
+        }
+
+        private bool MaybeComputeNext()
+        {
+            state = ConsumerIteratorState.Failed;
+            nextItem = this.MakeNext();
+            if (state == ConsumerIteratorState.Done)
+            {
+                return false;
+            }
+
+            state = ConsumerIteratorState.Ready;
+            return true;
+        }
+
+        private Message MakeNext()
+        {
+            if (current == null || !current.MoveNext())
+            {
+                FetchedDataChunk found;
+                if (consumerTimeoutMs < 0)
+                {
+                    found = this.channel.Take();
+                }
+                else
+                {
+                    bool done = channel.TryTake(out found, consumerTimeoutMs);
+                    if (!done)
+                    {
+                        Logger.Debug("Consumer iterator timing out...");
+                        throw new ConsumerTimeoutException();
+                    }
+                }
+
+                if (found.Equals(ZookeeperConsumerConnector.ShutdownCommand))
+                {
+                    Logger.Debug("Received the shutdown command");
+                    channel.Add(found);
+                    return this.AllDone();
+                }
+
+                currentTopicInfo = found.TopicInfo;
+                if (currentTopicInfo.GetConsumeOffset() != found.FetchOffset)
+                {
+                    Logger.ErrorFormat(
+                        CultureInfo.CurrentCulture,
+                        "consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data",
+                        currentTopicInfo.GetConsumeOffset(),
+                        found.FetchOffset,
+                        currentTopicInfo);
+                    currentTopicInfo.ResetConsumeOffset(found.FetchOffset);
+                }
+
+                current = found.Messages.Messages.GetEnumerator();
+                current.MoveNext();
+            }
+
+            return current.Current;
+        }
+
+        private Message AllDone()
+        {
+            this.state = ConsumerIteratorState.Done;
+            return null;
+        }
+    }
+}
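
The iterator above relies on a sentinel object: when MakeNext reads the shutdown command it re-enqueues it before returning, so every reader sharing the queue eventually sees it and stops. A minimal stand-alone sketch of that pattern (the names here are illustrative, not part of this commit):

    using System;
    using System.Collections.Concurrent;

    internal static class SentinelQueueSketch
    {
        // A dedicated instance plays the role of ZookeeperConsumerConnector.ShutdownCommand.
        private static readonly object ShutdownCommand = new object();

        internal static void Drain(BlockingCollection<object> queue)
        {
            while (true)
            {
                object item = queue.Take();      // blocks until an item is available
                if (ReferenceEquals(item, ShutdownCommand))
                {
                    queue.Add(item);             // re-enqueue so other readers also stop
                    return;
                }

                Console.WriteLine("consumed: " + item);
            }
        }

        internal static void Demo()
        {
            var queue = new BlockingCollection<object>(new ConcurrentQueue<object>());
            queue.Add("message-1");
            queue.Add(ShutdownCommand);
            Drain(queue);                        // prints "consumed: message-1", then returns
        }
    }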

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    internal enum ConsumerIteratorState
+    {
+        Done,
+        Ready,
+        NotReady,
+        Failed
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using Kafka.Client.Messages;
+
+    internal class FetchedDataChunk : IEquatable<FetchedDataChunk>
+    {
+        public BufferedMessageSet Messages { get; set; }
+
+        public PartitionTopicInfo TopicInfo { get; set; }
+
+        public long FetchOffset { get; set; }
+
+        public FetchedDataChunk(BufferedMessageSet messages, PartitionTopicInfo topicInfo, long fetchOffset)
+        {
+            this.Messages = messages;
+            this.TopicInfo = topicInfo;
+            this.FetchOffset = fetchOffset;
+        }
+
+        public override bool Equals(object obj)
+        {
+            FetchedDataChunk other = obj as FetchedDataChunk;
+            if (other == null)
+            {
+                return false;
+            }
+            else
+            {
+                return this.Equals(other);
+            }
+        }
+
+        public bool Equals(FetchedDataChunk other)
+        {
+            return this.Messages == other.Messages &&
+                    this.TopicInfo == other.TopicInfo &&
+                    this.FetchOffset == other.FetchOffset;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.ZooKeeperIntegration;
+    using log4net;
+
+    /// <summary>
+    /// Background thread that fetches data from a set of servers
+    /// </summary>
+    internal class Fetcher : IDisposable
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private readonly ConsumerConfig config;
+        private readonly IZooKeeperClient zkClient;
+        private FetcherRunnable[] fetcherWorkerObjects;
+        private volatile bool disposed;
+        private readonly object shuttingDownLock = new object();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Fetcher"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The consumer configuration.
+        /// </param>
+        /// <param name="zkClient">
+        /// The wrapper around the ZooKeeper client.
+        /// </param>
+        public Fetcher(ConsumerConfig config, IZooKeeperClient zkClient)
+        {
+            this.config = config;
+            this.zkClient = zkClient;
+        }
+
+        /// <summary>
+        /// Shuts down all fetch threads
+        /// </summary>
+        private void Shutdown()
+        {
+            if (fetcherWorkerObjects != null)
+            {
+                foreach (FetcherRunnable fetcherRunnable in fetcherWorkerObjects)
+                {
+                    fetcherRunnable.Shutdown();
+                }
+
+                fetcherWorkerObjects = null;
+            }
+        }
+
+        /// <summary>
+        /// Opens connections to brokers.
+        /// </summary>
+        /// <param name="topicInfos">
+        /// The topic infos.
+        /// </param>
+        /// <param name="cluster">
+        /// The cluster.
+        /// </param>
+        /// <param name="queuesToBeCleared">
+        /// The queues to be cleared.
+        /// </param>
+        public void InitConnections(IEnumerable<PartitionTopicInfo> topicInfos, Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
+        {
+            this.EnsuresNotDisposed();
+            this.Shutdown();
+            if (topicInfos == null)
+            {
+                return;
+            }
+
+            foreach (var queueToBeCleared in queuesToBeCleared)
+            {
+                while (queueToBeCleared.Count > 0)
+                {
+                    queueToBeCleared.Take();
+                }
+            }
+
+            var partitionTopicInfoMap = new Dictionary<int, List<PartitionTopicInfo>>();
+
+            //// re-arrange by broker id
+            foreach (var topicInfo in topicInfos)
+            {
+                if (!partitionTopicInfoMap.ContainsKey(topicInfo.BrokerId))
+                {
+                    partitionTopicInfoMap.Add(topicInfo.BrokerId, new List<PartitionTopicInfo>() { topicInfo });
+                }
+                else
+                {
+                    partitionTopicInfoMap[topicInfo.BrokerId].Add(topicInfo);
+                } 
+            }
+
+            //// open a new fetcher thread for each broker
+            fetcherWorkerObjects = new FetcherRunnable[partitionTopicInfoMap.Count];
+            int i = 0;
+            foreach (KeyValuePair<int, List<PartitionTopicInfo>> item in partitionTopicInfoMap)
+            {
+                Broker broker = cluster.GetBroker(item.Key);
+                var fetcherRunnable = new FetcherRunnable("FetcherRunnable-" + i, zkClient, config, broker, item.Value);
+                var threadStart = new ThreadStart(fetcherRunnable.Run);
+                var fetcherThread = new Thread(threadStart);
+                fetcherWorkerObjects[i] = fetcherRunnable;
+                fetcherThread.Start();
+                i++;
+            }
+        }
+
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+
+            try
+            {
+                this.Shutdown();
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Ignoring unexpected errors on closing", exc);
+            }
+        }
+
+        /// <summary>
+        /// Ensures that the object has not been disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}
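
A note on InitConnections above: the "re-arrange by broker id" loop builds the per-broker dictionary by hand. An equivalent LINQ grouping would look like the sketch below (illustrative only, not part of this commit; it would have to live inside the Kafka.Client assembly since PartitionTopicInfo is internal):

    namespace Kafka.Client.Consumers
    {
        using System.Collections.Generic;
        using System.Linq;

        internal static class BrokerGroupingSketch
        {
            // Equivalent to the manual "re-arrange by broker id" loop in Fetcher.InitConnections.
            internal static Dictionary<int, List<PartitionTopicInfo>> GroupByBroker(
                IEnumerable<PartitionTopicInfo> topicInfos)
            {
                return topicInfos
                    .GroupBy(topicInfo => topicInfo.BrokerId)
                    .ToDictionary(group => group.Key, group => group.ToList());
            }
        }
    }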

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using log4net;
+
+    /// <summary>
+    /// Background thread worker class that is used to fetch data from a single broker
+    /// </summary>
+    internal class FetcherRunnable
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly string name;
+
+        private readonly IZooKeeperClient zkClient;
+
+        private readonly ConsumerConfig config;
+
+        private readonly Broker broker;
+
+        private readonly IList<PartitionTopicInfo> partitionTopicInfos;
+
+        private readonly IConsumer simpleConsumer;
+
+        private bool shouldStop;
+
+        internal FetcherRunnable(string name, IZooKeeperClient zkClient, ConsumerConfig config, Broker broker, List<PartitionTopicInfo> partitionTopicInfos)
+        {
+            this.name = name;
+            this.zkClient = zkClient;
+            this.config = config;
+            this.broker = broker;
+            this.partitionTopicInfos = partitionTopicInfos;
+            this.simpleConsumer = new Consumer(this.config);
+        }
+
+        /// <summary>
+        /// Method to be used for starting a new thread
+        /// </summary>
+        internal void Run()
+        {
+            foreach (var partitionTopicInfo in partitionTopicInfos)
+            {
+                Logger.InfoFormat(
+                    CultureInfo.CurrentCulture,
+                    "{0} start fetching topic: {1} part: {2} offset: {3} from {4}:{5}",
+                    this.name,
+                    partitionTopicInfo.Topic,
+                    partitionTopicInfo.Partition.PartId,
+                    partitionTopicInfo.GetFetchOffset(),
+                    this.broker.Host,
+                    this.broker.Port);
+            }
+
+            try
+            {
+                while (!this.shouldStop)
+                {
+                    var requestList = new List<FetchRequest>();
+                    foreach (var partitionTopicInfo in this.partitionTopicInfos)
+                    {
+                        var singleRequest = new FetchRequest(partitionTopicInfo.Topic, partitionTopicInfo.Partition.PartId, partitionTopicInfo.GetFetchOffset(), this.config.FetchSize);
+                        requestList.Add(singleRequest);
+                    }
+
+                    Logger.Debug("Fetch request: " + string.Join(", ", requestList.Select(x => x.ToString())));
+                    var request = new MultiFetchRequest(requestList);
+                    var response = this.simpleConsumer.MultiFetch(request);
+                    int read = 0;
+                    var items = this.partitionTopicInfos.Zip(
+                        response,
+                        (x, y) =>
+                        new Tuple<PartitionTopicInfo, BufferedMessageSet>(x, y));
+                    foreach (Tuple<PartitionTopicInfo, BufferedMessageSet> item in items)
+                    {
+                        BufferedMessageSet messages = item.Item2;
+                        PartitionTopicInfo info = item.Item1;
+                        try
+                        {
+                            bool done = false;
+                            if (messages.ErrorCode == ErrorMapping.OffsetOutOfRangeCode)
+                            {
+                                Logger.InfoFormat(CultureInfo.CurrentCulture, "offset {0} out of range", info.GetFetchOffset());
+                                //// see if we can fix this error
+                                var resetOffset = this.ResetConsumerOffsets(info.Topic, info.Partition);
+                                if (resetOffset >= 0)
+                                {
+                                    info.ResetFetchOffset(resetOffset);
+                                    info.ResetConsumeOffset(resetOffset);
+                                    done = true;
+                                }
+                            }
+
+                            if (!done)
+                            {
+                                read += info.Add(messages, info.GetFetchOffset());
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            if (!shouldStop)
+                            {
+                                Logger.ErrorFormat(CultureInfo.CurrentCulture, "error in FetcherRunnable for {0}: {1}", info, ex);
+                            }
+
+                            throw;
+                        }
+                    }
+
+                    Logger.Info("Fetched bytes: " + read);
+                    if (read == 0)
+                    {
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "backing off {0} ms", this.config.BackOffIncrementMs);
+                        Thread.Sleep(this.config.BackOffIncrementMs);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                if (shouldStop)
+                {
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "FetcherRunnable {0} interrupted", this);
+                }
+                else
+                {
+                    Logger.ErrorFormat(CultureInfo.CurrentCulture, "error in FetcherRunnable {0}", ex);
+                }
+            }
+
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "stopping fetcher {0} to host {1}", this.name, this.broker.Host);
+        }
+
+        internal void Shutdown()
+        {
+            this.shouldStop = true;
+        }
+
+        private long ResetConsumerOffsets(string topic, Partition partition)
+        {
+            long offset;
+            switch (this.config.AutoOffsetReset)
+            {
+                case OffsetRequest.SmallestTime:
+                    offset = OffsetRequest.EarliestTime;
+                    break;
+                case OffsetRequest.LargestTime:
+                    offset = OffsetRequest.LatestTime;
+                    break;
+                default:
+                    return -1;
+            }
+
+            var request = new OffsetRequest(topic, partition.PartId, offset, 1);
+            var offsets = this.simpleConsumer.GetOffsetsBefore(request);
+            var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic);
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "updating partition {0} with {1} offset {2}", partition.Name, offset == OffsetRequest.EarliestTime ? "earliest" : "latest", offsets[0]);
+            ZkUtils.UpdatePersistentPath(this.zkClient, topicDirs.ConsumerOffsetDir + "/" + partition.Name, offsets[0].ToString());
+
+            return offsets[0];
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// The low-level consumer API for Kafka messages
+    /// </summary>
+    /// <remarks>
+    /// Maintains a connection to a single broker and has a close correspondence 
+    /// to the network requests sent to the server.
+    /// </remarks>
+    public interface IConsumer
+    {
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        string Host { get; }
+
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        int Port { get; }
+
+        /// <summary>
+        /// Fetch a set of messages from a topic.
+        /// </summary>
+        /// <param name="request">
+        /// Specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+        /// </param>
+        /// <returns>
+        /// A set of fetched messages.
+        /// </returns>
+        /// <remarks>
+        /// Offset is passed in on every request, allowing the user to maintain this metadata 
+        /// however they choose.
+        /// </remarks>
+        BufferedMessageSet Fetch(FetchRequest request);
+
+        /// <summary>
+        /// Combine multiple fetch requests in one call.
+        /// </summary>
+        /// <param name="request">
+        /// The list of fetch requests.
+        /// </param>
+        /// <returns>
+        /// A list of sets of fetched messages.
+        /// </returns>
+        /// <remarks>
+        /// Offset is passed in on every request, allowing the user to maintain this metadata 
+        /// however they choose.
+        /// </remarks>
+        IList<BufferedMessageSet> MultiFetch(MultiFetchRequest request);
+
+        /// <summary>
+        /// Gets a list of valid offsets (up to maxSize) before the given time.
+        /// </summary>
+        /// <param name="request">
+        /// The offset request.
+        /// </param>
+        /// <returns>
+        /// The list of offsets, in descending order.
+        /// </returns>
+        IList<long> GetOffsetsBefore(OffsetRequest request);
+    }
+}
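
A rough sketch of driving this low-level API directly, mirroring how FetcherRunnable below uses it. The concrete Consumer class, the ConsumerConfig.FetchSize property, and the request constructor shapes are taken from their usage elsewhere in this commit; the topic name and partition id are placeholders:

    namespace Kafka.Client.Consumers
    {
        using Kafka.Client.Cfg;
        using Kafka.Client.Messages;
        using Kafka.Client.Requests;

        internal static class LowLevelConsumerSketch
        {
            internal static void FetchOnce(ConsumerConfig config)
            {
                IConsumer consumer = new Consumer(config);

                // Find the earliest valid offset for partition 0 of an illustrative topic.
                var offsetRequest = new OffsetRequest("test-topic", 0, OffsetRequest.EarliestTime, 1);
                long startOffset = consumer.GetOffsetsBefore(offsetRequest)[0];

                // Fetch up to config.FetchSize bytes starting at that offset.
                var fetchRequest = new FetchRequest("test-topic", 0, startOffset, config.FetchSize);
                BufferedMessageSet messages = consumer.Fetch(fetchRequest);

                foreach (Message message in messages.Messages)
                {
                    // process each message here
                }
            }
        }
    }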

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// The consumer high-level API, which hides the details of brokers from the consumer.
+    /// It also maintains the state of what has been consumed. 
+    /// </summary>
+    public interface IConsumerConnector : IDisposable
+    {
+        /// <summary>
+        /// Creates a list of message streams for each topic.
+        /// </summary>
+        /// <param name="topicCountDict">
+        /// The map from topic to the number of streams.
+        /// </param>
+        /// <returns>
+        /// A map from each topic to its list of <see cref="KafkaMessageStream"/> iterators over that topic's messages.
+        /// </returns>
+        IDictionary<string, IList<KafkaMessageStream>> CreateMessageStreams(IDictionary<string, int> topicCountDict);
+
+        /// <summary>
+        /// Commits the offsets of all messages consumed so far.
+        /// </summary>
+        void CommitOffsets();
+    }
+}
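
For contrast with the low-level IConsumer, a sketch of the high-level API defined here and implemented by ZookeeperConsumerConnector below. How the ConsumerConfig is populated (group id, ZooKeeper connection string) is assumed and not shown; the topic name is a placeholder:

    using System.Collections.Generic;
    using Kafka.Client.Cfg;
    using Kafka.Client.Consumers;
    using Kafka.Client.Messages;

    public static class HighLevelConsumerSketch
    {
        public static void Consume(ConsumerConfig config)
        {
            using (IConsumerConnector connector = new ZookeeperConsumerConnector(config, true))
            {
                // Ask for two streams (consumer threads) for an illustrative topic.
                var topicCount = new Dictionary<string, int> { { "test-topic", 2 } };
                IDictionary<string, IList<KafkaMessageStream>> streams =
                    connector.CreateMessageStreams(topicCount);

                foreach (Message message in streams["test-topic"][0])
                {
                    // process the message, then optionally checkpoint progress
                    connector.CommitOffsets();
                }
            }
        }
    }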

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System.Collections;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Kafka.Client.Messages;
+
+    /// <summary>
+    /// This class is a thread-safe IEnumerable of <see cref="Message"/> that can be enumerated to get messages.
+    /// </summary>
+    public class KafkaMessageStream : IEnumerable<Message>
+    {
+        private readonly BlockingCollection<FetchedDataChunk> queue;
+
+        private readonly int consumerTimeoutMs;
+
+        private readonly ConsumerIterator iterator;
+
+        internal KafkaMessageStream(BlockingCollection<FetchedDataChunk> queue, int consumerTimeoutMs)
+        {
+            this.consumerTimeoutMs = consumerTimeoutMs;
+            this.queue = queue;
+            this.iterator = new ConsumerIterator(this.queue, this.consumerTimeoutMs);
+        }
+
+        public IEnumerator<Message> GetEnumerator()
+        {
+            return this.iterator;
+        }
+
+        IEnumerator IEnumerable.GetEnumerator()
+        {
+            return this.GetEnumerator();
+        }
+    }
+}
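
Because this stream's enumerator is the ConsumerIterator above, enumeration blocks until data arrives; when the consumer timeout passed down from the configuration is non-negative, a stalled enumeration surfaces as a ConsumerTimeoutException instead. A small sketch of reading defensively under that assumption:

    using Kafka.Client.Consumers;
    using Kafka.Client.Exceptions;
    using Kafka.Client.Messages;

    public static class StreamTimeoutSketch
    {
        // Reads until the consumer timeout elapses with no new message, then returns the count.
        public static int CountAvailableMessages(KafkaMessageStream stream)
        {
            int count = 0;
            try
            {
                foreach (Message message in stream)
                {
                    count++;
                }
            }
            catch (ConsumerTimeoutException)
            {
                // No message arrived within the configured timeout; stop reading.
            }

            return count;
        }
    }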

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System.Collections.Concurrent;
+    using System.Globalization;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Messages;
+    using log4net;
+
+    /// <summary>
+    /// Represents a topic in a broker's partition.
+    /// </summary>
+    internal class PartitionTopicInfo
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly object consumedOffsetLock = new object();
+
+        private readonly object fetchedOffsetLock = new object();
+
+        private readonly BlockingCollection<FetchedDataChunk> chunkQueue;
+
+        private long consumedOffset;
+
+        private long fetchedOffset;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PartitionTopicInfo"/> class.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="brokerId">
+        /// The broker ID.
+        /// </param>
+        /// <param name="partition">
+        /// The broker's partition.
+        /// </param>
+        /// <param name="chunkQueue">
+        /// The chunk queue.
+        /// </param>
+        /// <param name="consumedOffset">
+        /// The consumed offset value.
+        /// </param>
+        /// <param name="fetchedOffset">
+        /// The fetched offset value.
+        /// </param>
+        /// <param name="fetchSize">
+        /// The fetch size.
+        /// </param>
+        public PartitionTopicInfo(
+            string topic, 
+            int brokerId, 
+            Partition partition, 
+            BlockingCollection<FetchedDataChunk> chunkQueue, 
+            long consumedOffset, 
+            long fetchedOffset, 
+            int fetchSize)
+        {
+            this.Topic = topic;
+            this.Partition = partition;
+            this.chunkQueue = chunkQueue;
+            this.BrokerId = brokerId;
+            this.consumedOffset = consumedOffset;
+            this.fetchedOffset = fetchedOffset;
+            this.FetchSize = fetchSize;
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture, "initial consumer offset of {0} is {1}", this, consumedOffset);
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture, "initial fetch offset of {0} is {1}", this, fetchedOffset);
+            }
+        }
+
+        /// <summary>
+        /// Gets broker ID.
+        /// </summary>
+        public int BrokerId { get; private set; }
+
+        /// <summary>
+        /// Gets the fetch size.
+        /// </summary>
+        public int FetchSize { get; private set; }
+
+        /// <summary>
+        /// Gets the partition.
+        /// </summary>
+        public Partition Partition { get; private set; }
+
+        /// <summary>
+        /// Gets the topic.
+        /// </summary>
+        public string Topic { get; private set; }
+
+        /// <summary>
+        /// Records the given number of bytes as having been consumed
+        /// </summary>
+        /// <param name="messageSize">
+        /// The message size.
+        /// </param>
+        public void Consumed(int messageSize)
+        {
+            long newOffset;
+            lock (this.consumedOffsetLock)
+            {
+                this.consumedOffset += messageSize;
+                newOffset = this.consumedOffset;
+            }
+
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture, "updated consume offset of {0} to {1}", this, newOffset);
+            }
+        }
+
+        public int Add(BufferedMessageSet messages, long fetchOffset)
+        {
+            int size = messages.SetSize;
+            if (size > 0)
+            {
+                long newOffset = Interlocked.Add(ref this.fetchedOffset, size);
+                Logger.Debug("Updated fetch offset of " + this + " to " + newOffset);
+                this.chunkQueue.Add(new FetchedDataChunk(messages, this, fetchOffset));
+            }
+
+            return size;
+        }
+
+        public long GetConsumeOffset()
+        {
+            lock (this.consumedOffsetLock)
+            {
+                return this.consumedOffset;
+            }
+        }
+
+        public long GetFetchOffset()
+        {
+            lock (this.fetchedOffsetLock)
+            {
+                return this.fetchedOffset;
+            }
+        }
+
+        public void ResetConsumeOffset(long newConsumeOffset)
+        {
+            lock (this.consumedOffsetLock)
+            {
+                this.consumedOffset = newConsumeOffset;
+            }
+
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture, "reset consume offset of {0} to {1}", this, newConsumeOffset);
+            }
+        }
+
+        public void ResetFetchOffset(long newFetchOffset)
+        {
+            lock (this.fetchedOffsetLock)
+            {
+                this.fetchedOffset = newFetchOffset;
+            }
+
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat(
+                    CultureInfo.CurrentCulture, "reset fetch offset of {0} to {1}", this, newFetchOffset);
+            }
+        }
+
+        public override string ToString()
+        {
+            return this.Topic + ":" + this.Partition;
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Reflection;
+    using System.Text;
+    using System.Web.Script.Serialization;
+    using log4net;
+
+    internal class TopicCount
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly IDictionary<string, int> topicCountMap;
+        private readonly string consumerIdString;
+
+        public TopicCount(string consumerIdString, IDictionary<string, int> topicCountMap)
+        {
+            this.topicCountMap = topicCountMap;
+            this.consumerIdString = consumerIdString;
+        }
+
+        public static TopicCount ConstructTopicCount(string consumerIdString, string json)
+        {
+            Dictionary<string, int> result = null;
+            var ser = new JavaScriptSerializer();
+            try
+            {
+                result = ser.Deserialize<Dictionary<string, int>>(json);
+            }
+            catch (Exception ex)
+            {
+                Logger.ErrorFormat(CultureInfo.CurrentCulture, "error parsing consumer json string {0}. {1}", json, ex);
+            }
+
+            return new TopicCount(consumerIdString, result);
+        }
+
+        public IDictionary<string, IList<string>> GetConsumerThreadIdsPerTopic()
+        {
+            var result = new Dictionary<string, IList<string>>();
+            foreach (KeyValuePair<string, int> item in topicCountMap)
+            {
+                var consumerSet = new List<string>();
+                for (int i = 0; i < item.Value; i++)
+                {
+                    consumerSet.Add(consumerIdString + "-" + i);
+                }
+
+                result.Add(item.Key, consumerSet);
+            }
+
+            return result;
+        }
+
+        public override bool Equals(object obj)
+        {
+            var o = obj as TopicCount;
+            if (o != null)
+            {
+                return this.consumerIdString == o.consumerIdString && this.topicCountMap == o.topicCountMap;
+            }
+
+            return false;
+        }
+
+        /*
+         return json of
+         { "topic1" : 4,
+           "topic2" : 4
+         }
+        */
+        public string ToJsonString()
+        {
+            var sb = new StringBuilder();
+            sb.Append("{ ");
+            int i = 0;
+            foreach (KeyValuePair<string, int> entry in this.topicCountMap)
+            {
+                if (i > 0)
+                {
+                    sb.Append(",");
+                }
+
+                sb.Append("\"" + entry.Key + "\": " + entry.Value);
+                i++;
+            }
+
+            sb.Append(" }");
+            return sb.ToString();
+        }
+    }
+}
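
To make the registration format concrete: ConstructTopicCount parses the JSON a consumer writes under its registry node, and GetConsumerThreadIdsPerTopic expands it into one thread id per requested stream. An illustrative round trip (TopicCount is internal, so this sketch would have to live inside the Kafka.Client assembly; the consumer id string is a made-up example):

    namespace Kafka.Client.Consumers
    {
        internal static class TopicCountSketch
        {
            internal static void Demo()
            {
                var topicCount = TopicCount.ConstructTopicCount("group1_consumer-a", "{ \"topic1\": 2 }");

                // topicCount.ToJsonString()
                //     => { "topic1": 2 }
                // topicCount.GetConsumerThreadIdsPerTopic()
                //     => { "topic1": ["group1_consumer-a-0", "group1_consumer-a-1"] }
            }
        }
    }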

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using log4net;
+
+    /// <summary>
+    /// The consumer high-level API, which hides the details of brokers from the consumer. 
+    /// It also maintains the state of what has been consumed. 
+    /// </summary>
+    public class ZookeeperConsumerConnector : ZooKeeperAwareKafkaClientBase, IConsumerConnector
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        
+        public static readonly int MaxNRetries = 4;
+        
+        internal static readonly FetchedDataChunk ShutdownCommand = new FetchedDataChunk(null, null, -1);
+
+        private readonly ConsumerConfig config;
+       
+        private IZooKeeperClient zkClient;
+       
+        private readonly object shuttingDownLock = new object();
+       
+        private readonly bool enableFetcher;
+        
+        private Fetcher fetcher;
+        
+        private readonly KafkaScheduler scheduler = new KafkaScheduler();
+        
+        private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry = new ConcurrentDictionary<string, IDictionary<Partition, PartitionTopicInfo>>();
+        
+        private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues = new Dictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>();
+
+        private readonly object syncLock = new object();
+
+        private volatile bool disposed;
+
+        /// <summary>
+        /// Gets the consumer group ID.
+        /// </summary>
+        public string ConsumerGroup
+        {
+            get { return this.config.GroupId; }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZookeeperConsumerConnector"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The consumer configuration. At minimum, the consumer group ID 
+        /// and the ZooKeeper connection string must be specified.
+        /// </param>
+        /// <param name="enableFetcher">
+        /// Indicates whether fetchers should be enabled
+        /// </param>
+        public ZookeeperConsumerConnector(ConsumerConfig config, bool enableFetcher)
+            : base(config)
+        {
+            this.config = config;
+            this.enableFetcher = enableFetcher;
+            this.ConnectZk();
+            this.CreateFetcher();
+
+            if (this.config.AutoCommit)
+            {
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "starting auto committer every {0} ms", this.config.AutoCommitIntervalMs);
+                scheduler.ScheduleWithRate(this.AutoCommit, this.config.AutoCommitIntervalMs, this.config.AutoCommitIntervalMs);
+            }
+        }
+
+        /// <summary>
+        /// Commits the offsets of all messages consumed so far.
+        /// </summary>
+        public void CommitOffsets()
+        {
+            this.EnsuresNotDisposed();
+            if (this.zkClient == null)
+            {
+                return;
+            }
+
+            foreach (KeyValuePair<string, IDictionary<Partition, PartitionTopicInfo>> topic in topicRegistry)
+            {
+                var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key);
+                foreach (KeyValuePair<Partition, PartitionTopicInfo> partition in topic.Value)
+                {
+                    var newOffset = partition.Value.GetConsumeOffset();
+                    try
+                    {
+                        ZkUtils.UpdatePersistentPath(zkClient, topicDirs.ConsumerOffsetDir + "/" + partition.Value.Partition.Name, newOffset.ToString());
+                    }
+                    catch (Exception ex)
+                    {
+                        Logger.WarnFormat(CultureInfo.CurrentCulture, "exception during CommitOffsets: {0}", ex);
+                    }
+
+                    if (Logger.IsDebugEnabled)
+                    {
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "Committed offset {0} for topic {1}", newOffset, partition);
+                    }
+                }
+            }
+        }
+
+        public void AutoCommit()
+        {
+            this.EnsuresNotDisposed();
+            try
+            {
+                this.CommitOffsets();
+            }
+            catch (Exception ex)
+            {
+                Logger.ErrorFormat(CultureInfo.CurrentCulture, "exception during AutoCommit: {0}", ex);
+            }
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                Logger.Info("ZookeeperConsumerConnector shutting down");
+                this.disposed = true;
+            }
+
+            try
+            {
+                if (this.scheduler != null)
+                {
+                    this.scheduler.Dispose();
+                }
+
+                if (this.fetcher != null)
+                {
+                    this.fetcher.Dispose();
+                }
+
+                this.SendShutdownToAllQueues();
+                if (this.zkClient != null)
+                {
+                    this.zkClient.Dispose();
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Debug("Ignoring unexpected errors on shutting down", exc);
+            }
+
+            Logger.Info("ZookeeperConsumerConnector shut down completed");
+        }
+
+        /// <summary>
+        /// Creates a list of message streams for each topic.
+        /// </summary>
+        /// <param name="topicCountDict">
+        /// The map from topic to the number of streams.
+        /// </param>
+        /// <returns>
+        /// The list of <see cref="KafkaMessageStream"/>, which are iterators over topic.
+        /// </returns>
+        /// <remarks>
+        /// Explicitly triggers load balancing for this consumer
+        /// </remarks>
+        public IDictionary<string, IList<KafkaMessageStream>> CreateMessageStreams(IDictionary<string, int> topicCountDict)
+        {
+            this.EnsuresNotDisposed();
+            return this.Consume(topicCountDict);
+        }
+
+        private void ConnectZk()
+        {
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "Connecting to zookeeper instance at {0}", this.config.ZkConnect);
+            this.zkClient = new ZooKeeperClient(this.config.ZkConnect, this.config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
+            this.zkClient.Connect();
+        }
+
+        private void CreateFetcher()
+        {
+            if (this.enableFetcher)
+            {
+                this.fetcher = new Fetcher(this.config, this.zkClient);
+            }
+        }
+
+        private IDictionary<string, IList<KafkaMessageStream>> Consume(IDictionary<string, int> topicCountDict)
+        {
+            Logger.Debug("entering consume");
+
+            if (topicCountDict == null)
+            {
+                throw new ArgumentNullException();
+            }
+
+            var dirs = new ZKGroupDirs(this.config.GroupId);
+            var result = new Dictionary<string, IList<KafkaMessageStream>>();
+
+            string consumerUuid = Environment.MachineName + "-" + DateTime.Now.Millisecond;
+            string consumerIdString = this.config.GroupId + "_" + consumerUuid;
+            var topicCount = new TopicCount(consumerIdString, topicCountDict);
+
+            // listener to consumer and partition changes
+            var loadBalancerListener = new ZKRebalancerListener(
+                this.config, 
+                consumerIdString, 
+                this.topicRegistry, 
+                this.zkClient, 
+                this, 
+                queues, 
+                this.fetcher, 
+                this.syncLock);
+            this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);
+            this.zkClient.Subscribe(dirs.ConsumerRegistryDir, loadBalancerListener);
+
+            //// create a queue per topic per consumer thread
+            var consumerThreadIdsPerTopicMap = topicCount.GetConsumerThreadIdsPerTopic();
+            foreach (var topic in consumerThreadIdsPerTopicMap.Keys)
+            {
+                var streamList = new List<KafkaMessageStream>();
+                foreach (string threadId in consumerThreadIdsPerTopicMap[topic])
+                {
+                    var stream = new BlockingCollection<FetchedDataChunk>(new ConcurrentQueue<FetchedDataChunk>());
+                    this.queues.Add(new Tuple<string, string>(topic, threadId), stream);
+                    streamList.Add(new KafkaMessageStream(stream, this.config.Timeout));
+                }
+
+                result.Add(topic, streamList);
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "adding topic {0} and stream to map...", topic);
+
+                // register on broker partition path changes
+                string partitionPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+                this.zkClient.MakeSurePersistentPathExists(partitionPath);
+                this.zkClient.Subscribe(partitionPath, loadBalancerListener);
+            }
+
+            //// register listener for session expired event
+            this.zkClient.Subscribe(new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener, this));
+
+            //// explicitly trigger load balancing for this consumer
+            lock (this.syncLock)
+            {
+                loadBalancerListener.SyncedRebalance();
+            }
+
+            return result;
+        }
+
+        private void SendShutdownToAllQueues()
+        {
+            foreach (var queue in this.queues)
+            {
+                Logger.Debug("Clearing up queue");
+                //// clear the queue
+                while (queue.Value.Count > 0)
+                {
+                    queue.Value.Take();
+                }
+
+                queue.Value.Add(ShutdownCommand);
+                Logger.Debug("Cleared queue and sent shutdown command");
+            }
+        }
+
+        internal void RegisterConsumerInZk(ZKGroupDirs dirs, string consumerIdString, TopicCount topicCount)
+        {
+            this.EnsuresNotDisposed();
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "begin registering consumer {0} in ZK", consumerIdString);
+            ZkUtils.CreateEphemeralPathExpectConflict(this.zkClient, dirs.ConsumerRegistryDir + "/" + consumerIdString, topicCount.ToJsonString());
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "end registering consumer {0} in ZK", consumerIdString);
+        }
+
+        /// <summary>
+        /// Ensures that the object has not been disposed.
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}

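Note: to illustrate how the streams produced by the Consume method above are meant to be used, here is a minimal, hypothetical C# sketch. The connector variable, the "test" topic, and the two consumer threads are assumptions for the example only; the call is written against the private Consume signature for illustration, and in practice a public wrapper on the connector would be used. KafkaMessageStream is assumed to be enumerable over Message, since it wraps the blocking ConsumerIterator.

    // Hypothetical caller-side usage (not part of this commit):
    var topicCountDict = new Dictionary<string, int> { { "test", 2 } };

    // One stream per requested consumer thread for the topic.
    IDictionary<string, IList<KafkaMessageStream>> streams = connector.Consume(topicCountDict);

    // Enumerating a stream blocks until a fetched chunk is available.
    foreach (Message message in streams["test"][0])
    {
        Console.WriteLine(message.ToString());
    }
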
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client.Exceptions
+{
+    public class ConsumerTimeoutException : Exception
+    {
+    }
+}

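As a hypothetical illustration of where this exception comes into play: assuming the blocking consumer iterator signals an expired consumer timeout by throwing ConsumerTimeoutException, a consuming loop might look like the following sketch (the stream variable and the Process handler are invented for the example).

    try
    {
        // stream is a KafkaMessageStream obtained from the consumer connector.
        foreach (Message message in stream)
        {
            Process(message); // hypothetical message handler
        }
    }
    catch (ConsumerTimeoutException)
    {
        // No message arrived within the configured consumer timeout; retry or shut down.
    }
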
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Exceptions
+{
+    using System;
+
+    /// <summary>
+    /// A wrapping of an error code returned from Kafka.
+    /// </summary>
+    public class KafkaException : Exception
+    {
+        /// <summary>
+        /// No error occurred.
+        /// </summary>
+        public const int NoError = 0;
+
+        /// <summary>
+        /// The offset requested was out of range.
+        /// </summary>
+        public const int OffsetOutOfRangeCode = 1;
+
+        /// <summary>
+        /// The message was invalid.
+        /// </summary>
+        public const int InvalidMessageCode = 2;
+
+        /// <summary>
+        /// The request referenced the wrong partition.
+        /// </summary>
+        public const int WrongPartitionCode = 3;
+
+        /// <summary>
+        /// The fetch size was invalid.
+        /// </summary>
+        public const int InvalidFetchSizeCode = 4;
+
+        public KafkaException()
+        {
+            ErrorCode = NoError;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the KafkaException class.
+        /// </summary>
+        /// <param name="errorCode">The error code generated by a request to Kafka.</param>
+        public KafkaException(int errorCode) : base(GetMessage(errorCode))
+        {
+            ErrorCode = errorCode;
+        }
+
+        /// <summary>
+        /// Gets the error code that was sent from Kafka.
+        /// </summary>
+        public int ErrorCode { get; private set; }
+
+        /// <summary>
+        /// Gets the message for the exception based on the Kafka error code.
+        /// </summary>
+        /// <param name="errorCode">The error code from Kafka.</param>
+        /// <returns>A string message corresponding to the error code.</returns>
+        private static string GetMessage(int errorCode)
+        {
+            if (errorCode == OffsetOutOfRangeCode)
+            {
+                return "Offset out of range";
+            }
+            else if (errorCode == InvalidMessageCode)
+            {
+                return "Invalid message";
+            }
+            else if (errorCode == WrongPartitionCode)
+            {
+                return "Wrong partition";
+            }
+            else if (errorCode == InvalidFetchSizeCode)
+            {
+                return "Invalid fetch size";
+            }
+            else
+            {
+                return "Unknown error";
+            }
+        }
+    }
+}

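A small, hypothetical sketch of how the error-code constants and message mapping above surface to callers; the way errorCode is obtained from a broker response is an assumption for the example.

    int errorCode = KafkaException.OffsetOutOfRangeCode; // e.g. parsed from a fetch response
    if (errorCode != KafkaException.NoError)
    {
        // The exception's Message resolves to "Offset out of range" via the mapping above.
        throw new KafkaException(errorCode);
    }
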
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,8 @@
+namespace Kafka.Client.Exceptions
+{
+    using System;
+
+    public class MessageSizeTooLargeException : Exception
+    {
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client.Exceptions
+{
+    public class ZKRebalancerException : Exception
+    {
+        public ZKRebalancerException()
+        {
+        }
+
+        public ZKRebalancerException(string message)
+            : base(message)
+        {
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,28 @@
+namespace Kafka.Client.Exceptions
+{
+    using System;
+    using System.Runtime.Serialization;
+
+    [Serializable]
+    public class ZooKeeperException : Exception
+    {
+        public ZooKeeperException()
+        {
+        }
+
+        public ZooKeeperException(string message)
+            : base(message)
+        {
+        }
+
+        public ZooKeeperException(string message, Exception exc)
+            : base(message, exc)
+        {
+        }
+
+        protected ZooKeeperException(SerializationInfo info, StreamingContext context)
+            : base(info, context)
+        {
+        }
+    }
+}

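For illustration, a hypothetical use of the wrapping constructor; EnsurePathExists is an invented helper standing in for any ZooKeeper call that may fail.

    try
    {
        EnsurePathExists("/brokers/topics"); // hypothetical ZooKeeper operation
    }
    catch (Exception exc)
    {
        throw new ZooKeeperException("Failed to ensure path /brokers/topics exists", exc);
    }
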
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,17 @@
+namespace Kafka.Client.Exceptions
+{
+    using System;
+
+    public class ZooKeeperTimeoutException : Exception
+    {
+        public ZooKeeperTimeoutException()
+            : base("Unable to connect to zookeeper server within timeout: unknown value")
+        {
+        }
+
+        public ZooKeeperTimeoutException(int connectionTimeout)
+            : base("Unable to connect to zookeeper server within timeout: " + connectionTimeout)
+        {
+        }
+    }
+}

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj Wed Sep 21 19:17:19 2011
@@ -1,69 +1,230 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <PropertyGroup>
-    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
-    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>8.0.30703</ProductVersion>
-    <SchemaVersion>2.0</SchemaVersion>
-    <ProjectGuid>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</ProjectGuid>
-    <OutputType>Library</OutputType>
-    <AppDesignerFolder>Properties</AppDesignerFolder>
-    <RootNamespace>Kafka.Client</RootNamespace>
-    <AssemblyName>Kafka.Client</AssemblyName>
-    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
-    <FileAlignment>512</FileAlignment>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
-    <DebugSymbols>true</DebugSymbols>
-    <DebugType>full</DebugType>
-    <Optimize>false</Optimize>
-    <OutputPath>bin\Debug\</OutputPath>
-    <DefineConstants>DEBUG;TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
-    <DebugType>pdbonly</DebugType>
-    <Optimize>true</Optimize>
-    <OutputPath>bin\Release\</OutputPath>
-    <DefineConstants>TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <ItemGroup>
-    <Reference Include="System" />
-    <Reference Include="System.Core" />
-    <Reference Include="System.Xml.Linq" />
-    <Reference Include="System.Data.DataSetExtensions" />
-    <Reference Include="Microsoft.CSharp" />
-    <Reference Include="System.Data" />
-    <Reference Include="System.Xml" />
-  </ItemGroup>
-  <ItemGroup>
-    <Compile Include="AbstractRequest.cs" />
-    <Compile Include="Consumer.cs" />
-    <Compile Include="KafkaException.cs" />
-    <Compile Include="RequestContext.cs" />
-    <Compile Include="Request\FetchRequest.cs" />
-    <Compile Include="Request\MultiFetchRequest.cs" />
-    <Compile Include="Request\MultiProducerRequest.cs" />
-    <Compile Include="Request\OffsetRequest.cs" />
-    <Compile Include="Request\ProducerRequest.cs" />
-    <Compile Include="Util\Crc32.cs" />
-    <Compile Include="KafkaConnection.cs" />
-    <Compile Include="Message.cs" />
-    <Compile Include="Producer.cs" />
-    <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="RequestType.cs" />
-    <Compile Include="Util\BitWorks.cs" />
-  </ItemGroup>
-  <ItemGroup />
-  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
-  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
-       Other similar extension points exist, see Microsoft.Common.targets.
-  <Target Name="BeforeBuild">
-  </Target>
-  <Target Name="AfterBuild">
-  </Target>
-  -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>8.0.30703</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Kafka.Client</RootNamespace>
+    <AssemblyName>Kafka.Client</AssemblyName>
+    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+    <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+    <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+    <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+    <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+    <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+    <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+    <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+    <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+    <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+    <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+    <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+    <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+    <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+    <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+    <CodeContractsCustomRewriterAssembly />
+    <CodeContractsCustomRewriterClass />
+    <CodeContractsLibPaths />
+    <CodeContractsExtraRewriteOptions />
+    <CodeContractsExtraAnalysisOptions />
+    <CodeContractsBaseLineFile />
+    <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+    <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+    <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+    <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+    <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Integration|AnyCPU'">
+    <DebugSymbols>true</DebugSymbols>
+    <OutputPath>bin\Integration\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <DebugType>full</DebugType>
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <ErrorReport>prompt</ErrorReport>
+    <CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets>
+    <CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
+    <CodeAnalysisFailOnMissingRules>true</CodeAnalysisFailOnMissingRules>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+    <CodeContractsEnableRuntimeChecking>True</CodeContractsEnableRuntimeChecking>
+    <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+    <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+    <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+    <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+    <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+    <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+    <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+    <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+    <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+    <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+    <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+    <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+    <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+    <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+    <CodeContractsCustomRewriterAssembly />
+    <CodeContractsCustomRewriterClass />
+    <CodeContractsLibPaths />
+    <CodeContractsExtraRewriteOptions />
+    <CodeContractsExtraAnalysisOptions />
+    <CodeContractsBaseLineFile />
+    <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+    <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+    <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+    <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="log4net">
+      <HintPath>..\..\..\lib\log4Net\log4net.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Configuration" />
+    <Reference Include="System.Core" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Web.Extensions" />
+    <Reference Include="ZooKeeperNet">
+      <HintPath>..\..\..\lib\zookeeper\ZooKeeperNet.dll</HintPath>
+    </Reference>
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="Cfg\AsyncProducerConfig.cs" />
+    <Compile Include="Cfg\BrokerPartitionInfo.cs" />
+    <Compile Include="Cfg\BrokerPartitionInfoCollection.cs" />
+    <Compile Include="Cfg\Consumer.cs" />
+    <Compile Include="Cfg\ConsumerConfig.cs" />
+    <Compile Include="Cfg\IAsyncProducerConfigShared.cs" />
+    <Compile Include="Cfg\ISyncProducerConfigShared.cs" />
+    <Compile Include="Cfg\KafkaClientConfiguration.cs" />
+    <Compile Include="Cfg\KafkaServer.cs" />
+    <Compile Include="Cfg\ProducerConfig.cs" />
+    <Compile Include="Cfg\ZooKeeperServers.cs" />
+    <Compile Include="Cluster\Cluster.cs" />
+    <Compile Include="Cluster\Partition.cs" />
+    <Compile Include="Consumers\Consumer.cs" />
+    <Compile Include="Consumers\ConsumerIterator.cs" />
+    <Compile Include="Consumers\ConsumerIteratorState.cs" />
+    <Compile Include="Consumers\FetchedDataChunk.cs" />
+    <Compile Include="Consumers\Fetcher.cs" />
+    <Compile Include="Consumers\FetcherRunnable.cs" />
+    <Compile Include="Consumers\IConsumer.cs" />
+    <Compile Include="Consumers\IConsumerConnector.cs" />
+    <Compile Include="Consumers\KafkaMessageStream.cs" />
+    <Compile Include="Consumers\PartitionTopicInfo.cs" />
+    <Compile Include="Consumers\TopicCount.cs" />
+    <Compile Include="Consumers\ZookeeperConsumerConnector.cs" />
+    <Compile Include="Exceptions\ConsumerTimeoutException.cs" />
+    <Compile Include="Exceptions\MessageSizeTooLargeException.cs" />
+    <Compile Include="Exceptions\ZKRebalancerException.cs" />
+    <Compile Include="Exceptions\ZooKeeperException.cs" />
+    <Compile Include="Exceptions\ZooKeeperTimeoutException.cs" />
+    <Compile Include="KafkaConnection.cs">
+      <SubType>Code</SubType>
+    </Compile>
+    <Compile Include="Exceptions\KafkaException.cs">
+      <SubType>Code</SubType>
+    </Compile>
+    <Compile Include="Messages\BoundedBuffer.cs" />
+    <Compile Include="Producers\Async\AsyncProducerPool.cs" />
+    <Compile Include="Producers\Async\MessageSent.cs" />
+    <Compile Include="Producers\Producer.StrMsg.cs" />
+    <Compile Include="Producers\Sync\SyncProducerPool.cs" />
+    <Compile Include="Serialization\StringEncoder.cs" />
+    <Compile Include="Serialization\IWritable.cs" />
+    <Compile Include="Producers\ProducerTypes.cs" />
+    <Compile Include="Cfg\SyncProducerConfig.cs" />
+    <Compile Include="Cfg\ZKConfig.cs" />
+    <Compile Include="Cluster\Broker.cs" />
+    <Compile Include="Producers\Partitioning\ConfigBrokerPartitionInfo.cs" />
+    <Compile Include="Producers\Partitioning\DefaultPartitioner.cs" />
+    <Compile Include="Producers\Partitioning\ZKBrokerPartitionInfo.cs" />
+    <Compile Include="KafkaClientBase.cs" />
+    <Compile Include="Serialization\DefaultEncoder.cs" />
+    <Compile Include="Serialization\KafkaBinaryReader.cs" />
+    <Compile Include="Serialization\KafkaBinaryWriter.cs" />
+    <Compile Include="Utils\ErrorMapping.cs" />
+    <Compile Include="Utils\Extensions.cs" />
+    <Compile Include="Utils\Guard.cs" />
+    <Compile Include="Utils\KafkaScheduler.cs" />
+    <Compile Include="Utils\ZKGroupDirs.cs" />
+    <Compile Include="Utils\ZKGroupTopicDirs.cs" />
+    <Compile Include="Utils\ZkUtils.cs" />
+    <Compile Include="ZooKeeperAwareKafkaClientBase.cs" />
+    <Compile Include="Messages\MessageSet.cs" />
+    <Compile Include="Messages\BufferedMessageSet.cs" />
+    <Compile Include="Producers\Async\AsyncProducer.cs" />
+    <Compile Include="Producers\Async\ICallbackHandler.cs" />
+    <Compile Include="Producers\Partitioning\IBrokerPartitionInfo.cs" />
+    <Compile Include="Producers\Partitioning\IPartitioner.cs" />
+    <Compile Include="Producers\Async\IAsyncProducer.cs" />
+    <Compile Include="Producers\IProducer.cs" />
+    <Compile Include="Producers\IProducerPool.cs" />
+    <Compile Include="Producers\Sync\ISyncProducer.cs" />
+    <Compile Include="Producers\ProducerPool.cs" />
+    <Compile Include="Producers\ProducerPoolData.cs" />
+    <Compile Include="Producers\Sync\SyncProducer.cs" />
+    <Compile Include="Requests\AbstractRequest.cs" />
+    <Compile Include="Messages\Message.cs" />
+    <Compile Include="Producers\ProducerData.cs" />
+    <Compile Include="RequestContext.cs" />
+    <Compile Include="Requests\FetchRequest.cs" />
+    <Compile Include="Requests\MultiFetchRequest.cs" />
+    <Compile Include="Requests\MultiProducerRequest.cs" />
+    <Compile Include="Requests\OffsetRequest.cs" />
+    <Compile Include="Requests\ProducerRequest.cs" />
+    <Compile Include="Serialization\IEncoder.cs" />
+    <Compile Include="Utils\Crc32Hasher.cs" />
+    <Compile Include="Producers\Producer.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Requests\RequestTypes.cs" />
+    <Compile Include="Utils\BitWorks.cs" />
+    <Compile Include="Utils\ReflectionHelper.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\BrokerTopicsListener.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ChildChangedEventItem.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\DataChangedEventItem.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperChildChangedEventArgs.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperDataChangedEventArgs.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperEventTypes.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperDataListener.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperStateListener.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperEventArgs.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperSessionCreatedEventArgs.cs" />
+    <Compile Include="ZooKeeperIntegration\Events\ZooKeeperStateChangedEventArgs.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperChildListener.cs" />
+    <Compile Include="ZooKeeperIntegration\IZooKeeperConnection.cs" />
+    <Compile Include="ZooKeeperIntegration\IZooKeeperClient.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\ZKRebalancerListener.cs" />
+    <Compile Include="ZooKeeperIntegration\Listeners\ZKSessionExpireListener.cs" />
+    <Compile Include="ZooKeeperIntegration\ZooKeeperConnection.cs" />
+    <Compile Include="ZooKeeperIntegration\ZooKeeperClient.cs" />
+    <Compile Include="ZooKeeperIntegration\ZooKeeperClient.Watcher.cs">
+      <DependentUpon>ZooKeeperClient.cs</DependentUpon>
+    </Compile>
+    <Compile Include="ZooKeeperIntegration\IZooKeeperSerializer.cs" />
+    <Compile Include="ZooKeeperIntegration\ZooKeeperStringSerializer.cs" />
+  </ItemGroup>
+  <ItemGroup />
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
 </Project>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+    using System;
+
+    /// <summary>
+    /// Base class for all Kafka clients
+    /// </summary>
+    public abstract class KafkaClientBase : IDisposable
+    {
+        /// <summary>
+        /// Releases all unmanaged and managed resources
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Releases all unmanaged and managed resources
+        /// </summary>
+        /// <param name="disposing">
+        /// Indicates whether release managed resources.
+        /// </param>
+        protected abstract void Dispose(bool disposing);
+    }
+}
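
A minimal, hypothetical subclass showing the dispose pattern this base class expects; the class name and the disposed flag are invented for the example.

    public class ExampleKafkaClient : KafkaClientBase
    {
        private volatile bool disposed;

        protected override void Dispose(bool disposing)
        {
            if (this.disposed)
            {
                return;
            }

            if (disposing)
            {
                // Release managed resources (connections, queues, schedulers) here.
            }

            this.disposed = true;
        }
    }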