You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [5/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Linq;
+    using System.Text;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Batches several <see cref="ProducerRequest"/> instances into one multi-produce request to send to Kafka.
+    /// </summary>
+    /// <remarks>
+    /// The constructor serializes the request into <c>RequestBuffer</c> in this order: the request size,
+    /// the request type id, a two-byte count of producer requests and then the body of every producer
+    /// request (topic, partition, message set size, message set) without a per-request size or type id.
+    /// </remarks>
+    public class MultiProducerRequest : AbstractRequest, IWritable
+    {
+        /// <summary>
+        /// Number of bytes used to store the count of batched producer requests.
+        /// </summary>
+        public const byte DefaultRequestsCountSize = 2;
+
+        /// <summary>
+        /// Computes the buffer size needed to serialize the given producer requests as a single request.
+        /// </summary>
+        /// <param name="requests">
+        /// The producer requests to batch. Must not be null.
+        /// </param>
+        /// <returns>
+        /// The size of the request size, request id and requests count fields plus, for every producer
+        /// request, the length of its own buffer reduced by its request size and request id fields.
+        /// </returns>
+        public static int GetBufferLength(IEnumerable<ProducerRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+
+            return DefaultRequestSizeSize
+                + DefaultRequestIdSize
+                + DefaultRequestsCountSize
+                + (int)requests.Sum(x => x.RequestBuffer.Length - DefaultRequestIdSize - DefaultRequestSizeSize);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the MultiProducerRequest class and serializes it
+        /// into a newly allocated request buffer.
+        /// </summary>
+        /// <param name="requests">
+        /// The list of individual producer requests to send in this request.
+        /// </param>
+        public MultiProducerRequest(IEnumerable<ProducerRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+            int length = GetBufferLength(requests);
+            ProducerRequests = requests;
+            this.RequestBuffer = new BoundedBuffer(length);
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets or sets the list of producer requests to be sent in batch.
+        /// </summary>
+        /// <remarks>
+        /// The request buffer is only written by the constructor; assigning this property
+        /// afterwards does not re-serialize the request.
+        /// </remarks>
+        public IEnumerable<ProducerRequest> ProducerRequests { get; set; }
+
+        /// <summary>
+        /// Gets the request type, which is always <see cref="RequestTypes.MultiProduce"/>.
+        /// </summary>
+        public override RequestTypes RequestType
+        {
+            get
+            {
+                return RequestTypes.MultiProduce;
+            }
+        }
+
+        /// <summary>
+        /// Writes the complete request (size, request type id and content) into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                // The size field counts everything that follows it, so its own width is excluded.
+                writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                writer.Write(this.RequestTypeId);
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes the requests count followed by every producer request into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            // Enumerate the sequence once so that the written count always matches the number of
+            // requests actually written, even when ProducerRequests is lazily evaluated.
+            List<ProducerRequest> requests = this.ProducerRequests.ToList();
+            writer.Write((short)requests.Count);
+            foreach (var request in requests)
+            {
+                request.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Builds a textual description of this request by parsing its request buffer.
+        /// </summary>
+        /// <returns>
+        /// The description produced by <see cref="ParseFrom"/>.
+        /// </returns>
+        public override string ToString()
+        {
+            using (var reader = new KafkaBinaryReader(this.RequestBuffer))
+            {
+                return ParseFrom(reader, (int)this.RequestBuffer.Length);
+            }
+        }
+
+        /// <summary>
+        /// Reads a serialized multi-produce request from the reader and describes it as text.
+        /// </summary>
+        /// <param name="reader">
+        /// The reader positioned at the start of the request. Must not be null.
+        /// </param>
+        /// <param name="count">
+        /// The length of the request. Currently not used by this method.
+        /// </param>
+        /// <returns>
+        /// The request size and request id followed by a description of every nested producer request.
+        /// </returns>
+        public static string ParseFrom(KafkaBinaryReader reader, int count)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+
+            var sb = new StringBuilder();
+            sb.Append("Request size: ");
+            sb.Append(reader.ReadInt32());
+            sb.Append(", RequestId: ");
+            short reqId = reader.ReadInt16();
+            sb.Append(reqId);
+            sb.Append("(");
+            sb.Append((RequestTypes)reqId);
+            sb.Append("), Single Requests: {");
+
+            // WriteTo(KafkaBinaryWriter) stores the number of producer requests right after the
+            // header, so it has to be consumed before the first nested request starts.
+            short requestsCount = reader.ReadInt16();
+
+            // The position check additionally stops the loop as soon as the stream is exhausted.
+            for (int i = 1; i <= requestsCount && reader.BaseStream.Position != reader.BaseStream.Length; i++)
+            {
+                sb.Append("Request ");
+                sb.Append(i);
+                sb.Append(" {");
+
+                // NOTE(review): the length of a single nested request is not known here, so 0 is
+                // passed exactly as before - confirm how BufferedMessageSet.ParseFrom uses its size.
+                int msgSize = 0;
+
+                // Nested requests are written without their own size and request id fields,
+                // therefore that part has to be skipped while parsing them.
+                sb.Append(ProducerRequest.ParseFrom(reader, msgSize, true));
+                sb.AppendLine("} ");
+            }
+
+            // Close the "Single Requests: {" group opened above.
+            sb.Append("}");
+            return sb.ToString();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Text;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Request sent to Kafka to obtain offsets of a given topic and partition.
+    /// </summary>
+    /// <remarks>
+    /// The parameterized constructor serializes the request into <c>RequestBuffer</c> right away;
+    /// the parameterless constructor leaves the buffer untouched.
+    /// </remarks>
+    public class OffsetRequest : AbstractRequest, IWritable
+    {
+        /// <summary>
+        /// Time value (-1) standing for the latest time.
+        /// </summary>
+        public static readonly long LatestTime = -1L;
+
+        /// <summary>
+        /// Time value (-2) standing for the earliest time.
+        /// </summary>
+        public static readonly long EarliestTime = -2L;
+
+        /// <summary>
+        /// Textual name "smallest". Not used inside this class.
+        /// </summary>
+        public const string SmallestTime = "smallest";
+
+        /// <summary>
+        /// Textual name "largest". Not used inside this class.
+        /// </summary>
+        public const string LargestTime = "largest";
+
+        /// <summary>Number of bytes of the topic length field.</summary>
+        public const byte DefaultTopicSizeSize = 2;
+
+        /// <summary>Number of bytes of the partition field.</summary>
+        public const byte DefaultPartitionSize = 4;
+
+        /// <summary>Number of bytes of the time field.</summary>
+        public const byte DefaultTimeSize = 8;
+
+        /// <summary>Number of bytes of the maximum offsets field.</summary>
+        public const byte DefaultMaxOffsetsSize = 4;
+
+        /// <summary>Number of bytes of all fixed-size fields of the request taken together.</summary>
+        public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultRequestIdSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultTimeSize + DefaultMaxOffsetsSize;
+
+        /// <summary>
+        /// Computes the total length of an offset request for the given topic.
+        /// </summary>
+        /// <param name="topic">The topic name.</param>
+        /// <param name="encoding">The name of the encoding used for the topic.</param>
+        /// <returns>The topic length plus <see cref="DefaultHeaderSize"/>.</returns>
+        public static int GetRequestLength(string topic, string encoding = DefaultEncoding)
+        {
+            short topicBytes = GetTopicLength(topic, encoding);
+            int total = topicBytes + DefaultHeaderSize;
+            return total;
+        }
+
+        /// <summary>
+        /// Initializes a new, empty instance of the OffsetRequest class.
+        /// </summary>
+        public OffsetRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the OffsetRequest class and serializes it
+        /// into a newly allocated request buffer.
+        /// </summary>
+        /// <param name="topic">The topic whose offsets are requested.</param>
+        /// <param name="partition">The partition whose offsets are requested.</param>
+        /// <param name="time">The time from which to request offsets.</param>
+        /// <param name="maxOffsets">The maximum amount of offsets to return.</param>
+        public OffsetRequest(string topic, int partition, long time, int maxOffsets)
+        {
+            this.Topic = topic;
+            this.Partition = partition;
+            this.Time = time;
+            this.MaxOffsets = maxOffsets;
+
+            // The buffer is sized exactly for this request before it is filled.
+            this.RequestBuffer = new BoundedBuffer(GetRequestLength(topic, DefaultEncoding));
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets the time from which offsets are requested.
+        /// </summary>
+        public long Time { get; private set; }
+
+        /// <summary>
+        /// Gets the upper limit of the number of offsets to return.
+        /// </summary>
+        public int MaxOffsets { get; private set; }
+
+        /// <summary>
+        /// Gets the request type, which is always <see cref="RequestTypes.Offsets"/>.
+        /// </summary>
+        public override RequestTypes RequestType
+        {
+            get
+            {
+                return RequestTypes.Offsets;
+            }
+        }
+
+        /// <summary>
+        /// Serializes the whole request (size, request type id and content) into the stream.
+        /// </summary>
+        /// <param name="output">
+        /// The stream to write to. Must not be null.
+        /// </param>
+        public void WriteTo(System.IO.MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var binaryWriter = new KafkaBinaryWriter(output))
+            {
+                // The size field describes what follows it, hence its own width is subtracted.
+                var sizeAfterSizeField = this.RequestBuffer.Capacity - DefaultRequestSizeSize;
+                binaryWriter.Write(sizeAfterSizeField);
+                binaryWriter.Write(this.RequestTypeId);
+                this.WriteTo(binaryWriter);
+            }
+        }
+
+        /// <summary>
+        /// Serializes the request content: topic, partition, time and maximum number of offsets.
+        /// </summary>
+        /// <param name="writer">
+        /// The writer to write to. Must not be null.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.WriteTopic(this.Topic, DefaultEncoding);
+            writer.Write(this.Partition);
+            writer.Write(this.Time);
+            writer.Write(this.MaxOffsets);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Text;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Request that publishes a set of messages to one topic and partition in Kafka.
+    /// </summary>
+    /// <remarks>
+    /// Both constructors serialize the request into <c>RequestBuffer</c> immediately.
+    /// </remarks>
+    public class ProducerRequest : AbstractRequest, IWritable
+    {
+        /// <summary>Partition value (-1) named for a random partition. Not used inside this class.</summary>
+        public const int RandomPartition = -1;
+
+        /// <summary>Number of bytes of the topic length field.</summary>
+        public const byte DefaultTopicSizeSize = 2;
+
+        /// <summary>Number of bytes of the partition field.</summary>
+        public const byte DefaultPartitionSize = 4;
+
+        /// <summary>Number of bytes of the message set size field.</summary>
+        public const byte DefaultSetSizeSize = 4;
+
+        /// <summary>Number of bytes of all fixed-size fields of the request taken together.</summary>
+        public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultRequestIdSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultSetSizeSize;
+
+        /// <summary>Topic length (2) named for the case of no topic. Not used inside this class.</summary>
+        public const short DefaultTopicLengthIfNonePresent = 2;
+
+        /// <summary>
+        /// Computes the total length of a producer request.
+        /// </summary>
+        /// <param name="topic">The topic name.</param>
+        /// <param name="messegesSize">The size of the message set.</param>
+        /// <param name="encoding">The name of the encoding used for the topic.</param>
+        /// <returns>The topic length plus <see cref="DefaultHeaderSize"/> plus the message set size.</returns>
+        public static int GetRequestLength(string topic, int messegesSize, string encoding = DefaultEncoding)
+        {
+            short topicBytes = GetTopicLength(topic, encoding);
+            int withoutMessages = topicBytes + DefaultHeaderSize;
+            return withoutMessages + messegesSize;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the ProducerRequest class from an already built message set
+        /// and serializes it into a newly allocated request buffer.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The message set to send. Must not be null.</param>
+        public ProducerRequest(string topic, int partition, BufferedMessageSet messages)
+        {
+            Guard.Assert<ArgumentNullException>(() => messages != null);
+
+            // The buffer is sized exactly for this request before it is filled.
+            this.RequestBuffer = new BoundedBuffer(GetRequestLength(topic, messages.SetSize));
+            this.Topic = topic;
+            this.Partition = partition;
+            this.MessageSet = messages;
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the ProducerRequest class from individual messages,
+        /// which are wrapped into a new <see cref="BufferedMessageSet"/>.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The list of messages to send.</param>
+        public ProducerRequest(string topic, int partition, IEnumerable<Message> messages)
+            : this(topic, partition, new BufferedMessageSet(messages))
+        {
+        }
+
+        /// <summary>
+        /// Gets the message set carried by this request.
+        /// </summary>
+        public BufferedMessageSet MessageSet { get; private set; }
+
+        /// <summary>
+        /// Gets the request type, which is always <see cref="RequestTypes.Produce"/>.
+        /// </summary>
+        public override RequestTypes RequestType
+        {
+            get
+            {
+                return RequestTypes.Produce;
+            }
+        }
+
+        /// <summary>
+        /// Gets the current length of the request buffer, converted to an integer.
+        /// </summary>
+        public int TotalSize
+        {
+            get
+            {
+                long bufferLength = this.RequestBuffer.Length;
+                return (int)bufferLength;
+            }
+        }
+
+        /// <summary>
+        /// Serializes the whole request (size, request type id and content) into the stream.
+        /// </summary>
+        /// <param name="output">
+        /// The stream to write to. Must not be null.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var binaryWriter = new KafkaBinaryWriter(output))
+            {
+                // The size field describes what follows it, hence its own width is subtracted.
+                var sizeAfterSizeField = this.RequestBuffer.Capacity - DefaultRequestSizeSize;
+                binaryWriter.Write(sizeAfterSizeField);
+                binaryWriter.Write(this.RequestTypeId);
+                this.WriteTo(binaryWriter);
+            }
+        }
+
+        /// <summary>
+        /// Serializes the request content: topic, partition, message set size and the message set.
+        /// </summary>
+        /// <param name="writer">
+        /// The writer to write to. Must not be null.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.WriteTopic(this.Topic, DefaultEncoding);
+            writer.Write(this.Partition);
+
+            BufferedMessageSet set = this.MessageSet;
+            writer.Write(set.SetSize);
+            set.WriteTo(writer);
+        }
+
+        /// <summary>
+        /// Describes this request as text by parsing its request buffer.
+        /// </summary>
+        /// <returns>
+        /// The description produced by <see cref="ParseFrom"/>.
+        /// </returns>
+        public override string ToString()
+        {
+            using (var bufferReader = new KafkaBinaryReader(this.RequestBuffer))
+            {
+                return ParseFrom(bufferReader, this.TotalSize);
+            }
+        }
+
+        /// <summary>
+        /// Reads a serialized producer request from the reader and describes it as text.
+        /// </summary>
+        /// <param name="reader">
+        /// The reader positioned at the start of the request. Must not be null.
+        /// </param>
+        /// <param name="count">
+        /// The total request length; the size handed over to the message set parser is this value
+        /// reduced by <see cref="DefaultHeaderSize"/> and by the topic length.
+        /// </param>
+        /// <param name="skipReqInfo">
+        /// When true, the request size and request id are not read and parsing starts at the topic.
+        /// </param>
+        /// <returns>
+        /// The textual description of the request.
+        /// </returns>
+        public static string ParseFrom(KafkaBinaryReader reader, int count, bool skipReqInfo = false)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+
+            var text = new StringBuilder();
+            if (!skipReqInfo)
+            {
+                text.Append("Request size: ").Append(reader.ReadInt32());
+
+                short requestId = reader.ReadInt16();
+                text.Append(", RequestId: ")
+                    .Append(requestId)
+                    .Append("(")
+                    .Append((RequestTypes)requestId)
+                    .Append(")");
+            }
+
+            string topic = reader.ReadTopic(DefaultEncoding);
+            text.Append(", Topic: ").Append(topic);
+            text.Append(", Partition: ").Append(reader.ReadInt32());
+            text.Append(", Set size: ").Append(reader.ReadInt32());
+
+            // Whatever is left of the request after the fixed fields and the topic.
+            int messageSetSize = count - DefaultHeaderSize - GetTopicLength(topic);
+            text.Append(", Set {")
+                .Append(BufferedMessageSet.ParseFrom(reader, messageSetSize))
+                .Append("}");
+
+            return text.ToString();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+    /// <summary>
+    /// Request types for Kafka. The underlying type is a two-byte integer, which lets
+    /// a request id read from a stream be cast directly to this enumeration.
+    /// </summary>
+    /// <remarks>
+    /// Many of these are not in play yet.
+    /// </remarks>
+    public enum RequestTypes : short 
+    {
+        /// <summary>
+        /// Produce a message. Reported by <see cref="ProducerRequest"/>.
+        /// </summary>
+        Produce = 0,
+
+        /// <summary>
+        /// Fetch a message.
+        /// </summary>
+        Fetch = 1,
+
+        /// <summary>
+        /// Multi-fetch messages.
+        /// </summary>
+        MultiFetch = 2,
+        
+        /// <summary>
+        /// Multi-produce messages. Reported by <see cref="MultiProducerRequest"/>.
+        /// </summary>
+        MultiProduce = 3,
+
+        /// <summary>
+        /// Gets offsets. Reported by <see cref="OffsetRequest"/>.
+        /// </summary>
+        Offsets = 4
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using Kafka.Client.Messages;
+
+    /// <summary>
+    /// Default encoder for data that is already a <see cref="Message" /> object;
+    /// it passes the message through unchanged.
+    /// </summary>
+    public class DefaultEncoder : IEncoder<Message>
+    {
+        /// <summary>
+        /// Returns the given message as is, without copying or converting it.
+        /// </summary>
+        /// <param name="data">
+        /// The data, which is already in <see cref="Message" /> format.
+        /// </param>
+        /// <returns>
+        /// The very same object that was passed in (null when null was passed).
+        /// </returns>
+        public Message ToMessage(Message data)
+        {
+            return data;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using Kafka.Client.Messages;
+
+    /// <summary>
+    /// User-defined serializer that converts data to the <see cref="Message" /> format
+    /// </summary>
+    /// <typeparam name="TData">
+    /// Type of data to serialize
+    /// </typeparam>
+    public interface IEncoder<TData>
+    {
+        /// <summary>
+        /// Serializes given data to <see cref="Message" /> format
+        /// </summary>
+        /// <param name="data">
+        /// The data to serialize.
+        /// </param>
+        /// <returns>
+        /// The message representing the given data
+        /// </returns>
+        Message ToMessage(TData data);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using System.IO;
+
+    /// <summary>
+    /// Implemented by types that can write their content either into a memory stream
+    /// or through a <see cref="KafkaBinaryWriter"/>
+    /// </summary>
+    internal interface IWritable
+    {
+        /// <summary>
+        /// Writes content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        void WriteTo(MemoryStream output);
+
+        /// <summary>
+        /// Writes content into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        void WriteTo(KafkaBinaryWriter writer);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using System.IO;
+    using System.Net;
+    using System.Text;
+
+    /// <summary>
+    /// Reads data from underlying stream using big endian bytes order for primitive types
+    /// and UTF-8 encoding for strings.
+    /// </summary>
+    /// <remarks>
+    /// Disposing the reader does not close the underlying stream; it only moves the stream
+    /// position back to the beginning.
+    /// </remarks>
+    public class KafkaBinaryReader : BinaryReader
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryReader"/> class
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        /// <param name="input">
+        /// The input stream.
+        /// </param>
+        public KafkaBinaryReader(Stream input)
+            : base(input)
+        {
+        }
+
+        /// <summary>
+        /// Resets position pointer of the underlying stream to its beginning.
+        /// The base implementation is not called, so the stream itself is left open.
+        /// </summary>
+        /// <param name="disposing">
+        /// Not used
+        /// </param>
+        protected override void Dispose(bool disposing)
+        {
+            this.BaseStream.Position = 0;
+        }
+
+        /// <summary>
+        /// Reads two-bytes signed integer from the current stream using big endian bytes order 
+        /// and advances the stream position by two bytes
+        /// </summary>
+        /// <returns>
+        /// The two-byte signed integer read from the current stream.
+        /// </returns>
+        public override short ReadInt16()
+        {
+            // The base reader returns the raw (network order) value; convert it to host order.
+            short value = base.ReadInt16();
+            short currentOrdered = IPAddress.NetworkToHostOrder(value);
+            return currentOrdered;
+        }
+
+        /// <summary>
+        /// Reads four-bytes signed integer from the current stream using big endian bytes order 
+        /// and advances the stream position by four bytes
+        /// </summary>
+        /// <returns>
+        /// The four-byte signed integer read from the current stream.
+        /// </returns>
+        public override int ReadInt32()
+        {
+            int value = base.ReadInt32();
+            int currentOrdered = IPAddress.NetworkToHostOrder(value);
+            return currentOrdered;
+        }
+
+        /// <summary>
+        /// Reads eight-bytes signed integer from the current stream using big endian bytes order 
+        /// and advances the stream position by eight bytes
+        /// </summary>
+        /// <returns>
+        /// The eight-byte signed integer read from the current stream.
+        /// </returns>
+        public override long ReadInt64()
+        {
+            long value = base.ReadInt64();
+            long currentOrdered = IPAddress.NetworkToHostOrder(value);
+            return currentOrdered;
+        }
+
+        /// <summary>
+        /// Reads the next character from the current stream and advances the stream position
+        /// in accordance with the encoding used and the character being read.
+        /// </summary>
+        /// <returns>
+        /// The next character from the stream, or -1 if no more characters are available.
+        /// </returns>
+        /// <remarks>
+        /// The value comes from <see cref="BinaryReader.Read()"/> and is a character code (or the -1
+        /// end of stream marker), not a four-byte integer taken from the stream. No byte order
+        /// conversion is therefore applied; swapping its bytes would corrupt the character.
+        /// </remarks>
+        public override int Read()
+        {
+            return base.Read();
+        }
+
+        /// <summary>
+        /// Reads a length-prefixed topic from underlying stream using given encoding.
+        /// The prefix is a two-byte big endian length of the topic in bytes.
+        /// </summary>
+        /// <param name="encoding">
+        /// The name of the encoding to use.
+        /// </param>
+        /// <returns>
+        /// The read topic, or null when the stored length is -1.
+        /// </returns>
+        public string ReadTopic(string encoding)
+        {
+            short length = this.ReadInt16();
+
+            // A length of -1 marks a missing topic; no topic bytes follow it.
+            if (length == -1)
+            {
+                return null;
+            }
+
+            var bytes = this.ReadBytes(length);
+            Encoding encoder = Encoding.GetEncoding(encoding);
+            return encoder.GetString(bytes);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using System.IO;
+    using System.Net;
+    using System.Text;
+
+    /// <summary>
+    /// Writes data into underlying stream using big endian bytes order for primitive types
+    /// and UTF-8 encoding for strings.
+    /// </summary>
+    public class KafkaBinaryWriter : BinaryWriter
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryWriter"/> class 
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        protected KafkaBinaryWriter()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryWriter"/> class 
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public KafkaBinaryWriter(Stream output)
+            : base(output)
+        {
+        }
+
+        /// <summary>
+        /// Flushes data into stream and resets position pointer.
+        /// </summary>
+        /// <remarks>
+        /// The base implementation is deliberately not invoked, so this writer does not close
+        /// the underlying stream; the stream is only flushed and, when it supports seeking, rewound.
+        /// </remarks>
+        /// <param name="disposing">
+        /// Not used
+        /// </param>
+        protected override void Dispose(bool disposing)
+        {
+            this.Flush();
+
+            // Setting Position on a non-seekable stream (e.g. a network stream) throws
+            // NotSupportedException, and Dispose should not throw.
+            if (this.OutStream.CanSeek)
+            {
+                this.OutStream.Position = 0;
+            }
+        }
+
+        /// <summary>
+        /// Writes four-bytes signed integer to the current stream using big endian bytes order 
+        /// and advances the stream position by four bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(int value)
+        {
+            int bigOrdered = IPAddress.HostToNetworkOrder(value);
+            base.Write(bigOrdered);
+        }
+
+        /// <summary>
+        /// Writes eight-bytes signed integer to the current stream using big endian bytes order 
+        /// and advances the stream position by eight bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(long value)
+        {
+            long bigOrdered = IPAddress.HostToNetworkOrder(value);
+            base.Write(bigOrdered);
+        }
+
+        /// <summary>
+        /// Writes two-bytes signed integer to the current stream using big endian bytes order 
+        /// and advances the stream position by two bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(short value)
+        {
+            short bigOrdered = IPAddress.HostToNetworkOrder(value);
+            base.Write(bigOrdered);
+        }
+
+        /// <summary>
+        /// Writes the topic, prefixed with its encoded size in bytes, into the underlying stream
+        /// using the given encoding. A null or empty topic is written as the single size value -1.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic to write.
+        /// </param>
+        /// <param name="encoding">
+        /// The name of the encoding to use.
+        /// </param>
+        /// <exception cref="System.ArgumentOutOfRangeException">
+        /// Thrown when the encoded topic does not fit into the two-byte size prefix.
+        /// </exception>
+        public void WriteTopic(string topic, string encoding)
+        {
+            if (string.IsNullOrEmpty(topic))
+            {
+                short defaultTopic = -1;
+                this.Write(defaultTopic);
+                return;
+            }
+
+            Encoding encoder = Encoding.GetEncoding(encoding);
+            byte[] encodedTopic = encoder.GetBytes(topic);
+
+            // The prefix must be the number of bytes that follow, not the number of UTF-16
+            // characters: the two differ as soon as the topic contains a multi-byte character.
+            if (encodedTopic.Length > short.MaxValue)
+            {
+                throw new System.ArgumentOutOfRangeException(
+                    "topic",
+                    "Encoded topic is too long to be described by a two-byte size prefix.");
+            }
+
+            this.Write((short)encodedTopic.Length);
+            this.Write(encodedTopic);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+    using System.Text;
+    using Kafka.Client.Messages;
+
+    /// <summary>
+    /// Serializes data to <see cref="Message" /> format using UTF-8 encoding
+    /// </summary>
+    public class StringEncoder : IEncoder<string>
+    {
+        /// <summary>
+        /// Builds a <see cref="Message" /> from the UTF-8 bytes of the given text.
+        /// </summary>
+        /// <param name="data">
+        /// The text to encode.
+        /// </param>
+        /// <returns>
+        /// A message constructed from the UTF-8 encoded bytes of <paramref name="data"/>.
+        /// </returns>
+        public Message ToMessage(string data)
+        {
+            return new Message(Encoding.UTF8.GetBytes(data));
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System;
+
+    /// <summary>
+    /// Utility class for managing bits and bytes.
+    /// </summary>
+    internal class BitWorks
+    {
+        /// <summary>
+        /// Returns the bytes of a 16-bit value in the reverse of the order
+        /// produced by <see cref="BitConverter"/>.
+        /// </summary>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>The reversed bytes of the value.</returns>
+        public static byte[] GetBytesReversed(short value)
+        {
+            byte[] raw = BitConverter.GetBytes(value);
+            return ReverseBytes(raw);
+        }
+
+        /// <summary>
+        /// Returns the bytes of a 32-bit value in the reverse of the order
+        /// produced by <see cref="BitConverter"/>.
+        /// </summary>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>The reversed bytes of the value.</returns>
+        public static byte[] GetBytesReversed(int value)
+        {
+            byte[] raw = BitConverter.GetBytes(value);
+            return ReverseBytes(raw);
+        }
+
+        /// <summary>
+        /// Returns the bytes of a 64-bit value in the reverse of the order
+        /// produced by <see cref="BitConverter"/>.
+        /// </summary>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>The reversed bytes of the value.</returns>
+        public static byte[] GetBytesReversed(long value)
+        {
+            byte[] raw = BitConverter.GetBytes(value);
+            return ReverseBytes(raw);
+        }
+
+        /// <summary>
+        /// Reverses the given array in place.
+        /// </summary>
+        /// <param name="inArray">
+        /// The array to reverse. A null or zero-length array is returned unchanged.
+        /// </param>
+        /// <returns>The same array instance that was passed in, with its elements reversed.</returns>
+        public static byte[] ReverseBytes(byte[] inArray)
+        {
+            if (inArray == null || inArray.Length == 0)
+            {
+                return inArray;
+            }
+
+            // Walk inwards from both ends, swapping as we go.
+            int low = 0;
+            int high = inArray.Length - 1;
+            while (low < high)
+            {
+                byte swapped = inArray[low];
+                inArray[low] = inArray[high];
+                inArray[high] = swapped;
+                low++;
+                high--;
+            }
+
+            return inArray;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,122 @@
+// <auto-generated />
+namespace Kafka.Client.Utils
+{
+    using System;
+    using System.Security.Cryptography;
+
+    /// <summary>
+    /// From http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net
+    /// </summary>
+    internal class Crc32Hasher : HashAlgorithm
+    {
+        public const UInt32 DefaultPolynomial = 0xedb88320;
+        public const UInt32 DefaultSeed = 0xffffffff;
+
+        private UInt32 hash;
+        private UInt32 seed;
+        private UInt32[] table;
+
+        // Lookup table for DefaultPolynomial, built on first use and then shared.
+        private static UInt32[] defaultTable;
+
+        /// <summary>
+        /// Creates a hasher that uses <see cref="DefaultPolynomial"/> and <see cref="DefaultSeed"/>.
+        /// </summary>
+        public Crc32Hasher()
+        {
+            table = InitializeTable(DefaultPolynomial);
+            seed = DefaultSeed;
+            Initialize();
+        }
+
+        /// <summary>
+        /// Creates a hasher that uses the given polynomial and seed.
+        /// </summary>
+        /// <param name="polynomial">The polynomial used to build the lookup table.</param>
+        /// <param name="seed">The initial value of the running CRC.</param>
+        public Crc32Hasher(UInt32 polynomial, UInt32 seed)
+        {
+            table = InitializeTable(polynomial);
+            this.seed = seed;
+            Initialize();
+        }
+
+        /// <summary>
+        /// Resets the running CRC to the seed.
+        /// </summary>
+        public override void Initialize()
+        {
+            hash = seed;
+        }
+
+        /// <summary>
+        /// Folds <paramref name="length"/> bytes of <paramref name="buffer"/>, beginning at
+        /// <paramref name="start"/>, into the running CRC.
+        /// </summary>
+        protected override void HashCore(byte[] buffer, int start, int length)
+        {
+            hash = CalculateHash(table, hash, buffer, start, length);
+        }
+
+        /// <summary>
+        /// Completes the computation.
+        /// </summary>
+        /// <returns>The bitwise complement of the running CRC as four big endian bytes.</returns>
+        protected override byte[] HashFinal()
+        {
+            byte[] hashBuffer = UInt32ToBigEndianBytes(~hash);
+            this.HashValue = hashBuffer;
+            return hashBuffer;
+        }
+
+        /// <summary>
+        /// Gets the size of the computed hash in bits.
+        /// </summary>
+        public override int HashSize
+        {
+            get { return 32; }
+        }
+
+        /// <summary>
+        /// Computes the CRC of the whole array using the default polynomial and seed.
+        /// </summary>
+        /// <param name="bytes">The data to hash.</param>
+        /// <returns>The CRC as four big endian bytes.</returns>
+        public static byte[] Compute(byte[] bytes)
+        {
+            // HashAlgorithm is disposable; release it as soon as the hash has been produced.
+            using (var hasher = new Crc32Hasher())
+            {
+                return hasher.ComputeHash(bytes);
+            }
+        }
+
+        /// <summary>
+        /// Builds the 256-entry lookup table for the given polynomial. The table for
+        /// <see cref="DefaultPolynomial"/> is cached in a static field and reused.
+        /// </summary>
+        private static UInt32[] InitializeTable(UInt32 polynomial)
+        {
+            if (polynomial == DefaultPolynomial && defaultTable != null)
+            {
+                return defaultTable;
+            }
+
+            UInt32[] createTable = new UInt32[256];
+            for (int i = 0; i < 256; i++)
+            {
+                UInt32 entry = (UInt32)i;
+                for (int j = 0; j < 8; j++)
+                {
+                    if ((entry & 1) == 1)
+                    {
+                        entry = (entry >> 1) ^ polynomial;
+                    }
+                    else
+                    {
+                        entry = entry >> 1;
+                    }
+                }
+
+                createTable[i] = entry;
+            }
+
+            if (polynomial == DefaultPolynomial)
+            {
+                defaultTable = createTable;
+            }
+
+            return createTable;
+        }
+
+        /// <summary>
+        /// Folds <paramref name="size"/> bytes of <paramref name="buffer"/>, beginning at index
+        /// <paramref name="start"/>, into <paramref name="seed"/> and returns the updated CRC.
+        /// </summary>
+        private static UInt32 CalculateHash(UInt32[] table, UInt32 seed, byte[] buffer, int start, int size)
+        {
+            UInt32 crc = seed;
+
+            // 'size' is a count, not an end index: the range to hash is [start, start + size).
+            int end = start + size;
+            for (int i = start; i < end; i++)
+            {
+                unchecked
+                {
+                    crc = (crc >> 8) ^ table[buffer[i] ^ (crc & 0xff)];
+                }
+            }
+
+            return crc;
+        }
+
+        /// <summary>
+        /// Splits the value into four bytes, most significant byte first.
+        /// </summary>
+        private static byte[] UInt32ToBigEndianBytes(UInt32 x)
+        {
+            return new byte[]
+            {
+                (byte)((x >> 24) & 0xff),
+                (byte)((x >> 16) & 0xff),
+                (byte)((x >> 8) & 0xff),
+                (byte)(x & 0xff)
+            };
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+namespace Kafka.Client.Utils
+{
+    /// <summary>
+    /// Named integer error codes.
+    /// NOTE(review): presumably these mirror the error codes returned by the Kafka broker - confirm.
+    /// </summary>
+    public class ErrorMapping
+    {
+        /// <summary>Code -1, named for an unknown error.</summary>
+        public static readonly int UnknownCode = -1;
+
+        /// <summary>Code 0, named for the absence of an error.</summary>
+        public static readonly int NoError = 0;
+
+        /// <summary>Code 1, named for an offset that is out of range.</summary>
+        public static readonly int OffsetOutOfRangeCode = 1;
+
+        /// <summary>Code 2, named for an invalid message.</summary>
+        public static readonly int InvalidMessageCode = 2;
+
+        /// <summary>Code 3, named for a wrong partition.</summary>
+        public static readonly int WrongPartitionCode = 3;
+
+        /// <summary>Code 4, named for an invalid fetch size.</summary>
+        public static readonly int InvalidFetchSizeCode = 4;
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Linq.Expressions;
+    using System.Text;
+
+    internal static class Extensions
+    {
+        /// <summary>
+        /// Joins the items into a single string, or returns "NULL" when there are no items.
+        /// </summary>
+        /// <typeparam name="T">Type of the items.</typeparam>
+        /// <param name="items">The items to join. The sequence is enumerated exactly once.</param>
+        /// <param name="separator">The text placed between two consecutive items.</param>
+        /// <returns>The joined text, or "NULL" for an empty sequence.</returns>
+        public static string ToMultiString<T>(this IEnumerable<T> items, string separator)
+        {
+            // Count() would walk the whole sequence and String.Join would then walk it again;
+            // for lazy sequences that doubles the work and may even observe different data.
+            ICollection<T> materialized = items as ICollection<T> ?? items.ToList();
+            if (materialized.Count == 0)
+            {
+                return "NULL";
+            }
+
+            return String.Join<T>(separator, materialized);
+        }
+
+        /// <summary>
+        /// Projects every item with the selector and joins the results into a single string,
+        /// or returns "NULL" when there are no items.
+        /// </summary>
+        /// <typeparam name="T">Type of the items.</typeparam>
+        /// <param name="items">The items to join. The sequence is enumerated exactly once.</param>
+        /// <param name="selector">Expression producing the value to print for an item.</param>
+        /// <param name="separator">The text placed between two consecutive values.</param>
+        /// <returns>The joined text, or "NULL" for an empty sequence.</returns>
+        public static string ToMultiString<T>(this IEnumerable<T> items, Expression<Func<T, object>> selector, string separator)
+        {
+            ICollection<T> materialized = items as ICollection<T> ?? items.ToList();
+            if (materialized.Count == 0)
+            {
+                return "NULL";
+            }
+
+            // Compile only when there is something to project.
+            Func<T, object> compiled = selector.Compile();
+            return String.Join(separator, materialized.Select(compiled));
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq.Expressions;
+    using System.Text.RegularExpressions;
+
+    internal static class Guard
+    {
+        // Replacements applied, in this order, to the textual form of a failed assertion.
+        // Kept in an array because the enumeration order of a Dictionary is not guaranteed,
+        // and built once instead of on every failure.
+        private static readonly KeyValuePair<Regex, string>[] Replacements = new[]
+        {
+            new KeyValuePair<Regex, string>(new Regex("value\\([^)]*\\)\\."), string.Empty),
+            new KeyValuePair<Regex, string>(new Regex("\\(\\)\\."), string.Empty),
+            new KeyValuePair<Regex, string>(new Regex("\\(\\)\\ =>"), string.Empty),
+            new KeyValuePair<Regex, string>(new Regex("Not"), "!")
+        };
+
+        /// <summary>
+        /// Checks whether given expression is true. Throws <see cref="InvalidOperationException" /> if not.
+        /// </summary>
+        /// <param name="assertion">
+        /// The assertion.
+        /// </param>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when condition is not met.
+        /// </exception>
+        public static void Assert(Expression<Func<bool>> assertion)
+        {
+            var compiled = assertion.Compile();
+            if (!compiled())
+            {
+                throw new InvalidOperationException(BuildMessage(assertion));
+            }
+        }
+
+        /// <summary>
+        /// Checks whether given expression is true. Throws given exception type if not.
+        /// </summary>
+        /// <remarks>
+        /// The exception is created with the failure message when <typeparamref name="TException"/>
+        /// offers a matching constructor, and with its parameterless constructor otherwise.
+        /// </remarks>
+        /// <typeparam name="TException">
+        /// Type of exception that is thrown when condition is not met.
+        /// </typeparam>
+        /// <param name="assertion">
+        /// The assertion.
+        /// </param>
+        public static void Assert<TException>(Expression<Func<bool>> assertion)
+            where TException : Exception, new()
+        {
+            var compiled = assertion.Compile();
+            if (!compiled())
+            {
+                throw CreateException<TException>(BuildMessage(assertion));
+            }
+        }
+
+        /// <summary>
+        /// Creates the exception to throw for a failed assertion.
+        /// </summary>
+        private static Exception CreateException<TException>(string message)
+            where TException : Exception, new()
+        {
+            try
+            {
+                return (Exception)Activator.CreateInstance(typeof(TException), new object[] { message });
+            }
+            catch (MissingMethodException)
+            {
+                // The generic constraint only promises a parameterless constructor, so a type
+                // without a message constructor is legal here; without this fallback the caller
+                // would see MissingMethodException instead of the exception it asked for.
+                return new TException();
+            }
+        }
+
+        /// <summary>
+        /// Formats the failure message for the given assertion.
+        /// </summary>
+        private static string BuildMessage(Expression<Func<bool>> assertion)
+        {
+            return string.Format("'{0}' is not met.", Normalize(assertion.ToString()));
+        }
+
+        /// <summary>
+        /// Creates string representation of lambda expression with unnecessary information 
+        /// stripped out. 
+        /// </summary>
+        /// <param name="expression">Lambda expression to process. </param>
+        /// <returns>Normalized string representation. </returns>
+        private static string Normalize(string expression)
+        {
+            var result = expression;
+            foreach (var replacement in Replacements)
+            {
+                result = replacement.Key.Replace(result, replacement.Value);
+            }
+
+            return result.Trim();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System;
+    using System.Globalization;
+    using System.Reflection;
+    using System.Threading;
+    using log4net;
+
+    /// <summary>
+    /// A scheduler for running jobs in the background
+    /// </summary>
+    internal class KafkaScheduler : IDisposable
+    {
+        public delegate void KafkaSchedulerDelegate();
+
+        private Timer timer;
+
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private KafkaSchedulerDelegate methodToRun;
+
+        private volatile bool disposed;
+
+        private readonly object shuttingDownLock = new object();
+
+        /// <summary>
+        /// Starts invoking the given method periodically on a timer.
+        /// Scheduling again replaces the previously scheduled method and stops its timer.
+        /// </summary>
+        /// <param name="method">The method to run.</param>
+        /// <param name="delayMs">Delay before the first run, in milliseconds.</param>
+        /// <param name="periodMs">Interval between runs, in milliseconds.</param>
+        /// <exception cref="ArgumentNullException">
+        /// Thrown when <paramref name="method"/> is null.
+        /// </exception>
+        public void ScheduleWithRate(KafkaSchedulerDelegate method, long delayMs, long periodMs)
+        {
+            // Fail at the call site; a null delegate would otherwise only surface as a
+            // NullReferenceException inside the timer callback.
+            if (method == null)
+            {
+                throw new ArgumentNullException("method");
+            }
+
+            Timer previous = this.timer;
+            this.methodToRun = method;
+            TimerCallback tcb = this.HandleCallback;
+            this.timer = new Timer(tcb, null, delayMs, periodMs);
+
+            // Without this the timer of an earlier schedule would keep firing and never be released.
+            if (previous != null)
+            {
+                previous.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Timer callback; runs the scheduled method.
+        /// </summary>
+        /// <param name="o">Timer state, not used.</param>
+        private void HandleCallback(object o)
+        {
+            methodToRun();
+        }
+
+        /// <summary>
+        /// Stops the timer. Calls after the first one have no effect.
+        /// </summary>
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            // Only the first caller gets past this lock with the flag still unset.
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+
+            try
+            {
+                if (timer != null)
+                {
+                    timer.Dispose();
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "shutdown scheduler");
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Ignoring unexpected errors on closing", exc);
+            }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System;
+    using System.Reflection;
+
+    internal static class ReflectionHelper
+    {
+        /// <summary>
+        /// Creates an instance of the type with the given name using its parameterless constructor.
+        /// </summary>
+        /// <remarks>
+        /// When the name contains "`1" it is treated as an open generic type definition that is
+        /// closed with the generic arguments of <typeparamref name="T"/>.
+        /// </remarks>
+        /// <typeparam name="T">The type the created instance is returned as.</typeparam>
+        /// <param name="className">The name of the type to create, in a form accepted by Type.GetType.</param>
+        /// <returns>
+        /// The created instance; null when <paramref name="className"/> is null or empty, or when
+        /// the created object is not a <typeparamref name="T"/>.
+        /// </returns>
+        /// <exception cref="TypeLoadException">Thrown when the type cannot be found.</exception>
+        public static T Instantiate<T>(string className)
+            where T : class
+        {
+            if (string.IsNullOrEmpty(className))
+            {
+                return default(T);
+            }
+
+            // Ask for an exception on both paths; without it an unknown generic type name
+            // used to end in a NullReferenceException on the MakeGenericType call.
+            Type type = Type.GetType(className, true);
+            if (className.Contains("`1"))
+            {
+                type = type.MakeGenericType(typeof(T).GetGenericArguments());
+            }
+
+            return Activator.CreateInstance(type) as T;
+        }
+
+        /// <summary>
+        /// Reads a non-public instance field of the given object.
+        /// </summary>
+        /// <typeparam name="T">The type of the field value.</typeparam>
+        /// <param name="name">The name of the field.</param>
+        /// <param name="obj">The object to read the field from.</param>
+        /// <returns>The value of the field.</returns>
+        /// <exception cref="ArgumentNullException">Thrown when <paramref name="obj"/> is null.</exception>
+        /// <exception cref="MissingFieldException">
+        /// Thrown when the runtime type of <paramref name="obj"/> has no such non-public instance field.
+        /// </exception>
+        public static T GetInstanceField<T>(string name, object obj)
+            where T : class
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj");
+            }
+
+            Type type = obj.GetType();
+            FieldInfo info = type.GetField(name, BindingFlags.NonPublic | BindingFlags.Instance);
+            if (info == null)
+            {
+                throw new MissingFieldException(type.FullName, name);
+            }
+
+            return (T)info.GetValue(obj);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+namespace Kafka.Client.Utils
+{
+    internal class ZKGroupDirs
+    {
+        private readonly string consumersPath = "/consumers";
+
+        /// <summary>
+        /// Builds the paths of the given consumer group below the consumers root path.
+        /// </summary>
+        /// <param name="group">The consumer group name, appended to the root path as is.</param>
+        public ZKGroupDirs(string group)
+        {
+            string groupDir = string.Concat(this.consumersPath, "/", group);
+            this.ConsumerGroupDir = groupDir;
+            this.ConsumerRegistryDir = string.Concat(groupDir, "/ids");
+        }
+
+        /// <summary>
+        /// Gets the root path shared by all consumer groups.
+        /// </summary>
+        public string ConsumerDir
+        {
+            get
+            {
+                return this.consumersPath;
+            }
+        }
+
+        /// <summary>
+        /// Gets the path of the group: the root path, a slash and the group name.
+        /// </summary>
+        public string ConsumerGroupDir { get; private set; }
+
+        /// <summary>
+        /// Gets the "ids" path below the group path.
+        /// </summary>
+        public string ConsumerRegistryDir { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    internal class ZKGroupTopicDirs : ZKGroupDirs
+    {
+        /// <summary>
+        /// Builds the per-topic paths below the path of the given consumer group.
+        /// </summary>
+        /// <param name="group">The consumer group name, passed on to the base class.</param>
+        /// <param name="topic">The topic name, appended to the paths as is.</param>
+        public ZKGroupTopicDirs(string group, string topic)
+            : base(group)
+        {
+            string groupDir = this.ConsumerGroupDir;
+            this.ConsumerOffsetDir = string.Concat(groupDir, "/offsets/", topic);
+            this.ConsumerOwnerDir = string.Concat(groupDir, "/owners/", topic);
+        }
+
+        /// <summary>
+        /// Gets the "offsets" path of the topic below the group path.
+        /// </summary>
+        public string ConsumerOffsetDir { get; private set; }
+
+        /// <summary>
+        /// Gets the "owners" path of the topic below the group path.
+        /// </summary>
+        public string ConsumerOwnerDir { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Reflection;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.ZooKeeperIntegration;
+    using log4net;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Static helper methods for creating, updating, deleting and reading znodes through an <see cref="IZooKeeperClient"/>.
+    /// </summary>
+    internal class ZkUtils
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        /// <summary>
+        /// Writes data to a znode, creating it as a persistent node (after creating its parent) when it does not exist.
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to operate on.</param>
+        /// <param name="path">The znode path.</param>
+        /// <param name="data">The data to store in the znode.</param>
+        internal static void UpdatePersistentPath(IZooKeeperClient zkClient, string path, string data)
+        {
+            try
+            {
+                zkClient.WriteData(path, data);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                CreateParentPath(zkClient, path);
+
+                try
+                {
+                    zkClient.CreatePersistent(path, data);
+                }
+                catch (KeeperException.NodeExistsException)
+                {
+                    // the node showed up after the failed write; fall back to writing into it
+                    zkClient.WriteData(path, data);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates the parent of the given path as a persistent znode.
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to operate on.</param>
+        /// <param name="path">The znode path whose parent should be created.</param>
+        /// <remarks>
+        /// Nothing is created when the path contains no '/' or when its only '/' is the first character.
+        /// </remarks>
+        internal static void CreateParentPath(IZooKeeperClient zkClient, string path)
+        {
+            // LastIndexOf yields -1 for a path without a separator, which Substring would reject with an exception
+            int separatorIndex = path.LastIndexOf('/');
+            if (separatorIndex > 0)
+            {
+                zkClient.CreatePersistent(path.Substring(0, separatorIndex), true);
+            }
+        }
+
+        /// <summary>
+        /// Deletes a znode; a node that is already gone is logged and otherwise ignored.
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to operate on.</param>
+        /// <param name="path">The znode path to delete.</param>
+        internal static void DeletePath(IZooKeeperClient zkClient, string path)
+        {
+            try
+            {
+                zkClient.Delete(path);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} deleted during connection loss; this is ok", path);
+            }
+        }
+
+        /// <summary>
+        /// Builds, for every topic, the sorted list of partition identifiers in the form "broker-partitionIndex".
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to read from.</param>
+        /// <param name="topics">The topics to look up; a topic listed more than once is processed once.</param>
+        /// <returns>A dictionary mapping each topic to its partition identifiers.</returns>
+        internal static IDictionary<string, IList<string>> GetPartitionsForTopics(IZooKeeperClient zkClient, IEnumerable<string> topics)
+        {
+            var result = new Dictionary<string, IList<string>>();
+            foreach (string topic in topics)
+            {
+                if (result.ContainsKey(topic))
+                {
+                    // Dictionary.Add would throw on a repeated topic; its partitions are already collected
+                    continue;
+                }
+
+                string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+                var partList = new List<string>();
+                // NOTE(review): guards against the client reporting a missing parent as null - confirm against IZooKeeperClient
+                var brokers = zkClient.GetChildrenParentMayNotExist(topicPath);
+                if (brokers != null)
+                {
+                    foreach (var broker in brokers)
+                    {
+                        // the znode content is machine-written, so parse it independently of the current culture
+                        int numberOfParts = int.Parse(zkClient.ReadData<string>(topicPath + "/" + broker), CultureInfo.InvariantCulture);
+                        for (int i = 0; i < numberOfParts; i++)
+                        {
+                            partList.Add(broker + "-" + i);
+                        }
+                    }
+                }
+
+                // NOTE(review): Sort() uses the default string comparer, which is culture-sensitive; if every
+                // consumer has to agree on this order an ordinal comparer may be the safer choice - confirm
+                partList.Sort();
+                result.Add(topic, partList);
+            }
+
+            return result;
+        }
+
+        /// <summary>
+        /// Creates an ephemeral znode and tolerates an already existing node that holds the same data.
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to operate on.</param>
+        /// <param name="path">The znode path.</param>
+        /// <param name="data">The data expected in the znode.</param>
+        /// <remarks>
+        /// When the node exists with different (or no) data the original NodeExistsException is rethrown.
+        /// </remarks>
+        internal static void CreateEphemeralPathExpectConflict(IZooKeeperClient zkClient, string path, string data)
+        {
+            try
+            {
+                CreateEphemeralPath(zkClient, path, data);
+            }
+            catch (KeeperException.NodeExistsException)
+            {
+                // if the node disappears before it is read, the NoNodeException propagates;
+                // the caller is expected to handle that case
+                string storedData = zkClient.ReadData<string>(path);
+                if (storedData == null || storedData != data)
+                {
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "conflict in {0} data: {1} stored data: {2}", path, data, storedData);
+                    throw;
+                }
+
+                // otherwise, the creation succeeded, return normally
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} exists with value {1} during connection loss; this is ok", path, data);
+            }
+        }
+
+        /// <summary>
+        /// Creates an ephemeral znode, creating its parent first when the initial attempt reports a missing node.
+        /// </summary>
+        /// <param name="zkClient">The ZooKeeper client to operate on.</param>
+        /// <param name="path">The znode path.</param>
+        /// <param name="data">The data to store in the znode.</param>
+        internal static void CreateEphemeralPath(IZooKeeperClient zkClient, string path, string data)
+        {
+            try
+            {
+                zkClient.CreateEphemeral(path, data);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                CreateParentPath(zkClient, path);
+                zkClient.CreateEphemeral(path, data);
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+    using Kafka.Client.Cfg;
+
+    /// <summary>
+    /// Base type for Kafka clients that can discover brokers automatically through ZooKeeper
+    /// </summary>
+    public abstract class ZooKeeperAwareKafkaClientBase : KafkaClientBase
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperAwareKafkaClientBase"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The ZooKeeper configuration; may be <c>null</c>, in which case ZooKeeper support is reported as disabled.
+        /// </param>
+        protected ZooKeeperAwareKafkaClientBase(ZKConfig config)
+        {
+            bool hasConnectionString = false;
+            if (config != null)
+            {
+                hasConnectionString = !string.IsNullOrEmpty(config.ZkConnect);
+            }
+
+            this.IsZooKeeperEnabled = hasConnectionString;
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether broker discovery through ZooKeeper is turned on.
+        /// </summary>
+        /// <value>
+        /// <c>true</c> when a configuration with a non-empty <c>ZkConnect</c> was supplied; otherwise, <c>false</c>.
+        /// </value>
+        protected bool IsZooKeeperEnabled { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    using System.Linq;
+    using log4net;
+
+    /// <summary>
+    /// Holds the handlers subscribed to a ZooKeeper child-changed event and dispatches events to them
+    /// </summary>
+    internal class ChildChangedEventItem
+    {
+        private readonly ILog logger;
+        private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> childChanged;
+
+        /// <summary>
+        /// Occurs when znode children changes
+        /// </summary>
+        /// <remarks>
+        /// A handler is removed before it is added, so subscribing the same handler twice registers it once
+        /// </remarks>
+        public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> ChildChanged
+        {
+            add
+            {
+                this.childChanged -= value;
+                this.childChanged += value;
+            }
+
+            remove
+            {
+                this.childChanged -= value;
+            }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ChildChangedEventItem"/> class.
+        /// </summary>
+        /// <param name="logger">
+        /// The logger.
+        /// </param>
+        /// <remarks>
+        /// Should use external logger to keep same format of all event logs
+        /// </remarks>
+        public ChildChangedEventItem(ILog logger)
+        {
+            this.logger = logger;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ChildChangedEventItem"/> class.
+        /// </summary>
+        /// <param name="logger">
+        /// The logger.
+        /// </param>
+        /// <param name="handler">
+        /// The subscribed handler.
+        /// </param>
+        /// <remarks>
+        /// Should use external logger to keep same format of all event logs
+        /// </remarks>
+        public ChildChangedEventItem(ILog logger, ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> handler)
+        {
+            this.logger = logger;
+            this.ChildChanged += handler;
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeper children changes event; does nothing without subscribers
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        public void OnChildChanged(ZooKeeperChildChangedEventArgs e)
+        {
+            var handlers = this.childChanged;
+            if (handlers == null)
+            {
+                return;
+            }
+
+            // composing the per-handler message is wasted work unless debug logging is on
+            if (this.logger.IsDebugEnabled)
+            {
+                foreach (var handler in handlers.GetInvocationList())
+                {
+                    this.logger.Debug(e + " sent to " + handler.Target);
+                }
+            }
+
+            handlers(e);
+        }
+
+        /// <summary>
+        /// Gets the total count of subscribed handlers
+        /// </summary>
+        public int Count
+        {
+            get
+            {
+                var handlers = this.childChanged;
+                return handlers != null ? handlers.GetInvocationList().Length : 0;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    using System.Linq;
+    using log4net;
+
+    /// <summary>
+    /// Holds the handlers subscribed to ZooKeeper data-changed and data-deleted events and dispatches events to them
+    /// </summary>
+    internal class DataChangedEventItem
+    {
+        private readonly ILog logger;
+        private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> dataChanged;
+        private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> dataDeleted;
+
+        /// <summary>
+        /// Occurs when znode data changes; subscribing the same handler twice registers it once
+        /// </summary>
+        public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> DataChanged
+        {
+            add
+            {
+                this.dataChanged -= value;
+                this.dataChanged += value;
+            }
+
+            remove
+            {
+                this.dataChanged -= value;
+            }
+        }
+
+        /// <summary>
+        /// Occurs when znode data is deleted; subscribing the same handler twice registers it once
+        /// </summary>
+        public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> DataDeleted
+        {
+            add
+            {
+                this.dataDeleted -= value;
+                this.dataDeleted += value;
+            }
+
+            remove
+            {
+                this.dataDeleted -= value;
+            }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="DataChangedEventItem"/> class.
+        /// </summary>
+        /// <param name="logger">
+        /// The logger.
+        /// </param>
+        /// <remarks>
+        /// Should use external logger to keep same format of all event logs
+        /// </remarks>
+        public DataChangedEventItem(ILog logger)
+        {
+            this.logger = logger;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="DataChangedEventItem"/> class.
+        /// </summary>
+        /// <param name="logger">
+        /// The logger.
+        /// </param>
+        /// <param name="changedHandler">
+        /// The changed handler.
+        /// </param>
+        /// <param name="deletedHandler">
+        /// The deleted handler.
+        /// </param>
+        /// <remarks>
+        /// Should use external logger to keep same format of all event logs
+        /// </remarks>
+        public DataChangedEventItem(
+            ILog logger,
+            ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> changedHandler,
+            ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> deletedHandler)
+        {
+            this.logger = logger;
+            this.DataChanged += changedHandler;
+            this.DataDeleted += deletedHandler;
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeper data changes event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        public void OnDataChanged(ZooKeeperDataChangedEventArgs e)
+        {
+            this.Dispatch(this.dataChanged, e);
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeper data deletes event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        public void OnDataDeleted(ZooKeeperDataChangedEventArgs e)
+        {
+            this.Dispatch(this.dataDeleted, e);
+        }
+
+        /// <summary>
+        /// Gets the total count of subscribed handlers (changed plus deleted)
+        /// </summary>
+        public int TotalCount
+        {
+            get
+            {
+                return CountHandlers(this.dataChanged) + CountHandlers(this.dataDeleted);
+            }
+        }
+
+        /// <summary>
+        /// Logs each target at debug level and then invokes the handlers; does nothing without subscribers
+        /// </summary>
+        /// <param name="handlers">
+        /// The snapshot of the subscribed handlers; may be null.
+        /// </param>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void Dispatch(
+            ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> handlers,
+            ZooKeeperDataChangedEventArgs e)
+        {
+            if (handlers == null)
+            {
+                return;
+            }
+
+            // composing the per-handler message is wasted work unless debug logging is on
+            if (this.logger.IsDebugEnabled)
+            {
+                foreach (var handler in handlers.GetInvocationList())
+                {
+                    this.logger.Debug(e + " sent to " + handler.Target);
+                }
+            }
+
+            handlers(e);
+        }
+
+        /// <summary>
+        /// Counts the entries in the invocation list of the given handlers
+        /// </summary>
+        /// <param name="handlers">
+        /// The snapshot of the subscribed handlers; may be null.
+        /// </param>
+        /// <returns>
+        /// The number of handlers, or 0 when there are none
+        /// </returns>
+        private static int CountHandlers(ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> handlers)
+        {
+            return handlers != null ? handlers.GetInvocationList().Length : 0;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Event data describing a change in the children of a znode
+    /// </summary>
+    internal class ZooKeeperChildChangedEventArgs : ZooKeeperEventArgs
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperChildChangedEventArgs"/> class.
+        /// </summary>
+        /// <param name="path">
+        /// The path of the znode; it is also embedded in the event description.
+        /// </param>
+        public ZooKeeperChildChangedEventArgs(string path)
+            : base(string.Concat("Children of ", path, " changed"))
+        {
+            this.Path = path;
+        }
+
+        /// <summary>
+        /// Gets the event type, which is always <see cref="ZooKeeperEventTypes.ChildChanged"/>
+        /// </summary>
+        public override ZooKeeperEventTypes Type
+        {
+            get { return ZooKeeperEventTypes.ChildChanged; }
+        }
+
+        /// <summary>
+        /// Gets the znode path this event refers to
+        /// </summary>
+        public string Path { get; private set; }
+
+        /// <summary>
+        /// Gets or sets the list of the znode's current children
+        /// </summary>
+        public IList<string> Children { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    /// <summary>
+    /// Contains znode data changed event data
+    /// </summary>
+    internal class ZooKeeperDataChangedEventArgs : ZooKeeperEventArgs
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperDataChangedEventArgs"/> class.
+        /// </summary>
+        /// <param name="path">
+        /// The znode path; it is also embedded in the event description.
+        /// </param>
+        public ZooKeeperDataChangedEventArgs(string path)
+            : base("Data of " + path + " changed")
+        {
+            this.Path = path;
+        }
+
+        /// <summary>
+        /// Gets the znode path
+        /// </summary>
+        public string Path { get; private set; }
+
+        /// <summary>
+        /// Gets or sets znode changed data.
+        /// </summary>
+        /// <remarks>
+        /// Null if data was deleted.
+        /// </remarks>
+        public string Data { get; set; }
+
+        /// <summary>
+        /// Gets the event type, which is always <see cref="ZooKeeperEventTypes.DataChanged"/>.
+        /// </summary>
+        public override ZooKeeperEventTypes Type
+        {
+            get
+            {
+                return ZooKeeperEventTypes.DataChanged;
+            }
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether data was deleted
+        /// </summary>
+        /// <remarks>
+        /// Reports <c>true</c> for empty data as well as for <c>null</c> data.
+        /// </remarks>
+        public bool DataDeleted
+        {
+            get
+            {
+                return string.IsNullOrEmpty(this.Data);
+            }
+        }
+
+        /// <summary>
+        /// Gets string representation of event data, saying "deleted" instead of "changed" when data was deleted
+        /// </summary>
+        /// <returns>
+        /// String representation of event data
+        /// </returns>
+        public override string ToString()
+        {
+            string text = base.ToString();
+            if (!this.DataDeleted)
+            {
+                return text;
+            }
+
+            // Swap only the trailing verb of the description: a plain Replace would also
+            // rewrite any "changed" that happens to be part of the znode path
+            int verbIndex = text.LastIndexOf("changed", System.StringComparison.Ordinal);
+            if (verbIndex < 0)
+            {
+                return text;
+            }
+
+            return text.Substring(0, verbIndex) + "deleted" + text.Substring(verbIndex + "changed".Length);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    using System;
+
+    /// <summary>
+    /// Common base of the event data types raised for ZooKeeper events
+    /// </summary>
+    internal abstract class ZooKeeperEventArgs : EventArgs
+    {
+        private readonly string description;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperEventArgs"/> class.
+        /// </summary>
+        /// <param name="description">
+        /// The text placed between the brackets of the string representation.
+        /// </param>
+        protected ZooKeeperEventArgs(string description)
+        {
+            this.description = description;
+        }
+
+        /// <summary>
+        /// Gets the kind of event the derived type represents.
+        /// </summary>
+        public abstract ZooKeeperEventTypes Type { get; }
+
+        /// <summary>
+        /// Formats the event as "ZooKeeperEvent[" followed by the description and "]"
+        /// </summary>
+        /// <returns>
+        /// The formatted event text
+        /// </returns>
+        public override string ToString()
+        {
+            return string.Concat("ZooKeeperEvent[", this.description, "]");
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    /// <summary>
+    /// Kinds of ZooKeeper events, as reported by <see cref="ZooKeeperEventArgs.Type"/>
+    /// </summary>
+    internal enum ZooKeeperEventTypes
+    {
+        /// <summary>
+        /// Default value (0); the name is presumably a misspelling of "Unknown", kept as-is for compatibility
+        /// </summary>
+        Unknow = 0,
+
+        /// <summary>
+        /// Presumably a change of the connection state (raised outside this file; TODO confirm)
+        /// </summary>
+        StateChanged = 1,
+
+        /// <summary>
+        /// Presumably the creation of a new session (raised outside this file; TODO confirm)
+        /// </summary>
+        SessionCreated = 2,
+
+        /// <summary>
+        /// The children of a znode changed; reported by <see cref="ZooKeeperChildChangedEventArgs"/>
+        /// </summary>
+        ChildChanged = 3,
+
+        /// <summary>
+        /// The data of a znode changed; reported by <see cref="ZooKeeperDataChangedEventArgs"/>
+        /// </summary>
+        DataChanged = 4
+    }
+}