You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [5/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Constructs a request containing multiple producer requests to send to Kafka.
+    /// </summary>
+    public class MultiProducerRequest : AbstractRequest, IWritable
+    {
+        public const byte DefaultRequestsCountSize = 2;
+
+        /// <summary>
+        /// Computes the buffer size of a batch: size field, request id, request counter
+        /// and each request's buffer without its own size and request id fields.
+        /// </summary>
+        public static int GetBufferLength(IEnumerable<ProducerRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+
+            int payload = (int)requests.Sum(x => x.RequestBuffer.Length - DefaultRequestIdSize - DefaultRequestSizeSize);
+            return DefaultRequestSizeSize + DefaultRequestIdSize + DefaultRequestsCountSize + payload;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the MultiProducerRequest class.
+        /// </summary>
+        /// <param name="requests">The individual producer requests to send in this request.</param>
+        public MultiProducerRequest(IEnumerable<ProducerRequest> requests)
+        {
+            Guard.Assert<ArgumentNullException>(() => requests != null);
+
+            // Snapshot the sequence: it is walked for sizing, counting and writing,
+            // and a lazy source could yield different requests on each pass.
+            this.ProducerRequests = requests.ToList();
+            this.RequestBuffer = new BoundedBuffer(GetBufferLength(this.ProducerRequests));
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets or sets the list of producer requests to be sent in batch.
+        /// </summary>
+        public IEnumerable<ProducerRequest> ProducerRequests { get; set; }
+
+        public override RequestTypes RequestType
+        {
+            get { return RequestTypes.MultiProduce; }
+        }
+
+        /// <summary>
+        /// Writes the request size, the request type id and the batch into given stream
+        /// </summary>
+        /// <param name="output">The output stream.</param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                writer.Write(this.RequestTypeId);
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes the number of requests followed by each request's content into given writer
+        /// </summary>
+        /// <param name="writer">The writer.</param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.Write((short)this.ProducerRequests.Count());
+            foreach (var request in this.ProducerRequests)
+            {
+                request.WriteTo(writer);
+            }
+        }
+
+        public override string ToString()
+        {
+            using (var reader = new KafkaBinaryReader(this.RequestBuffer))
+            {
+                return ParseFrom(reader, (int)this.RequestBuffer.Length);
+            }
+        }
+
+        /// <summary>
+        /// Renders a serialized multi-produce request as human-readable text.
+        /// </summary>
+        /// <param name="reader">The reader positioned at the start of the request; its stream must be seekable.</param>
+        /// <param name="count">Not used; each batched request is sized from its own fields.</param>
+        /// <returns>The textual form of the request.</returns>
+        public static string ParseFrom(KafkaBinaryReader reader, int count)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+
+            var sb = new StringBuilder();
+            sb.Append("Request size: ").Append(reader.ReadInt32()).Append(", RequestId: ");
+            short reqId = reader.ReadInt16();
+            sb.Append(reqId).Append("(").Append((RequestTypes)reqId).Append("), Single Requests: {");
+            // WriteTo emits the request counter right after the header; the batched
+            // requests follow without size and request id fields of their own.
+            short total = reader.ReadInt16();
+            for (int i = 1; i <= total; i++)
+            {
+                // Peek at the topic and set size to derive the length ParseFrom expects.
+                long start = reader.BaseStream.Position;
+                string topic = reader.ReadTopic(DefaultEncoding);
+                reader.ReadInt32(); // partition, not needed for sizing
+                int length = ProducerRequest.DefaultHeaderSize + GetTopicLength(topic) + reader.ReadInt32();
+                reader.BaseStream.Position = start;
+                sb.Append("Request ").Append(i).Append(" {");
+                sb.Append(ProducerRequest.ParseFrom(reader, length, true));
+                sb.AppendLine("} ");
+            }
+
+            return sb.Append("}").ToString();
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Request sent to Kafka to obtain offsets of a topic partition for a given time
+    /// </summary>
+    public class OffsetRequest : AbstractRequest, IWritable
+    {
+        /// <summary>
+        /// Special time value denoting the latest time (-1).
+        /// </summary>
+        public static readonly long LatestTime = -1L;
+
+        /// <summary>
+        /// Special time value denoting the earliest time (-2).
+        /// </summary>
+        public static readonly long EarliestTime = -2L;
+
+        public const string SmallestTime = "smallest";
+
+        public const string LargestTime = "largest";
+
+        public const byte DefaultTopicSizeSize = 2;
+        public const byte DefaultPartitionSize = 4;
+        public const byte DefaultTimeSize = 8;
+        public const byte DefaultMaxOffsetsSize = 4;
+        public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultTimeSize + DefaultMaxOffsetsSize;
+
+        public static int GetRequestLength(string topic, string encoding = DefaultEncoding)
+        {
+            // Fixed-size fields plus the topic itself.
+            return DefaultHeaderSize + GetTopicLength(topic, encoding);
+        }
+
+        /// <summary>
+        /// Initializes a new, empty instance of the OffsetRequest class.
+        /// </summary>
+        public OffsetRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the OffsetRequest class and serializes it into its buffer.
+        /// </summary>
+        /// <param name="topic">The topic to get offsets for.</param>
+        /// <param name="partition">The partition to get offsets for.</param>
+        /// <param name="time">The time from which to request offsets.</param>
+        /// <param name="maxOffsets">The maximum amount of offsets to return.</param>
+        public OffsetRequest(string topic, int partition, long time, int maxOffsets)
+        {
+            this.Topic = topic;
+            this.Partition = partition;
+            this.Time = time;
+            this.MaxOffsets = maxOffsets;
+
+            // The buffer is sized exactly for this request before it is written.
+            this.RequestBuffer = new BoundedBuffer(GetRequestLength(topic, DefaultEncoding));
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Gets the time from which offsets are requested.
+        /// </summary>
+        public long Time { get; private set; }
+
+        /// <summary>
+        /// Gets the maximum number of offsets to return.
+        /// </summary>
+        public int MaxOffsets { get; private set; }
+
+        /// <summary>
+        /// Gets the type of this request.
+        /// </summary>
+        public override RequestTypes RequestType
+        {
+            get { return RequestTypes.Offsets; }
+        }
+
+        /// <summary>
+        /// Writes the request size and request type id followed by the request content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public void WriteTo(System.IO.MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var binaryWriter = new KafkaBinaryWriter(output))
+            {
+                binaryWriter.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                binaryWriter.Write(this.RequestTypeId);
+                this.WriteTo(binaryWriter);
+            }
+        }
+
+        /// <summary>
+        /// Writes topic, partition, time and maximum number of offsets into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.WriteTopic(this.Topic, DefaultEncoding);
+            writer.Write(this.Partition);
+            writer.Write(this.Time);
+            writer.Write(this.MaxOffsets);
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Constructs a request that sends a set of messages to a topic partition in Kafka.
+    /// </summary>
+    public class ProducerRequest : AbstractRequest, IWritable
+    {
+        public const int RandomPartition = -1;
+        public const byte DefaultTopicSizeSize = 2;
+        public const byte DefaultPartitionSize = 4;
+        public const byte DefaultSetSizeSize = 4;
+        public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultSetSizeSize;
+        public const short DefaultTopicLengthIfNonePresent = 2;
+
+        public static int GetRequestLength(string topic, int messegesSize, string encoding = DefaultEncoding)
+        {
+            // Fixed-size fields, then the topic, then the message set.
+            return DefaultHeaderSize + GetTopicLength(topic, encoding) + messegesSize;
+        }
+
+        public ProducerRequest(string topic, int partition, BufferedMessageSet messages)
+        {
+            Guard.Assert<ArgumentNullException>(() => messages != null);
+            // The buffer is sized exactly for this request before it is written.
+            this.RequestBuffer = new BoundedBuffer(GetRequestLength(topic, messages.SetSize));
+            this.Topic = topic;
+            this.Partition = partition;
+            this.MessageSet = messages;
+            this.WriteTo(this.RequestBuffer);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the ProducerRequest class from individual messages.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The messages to wrap in a buffered message set and send.</param>
+        public ProducerRequest(string topic, int partition, IEnumerable<Message> messages)
+            : this(topic, partition, new BufferedMessageSet(messages))
+        {
+        }
+
+        public BufferedMessageSet MessageSet { get; private set; }
+
+        /// <summary>
+        /// Gets the type of this request.
+        /// </summary>
+        public override RequestTypes RequestType
+        {
+            get { return RequestTypes.Produce; }
+        }
+
+        /// <summary>
+        /// Gets the length of the request buffer.
+        /// </summary>
+        public int TotalSize
+        {
+            get { return (int)this.RequestBuffer.Length; }
+        }
+
+        /// <summary>
+        /// Writes the request size and request type id followed by the request content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+            using (var binaryWriter = new KafkaBinaryWriter(output))
+            {
+                binaryWriter.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+                binaryWriter.Write(this.RequestTypeId);
+                this.WriteTo(binaryWriter);
+            }
+        }
+
+        /// <summary>
+        /// Writes topic, partition, message set size and the message set into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+            writer.WriteTopic(this.Topic, DefaultEncoding);
+            writer.Write(this.Partition);
+            writer.Write(this.MessageSet.SetSize);
+            this.MessageSet.WriteTo(writer);
+        }
+
+        public override string ToString()
+        {
+            using (var bufferReader = new KafkaBinaryReader(this.RequestBuffer))
+            {
+                return ParseFrom(bufferReader, this.TotalSize);
+            }
+        }
+
+        /// <summary>
+        /// Renders a serialized producer request as human-readable text.
+        /// </summary>
+        /// <param name="reader">The reader to take the request from.</param>
+        /// <param name="count">The total length of the request, used to derive the message set size.</param>
+        /// <param name="skipReqInfo">Whether to skip reading the request size and request id fields.</param>
+        /// <returns>The textual form of the request.</returns>
+        public static string ParseFrom(KafkaBinaryReader reader, int count, bool skipReqInfo = false)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+            var text = new StringBuilder();
+
+            if (!skipReqInfo)
+            {
+                text.Append("Request size: ").Append(reader.ReadInt32());
+                short requestId = reader.ReadInt16();
+                text.Append(", RequestId: ").Append(requestId);
+                text.Append("(").Append((RequestTypes)requestId).Append(")");
+            }
+
+            string topic = reader.ReadTopic(DefaultEncoding);
+            text.Append(", Topic: ").Append(topic);
+            text.Append(", Partition: ").Append(reader.ReadInt32());
+            text.Append(", Set size: ").Append(reader.ReadInt32());
+
+            // Whatever remains after the fixed-size fields and the topic is the message set.
+            int size = count - DefaultHeaderSize - GetTopicLength(topic);
+            text.Append(", Set {").Append(BufferedMessageSet.ParseFrom(reader, size)).Append("}");
+            return text.ToString();
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ /// <summary>
+ /// Request types for Kafka; the ParseFrom methods cast the 16-bit request id of a serialized request to this enum
+ /// </summary>
+ /// <remarks>
+ /// Many of these are not in play yet.
+ /// </remarks>
+ public enum RequestTypes : short
+ {
+ /// <summary>
+ /// Produce a message (the type reported by ProducerRequest).
+ /// </summary>
+ Produce = 0,
+
+ /// <summary>
+ /// Fetch a message.
+ /// </summary>
+ Fetch = 1,
+
+ /// <summary>
+ /// Multi-fetch messages.
+ /// </summary>
+ MultiFetch = 2,
+
+ /// <summary>
+ /// Multi-produce messages (the type reported by MultiProducerRequest).
+ /// </summary>
+ MultiProduce = 3,
+
+ /// <summary>
+ /// Gets offsets (the type reported by OffsetRequest).
+ /// </summary>
+ Offsets = 4
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using Kafka.Client.Messages;
+
+ /// <summary>
+ /// Default encoder for data that already is a <see cref="Message" /> object
+ /// </summary>
+ public class DefaultEncoder : IEncoder<Message>
+ {
+ /// <summary>
+ /// Returns the given message unchanged; no serialization is performed
+ /// </summary>
+ /// <param name="data">
+ /// The data, that are already in <see cref="Message" /> format.
+ /// </param>
+ /// <returns>
+ /// The same <see cref="Message" /> instance that was passed in
+ /// </returns>
+ public Message ToMessage(Message data)
+ {
+ return data;
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using Kafka.Client.Messages;
+
+ /// <summary>
+ /// User-defined serializer to <see cref="Message" /> format
+ /// </summary>
+ /// <typeparam name="TData">
+ /// Type of data to serialize
+ /// </typeparam>
+ public interface IEncoder<TData>
+ {
+ /// <summary>
+ /// Serializes given data to <see cref="Message" /> format
+ /// </summary>
+ /// <param name="data">
+ /// The data to serialize.
+ /// </param>
+ /// <returns>
+ /// The message that represents the given data
+ /// </returns>
+ Message ToMessage(TData data);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using System.IO;
+
+ /// <summary>
+ /// Implemented by types that can write their content into a stream or a <see cref="KafkaBinaryWriter" />
+ /// </summary>
+ internal interface IWritable
+ {
+ /// <summary>
+ /// Writes content into given stream
+ /// </summary>
+ /// <param name="output">
+ /// The output stream.
+ /// </param>
+ void WriteTo(MemoryStream output);
+
+ /// <summary>
+ /// Writes content into given writer
+ /// </summary>
+ /// <param name="writer">
+ /// The writer.
+ /// </param>
+ void WriteTo(KafkaBinaryWriter writer);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using System.IO;
+ using System.Net;
+ using System.Text;
+
+    /// <summary>
+    /// Reads data from underlying stream using big endian bytes order for primitive types
+    /// and UTF-8 encoding for strings.
+    /// </summary>
+    public class KafkaBinaryReader : BinaryReader
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryReader"/> class
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        /// <param name="input">
+        /// The input stream.
+        /// </param>
+        public KafkaBinaryReader(Stream input)
+            : base(input)
+        {
+        }
+
+        /// <summary>
+        /// Resets position pointer; the base implementation is not called, so the stream is not closed here.
+        /// </summary>
+        /// <param name="disposing">
+        /// Not used
+        /// </param>
+        protected override void Dispose(bool disposing)
+        {
+            this.BaseStream.Position = 0;
+        }
+
+        /// <summary>
+        /// Reads two-bytes signed integer from the current stream using big endian bytes order
+        /// and advances the stream position by two bytes
+        /// </summary>
+        /// <returns>
+        /// The two-byte signed integer read from the current stream.
+        /// </returns>
+        public override short ReadInt16()
+        {
+            // Convert what the base reader produced from network to host byte order.
+            short raw = base.ReadInt16();
+            return IPAddress.NetworkToHostOrder(raw);
+        }
+
+        /// <summary>
+        /// Reads four-bytes signed integer from the current stream using big endian bytes order
+        /// and advances the stream position by four bytes
+        /// </summary>
+        /// <returns>
+        /// The four-byte signed integer read from the current stream.
+        /// </returns>
+        public override int ReadInt32()
+        {
+            // Convert what the base reader produced from network to host byte order.
+            int raw = base.ReadInt32();
+            return IPAddress.NetworkToHostOrder(raw);
+        }
+
+        /// <summary>
+        /// Reads eight-bytes signed integer from the current stream using big endian bytes order
+        /// and advances the stream position by eight bytes
+        /// </summary>
+        /// <returns>
+        /// The eight-byte signed integer read from the current stream.
+        /// </returns>
+        public override long ReadInt64()
+        {
+            // Convert what the base reader produced from network to host byte order.
+            long raw = base.ReadInt64();
+            return IPAddress.NetworkToHostOrder(raw);
+        }
+
+        /// <summary>
+        /// Reads the next character from the current stream and advances the stream position
+        /// accordingly; the value is passed through from the base reader unchanged.
+        /// </summary>
+        /// <returns>
+        /// The next character from the stream, or -1 if no characters are currently available.
+        /// </returns>
+        public override int Read()
+        {
+            // A character code is not a multi-byte integer field, so no byte order
+            // conversion applies; swapping it would corrupt every value except -1.
+            return base.Read();
+        }
+
+        /// <summary>
+        /// Reads length-prefixed topic from underlying stream using given encoding.
+        /// </summary>
+        /// <param name="encoding">
+        /// The name of the encoding to use.
+        /// </param>
+        /// <returns>
+        /// The read topic, or null when the length prefix is -1.
+        /// </returns>
+        public string ReadTopic(string encoding)
+        {
+            // A length of -1 marks an absent topic.
+            short byteCount = this.ReadInt16();
+            if (byteCount == -1)
+            {
+                return null;
+            }
+
+            byte[] encodedTopic = this.ReadBytes(byteCount);
+            return Encoding.GetEncoding(encoding).GetString(encodedTopic);
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using System.IO;
+ using System.Net;
+ using System.Text;
+
+    /// <summary>
+    /// Writes data into underlying stream using big endian bytes order for primitive types
+    /// and UTF-8 encoding for strings.
+    /// </summary>
+    public class KafkaBinaryWriter : BinaryWriter
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryWriter"/> class
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        protected KafkaBinaryWriter()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="KafkaBinaryWriter"/> class
+        /// using big endian bytes order for primitive types and UTF-8 encoding for strings.
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public KafkaBinaryWriter(Stream output)
+            : base(output)
+        {
+        }
+
+        /// <summary>
+        /// Flushes data into stream and resets position pointer; the base implementation is not called.
+        /// </summary>
+        /// <param name="disposing">
+        /// Not used
+        /// </param>
+        protected override void Dispose(bool disposing)
+        {
+            this.Flush();
+            this.OutStream.Position = 0;
+        }
+
+        /// <summary>
+        /// Writes four-bytes signed integer to the current stream using big endian bytes order
+        /// and advances the stream position by four bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(int value)
+        {
+            // Convert from host to network byte order before the base writer emits it.
+            base.Write(IPAddress.HostToNetworkOrder(value));
+        }
+
+        /// <summary>
+        /// Writes eight-bytes signed integer to the current stream using big endian bytes order
+        /// and advances the stream position by eight bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(long value)
+        {
+            // Convert from host to network byte order before the base writer emits it.
+            base.Write(IPAddress.HostToNetworkOrder(value));
+        }
+
+        /// <summary>
+        /// Writes two-bytes signed integer to the current stream using big endian bytes order
+        /// and advances the stream position by two bytes
+        /// </summary>
+        /// <param name="value">
+        /// The value to write.
+        /// </param>
+        public override void Write(short value)
+        {
+            // Convert from host to network byte order before the base writer emits it.
+            base.Write(IPAddress.HostToNetworkOrder(value));
+        }
+
+        /// <summary>
+        /// Writes topic preceded by its size in encoded bytes into underlying stream using given encoding.
+        /// </summary>
+        /// <param name="topic">
+        /// The topic to write; null or empty is written as a bare size of -1.
+        /// </param>
+        /// <param name="encoding">
+        /// The name of the encoding to use.
+        /// </param>
+        public void WriteTopic(string topic, string encoding)
+        {
+            if (string.IsNullOrEmpty(topic))
+            {
+                // An absent topic is written as a bare length of -1.
+                const short NoTopic = -1;
+                this.Write(NoTopic);
+                return;
+            }
+
+            // The length prefix counts encoded bytes, not UTF-16 characters; the two
+            // differ as soon as the topic contains a multi-byte character.
+            byte[] encodedTopic = Encoding.GetEncoding(encoding).GetBytes(topic);
+            this.Write((short)encodedTopic.Length);
+            this.Write(encodedTopic);
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Serialization
+{
+ using System.Text;
+ using Kafka.Client.Messages;
+
+    /// <summary>
+    /// Encoder that turns strings into <see cref="Message" /> objects using UTF-8 encoding
+    /// </summary>
+    public class StringEncoder : IEncoder<string>
+    {
+        /// <summary>
+        /// Builds a <see cref="Message" /> from the UTF-8 bytes of given data
+        /// </summary>
+        /// <param name="data">
+        /// The text to serialize.
+        /// </param>
+        /// <returns>
+        /// Serialized data
+        /// </returns>
+        public Message ToMessage(string data)
+        {
+            // The message is constructed from the UTF-8 encoding of the text.
+            return new Message(Encoding.UTF8.GetBytes(data));
+        }
+    }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System;
+
+ /// <summary>
+ /// Utility class for managing bits and bytes.
+ /// </summary>
+ internal class BitWorks
+ {
+     /// <summary>
+     /// Converts the value to bytes with <see cref="BitConverter" /> and reverses their order.
+     /// </summary>
+     /// <param name="value">The value to convert to bytes.</param>
+     /// <returns>The bytes of the value, in the reverse of the order BitConverter produced.</returns>
+     public static byte[] GetBytesReversed(short value)
+     {
+         return ReverseBytes(BitConverter.GetBytes(value));
+     }
+
+     /// <summary>
+     /// Converts the value to bytes with <see cref="BitConverter" /> and reverses their order.
+     /// </summary>
+     /// <param name="value">The value to convert to bytes.</param>
+     /// <returns>The bytes of the value, in the reverse of the order BitConverter produced.</returns>
+     public static byte[] GetBytesReversed(int value)
+     {
+         return ReverseBytes(BitConverter.GetBytes(value));
+     }
+
+     /// <summary>
+     /// Converts the value to bytes with <see cref="BitConverter" /> and reverses their order.
+     /// </summary>
+     /// <param name="value">The value to convert to bytes.</param>
+     /// <returns>The bytes of the value, in the reverse of the order BitConverter produced.</returns>
+     public static byte[] GetBytesReversed(long value)
+     {
+         return ReverseBytes(BitConverter.GetBytes(value));
+     }
+
+     /// <summary>
+     /// Reverses the order of the bytes in the given array, in place.
+     /// </summary>
+     /// <param name="inArray">
+     /// The array to reverse. The array itself is modified; no copy is made.
+     /// </param>
+     /// <returns>
+     /// The same array instance that was passed in, now reversed. A null input yields null and
+     /// an empty array is returned unchanged (it is not converted to null).
+     /// </returns>
+     public static byte[] ReverseBytes(byte[] inArray)
+     {
+         if (inArray != null)
+         {
+             // Array.Reverse handles empty and single-element arrays as no-ops.
+             Array.Reverse(inArray);
+         }
+
+         return inArray;
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,122 @@
+// <auto-generated />
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Security.Cryptography;
+
+ /// <summary>
+ /// Table-driven CRC-32 exposed as a <see cref="HashAlgorithm" />.
+ /// From http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net
+ /// </summary>
+ internal class Crc32Hasher : HashAlgorithm
+ {
+     /// <summary>Polynomial used by the parameterless constructor.</summary>
+     public const UInt32 DefaultPolynomial = 0xedb88320;
+
+     /// <summary>Initial register value used by the parameterless constructor.</summary>
+     public const UInt32 DefaultSeed = 0xffffffff;
+
+     // Lookup table for DefaultPolynomial, built on first use and shared by all instances.
+     // The assignment is not synchronized: concurrent first uses may each build a table,
+     // but every such table has identical contents.
+     private static UInt32[] defaultTable;
+
+     private UInt32 hash;
+     private UInt32 seed;
+     private UInt32[] table;
+
+     /// <summary>
+     /// Creates a hasher using <see cref="DefaultPolynomial" /> and <see cref="DefaultSeed" />.
+     /// </summary>
+     public Crc32Hasher()
+     {
+         table = InitializeTable(DefaultPolynomial);
+         seed = DefaultSeed;
+         Initialize();
+     }
+
+     /// <summary>
+     /// Creates a hasher with a caller-supplied polynomial and seed.
+     /// </summary>
+     /// <param name="polynomial">Polynomial used to build the lookup table.</param>
+     /// <param name="seed">Initial value of the running hash.</param>
+     public Crc32Hasher(UInt32 polynomial, UInt32 seed)
+     {
+         table = InitializeTable(polynomial);
+         this.seed = seed;
+         Initialize();
+     }
+
+     /// <summary>Size of the produced hash, in bits.</summary>
+     public override int HashSize
+     {
+         get { return 32; }
+     }
+
+     /// <summary>
+     /// Computes the hash of the whole array with the default polynomial and seed.
+     /// </summary>
+     /// <param name="bytes">The data to hash.</param>
+     /// <returns>Four bytes: the bitwise complement of the final register, most significant byte first.</returns>
+     public static byte[] Compute(byte[] bytes)
+     {
+         using (var hasher = new Crc32Hasher())
+         {
+             return hasher.ComputeHash(bytes);
+         }
+     }
+
+     /// <summary>Resets the running hash to the seed.</summary>
+     public override void Initialize()
+     {
+         hash = seed;
+     }
+
+     /// <summary>
+     /// Feeds <paramref name="length" /> bytes of <paramref name="buffer" />, beginning at
+     /// <paramref name="start" />, into the running hash.
+     /// </summary>
+     protected override void HashCore(byte[] buffer, int start, int length)
+     {
+         hash = CalculateHash(table, hash, buffer, start, length);
+     }
+
+     /// <summary>
+     /// Produces the final hash value and stores it in <see cref="HashAlgorithm.HashValue" />.
+     /// </summary>
+     /// <returns>The complement of the running hash as four bytes, most significant first.</returns>
+     protected override byte[] HashFinal()
+     {
+         byte[] hashBuffer = UInt32ToBigEndianBytes(~hash);
+         this.HashValue = hashBuffer;
+         return hashBuffer;
+     }
+
+     /// <summary>
+     /// Returns the 256-entry lookup table for the polynomial, reusing the cached table
+     /// when the polynomial is <see cref="DefaultPolynomial" />.
+     /// </summary>
+     private static UInt32[] InitializeTable(UInt32 polynomial)
+     {
+         if (polynomial == DefaultPolynomial && defaultTable != null)
+         {
+             return defaultTable;
+         }
+
+         UInt32[] createTable = new UInt32[256];
+         for (int i = 0; i < 256; i++)
+         {
+             UInt32 entry = (UInt32)i;
+             for (int j = 0; j < 8; j++)
+             {
+                 if ((entry & 1) == 1)
+                 {
+                     entry = (entry >> 1) ^ polynomial;
+                 }
+                 else
+                 {
+                     entry = entry >> 1;
+                 }
+             }
+
+             createTable[i] = entry;
+         }
+
+         if (polynomial == DefaultPolynomial)
+         {
+             defaultTable = createTable;
+         }
+
+         return createTable;
+     }
+
+     /// <summary>
+     /// Folds <paramref name="size" /> bytes of <paramref name="buffer" />, starting at index
+     /// <paramref name="start" />, into <paramref name="seed" /> and returns the new register value.
+     /// </summary>
+     private static UInt32 CalculateHash(UInt32[] table, UInt32 seed, byte[] buffer, int start, int size)
+     {
+         UInt32 crc = seed;
+
+         // 'size' is a byte count, not an end index: the range is [start, start + size).
+         // The previous bound (i < size) hashed the wrong range whenever start was non-zero.
+         int end = start + size;
+         for (int i = start; i < end; i++)
+         {
+             unchecked
+             {
+                 crc = (crc >> 8) ^ table[buffer[i] ^ (crc & 0xff)];
+             }
+         }
+
+         return crc;
+     }
+
+     /// <summary>Splits the value into four bytes, most significant byte first.</summary>
+     private byte[] UInt32ToBigEndianBytes(UInt32 x)
+     {
+         return new byte[]
+         {
+             (byte)((x >> 24) & 0xff),
+             (byte)((x >> 16) & 0xff),
+             (byte)((x >> 8) & 0xff),
+             (byte)(x & 0xff)
+         };
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+namespace Kafka.Client.Utils
+{
+ /// <summary>
+ /// Named integer error codes.
+ /// NOTE(review): presumably these mirror the numeric error codes sent by the Kafka broker;
+ /// confirm against the broker-side definition before changing any value.
+ /// </summary>
+ public class ErrorMapping
+ {
+ /// <summary>Code -1. NOTE(review): by its name, an error that has no more specific code - confirm.</summary>
+ public static readonly int UnknownCode = -1;
+ /// <summary>Code 0. NOTE(review): by its name, the "no error" value - confirm.</summary>
+ public static readonly int NoError = 0;
+ /// <summary>Code 1. NOTE(review): by its name, a requested offset was out of range - confirm.</summary>
+ public static readonly int OffsetOutOfRangeCode = 1;
+ /// <summary>Code 2. NOTE(review): by its name, a message was invalid - confirm.</summary>
+ public static readonly int InvalidMessageCode = 2;
+ /// <summary>Code 3. NOTE(review): by its name, a request addressed the wrong partition - confirm.</summary>
+ public static readonly int WrongPartitionCode = 3;
+ /// <summary>Code 4. NOTE(review): by its name, a fetch size was invalid - confirm.</summary>
+ public static readonly int InvalidFetchSizeCode = 4;
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Linq.Expressions;
+ using System.Text;
+
+ /// <summary>
+ /// Extension methods for rendering sequences as delimited text.
+ /// </summary>
+ internal static class Extensions
+ {
+     /// <summary>
+     /// Joins the items' string representations with the given separator.
+     /// </summary>
+     /// <typeparam name="T">Element type of the sequence.</typeparam>
+     /// <param name="items">The sequence to render. Enumerated exactly once.</param>
+     /// <param name="separator">Text placed between consecutive items.</param>
+     /// <returns>The joined text, or the literal "NULL" when the sequence is empty.</returns>
+     /// <exception cref="ArgumentNullException">Thrown when <paramref name="items" /> is null.</exception>
+     public static string ToMultiString<T>(this IEnumerable<T> items, string separator)
+     {
+         // Buffer first: testing for emptiness with Count() and then joining would run a
+         // lazy or one-shot sequence twice.
+         List<T> buffered = items.ToList();
+         if (buffered.Count == 0)
+         {
+             return "NULL";
+         }
+
+         return String.Join<T>(separator, buffered);
+     }
+
+     /// <summary>
+     /// Projects each item through the selector and joins the results with the given separator.
+     /// </summary>
+     /// <typeparam name="T">Element type of the sequence.</typeparam>
+     /// <param name="items">The sequence to render. Enumerated exactly once.</param>
+     /// <param name="selector">Expression producing the value to print for each item; it is compiled on every call.</param>
+     /// <param name="separator">Text placed between consecutive items.</param>
+     /// <returns>The joined text, or the literal "NULL" when the sequence is empty.</returns>
+     /// <exception cref="ArgumentNullException">Thrown when <paramref name="items" /> is null.</exception>
+     public static string ToMultiString<T>(this IEnumerable<T> items, Expression<Func<T, object>> selector, string separator)
+     {
+         List<T> buffered = items.ToList();
+         if (buffered.Count == 0)
+         {
+             // Checked before compiling so an empty sequence never pays for Compile().
+             return "NULL";
+         }
+
+         Func<T, object> compiled = selector.Compile();
+         return String.Join<object>(separator, buffered.Select(compiled));
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq.Expressions;
+ using System.Text.RegularExpressions;
+
+ /// <summary>
+ /// Assertion helpers that take the condition as an expression tree, so that the text of a
+ /// violated condition can be included in the exception message.
+ /// </summary>
+ internal static class Guard
+ {
+     // Rewrite rules applied, in this order, to the expression's ToString() output.
+     // Kept in an array because the rules must run in a fixed order, and built once
+     // instead of on every failed assertion.
+     private static readonly KeyValuePair<Regex, string>[] Replacements =
+     {
+         new KeyValuePair<Regex, string>(new Regex("value\\([^)]*\\)\\."), string.Empty),
+         new KeyValuePair<Regex, string>(new Regex("\\(\\)\\."), string.Empty),
+         new KeyValuePair<Regex, string>(new Regex("\\(\\)\\ =>"), string.Empty),
+
+         // Only the operator form "Not(" is rewritten; a bare "Not" pattern would also
+         // corrupt identifiers that merely contain those letters (e.g. isNotReady).
+         new KeyValuePair<Regex, string>(new Regex("\\bNot(?=\\()"), "!")
+     };
+
+     /// <summary>
+     /// Checks whether given expression is true. Throws <see cref="InvalidOperationException" /> if not.
+     /// </summary>
+     /// <param name="assertion">
+     /// The assertion. It is compiled and evaluated on every call.
+     /// </param>
+     /// <exception cref="InvalidOperationException">
+     /// Thrown when condition is not met.
+     /// </exception>
+     public static void Assert(Expression<Func<bool>> assertion)
+     {
+         if (!assertion.Compile()())
+         {
+             throw new InvalidOperationException(BuildMessage(assertion));
+         }
+     }
+
+     /// <summary>
+     /// Checks whether given expression is true. Throws given exception type if not.
+     /// </summary>
+     /// <typeparam name="TException">
+     /// Type of exception that is thrown when condition is not met. When the type has a public
+     /// constructor taking a single string, it receives the failure message; otherwise the
+     /// parameterless constructor is used.
+     /// </typeparam>
+     /// <param name="assertion">
+     /// The assertion. It is compiled and evaluated on every call.
+     /// </param>
+     public static void Assert<TException>(Expression<Func<bool>> assertion)
+         where TException : Exception, new()
+     {
+         if (assertion.Compile()())
+         {
+             return;
+         }
+
+         // The generic constraint only guarantees a parameterless constructor, so the
+         // message-taking one has to be looked up rather than assumed.
+         var messageConstructor = typeof(TException).GetConstructor(new[] { typeof(string) });
+         Exception failure = messageConstructor != null
+             ? (Exception)messageConstructor.Invoke(new object[] { BuildMessage(assertion) })
+             : new TException();
+         throw failure;
+     }
+
+     /// <summary>
+     /// Formats the failure message for a violated assertion.
+     /// </summary>
+     /// <param name="assertion">The assertion that evaluated to false.</param>
+     /// <returns>The message text.</returns>
+     private static string BuildMessage(Expression<Func<bool>> assertion)
+     {
+         return string.Format("'{0}' is not met.", Normalize(assertion.ToString()));
+     }
+
+     /// <summary>
+     /// Creates string representation of lambda expression with unnecessary information
+     /// stripped out.
+     /// </summary>
+     /// <param name="expression">Lambda expression to process. </param>
+     /// <returns>Normalized string representation. </returns>
+     private static string Normalize(string expression)
+     {
+         var result = expression;
+         foreach (var rule in Replacements)
+         {
+             result = rule.Key.Replace(result, rule.Value);
+         }
+
+         return result.Trim();
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Globalization;
+ using System.Reflection;
+ using System.Threading;
+ using log4net;
+
+ /// <summary>
+ /// A scheduler for running jobs in the background
+ /// </summary>
+ /// <remarks>
+ /// Holds at most one <see cref="Timer" />; scheduling again replaces the previous schedule.
+ /// </remarks>
+ internal class KafkaScheduler : IDisposable
+ {
+     /// <summary>
+     /// Signature of a job that can be scheduled.
+     /// </summary>
+     public delegate void KafkaSchedulerDelegate();
+
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly object shuttingDownLock = new object();
+
+     private Timer timer;
+
+     private KafkaSchedulerDelegate methodToRun;
+
+     private volatile bool disposed;
+
+     /// <summary>
+     /// Starts invoking the given job on a timer.
+     /// </summary>
+     /// <param name="method">The job to run. Must not be null.</param>
+     /// <param name="delayMs">Passed to the timer as its due time, in milliseconds.</param>
+     /// <param name="periodMs">Passed to the timer as its period, in milliseconds.</param>
+     /// <exception cref="ArgumentNullException">Thrown when <paramref name="method" /> is null.</exception>
+     public void ScheduleWithRate(KafkaSchedulerDelegate method, long delayMs, long periodMs)
+     {
+         if (method == null)
+         {
+             // Fail here rather than with a NullReferenceException inside the timer callback.
+             throw new ArgumentNullException("method");
+         }
+
+         Timer previous = this.timer;
+         this.methodToRun = method;
+         this.timer = new Timer(this.HandleCallback, null, delayMs, periodMs);
+
+         // Without this, a second call would leave the earlier timer alive and still firing.
+         if (previous != null)
+         {
+             previous.Dispose();
+         }
+     }
+
+     /// <summary>
+     /// Disposes the timer, if one was created. Safe to call more than once; only the first call
+     /// does any work. Errors raised while disposing the timer are logged and ignored.
+     /// </summary>
+     public void Dispose()
+     {
+         if (this.disposed)
+         {
+             return;
+         }
+
+         lock (this.shuttingDownLock)
+         {
+             if (this.disposed)
+             {
+                 return;
+             }
+
+             this.disposed = true;
+         }
+
+         try
+         {
+             if (this.timer != null)
+             {
+                 this.timer.Dispose();
+                 Logger.InfoFormat(CultureInfo.CurrentCulture, "shutdown scheduler");
+             }
+         }
+         catch (Exception exc)
+         {
+             Logger.Warn("Ignoring unexpected errors on closing", exc);
+         }
+     }
+
+     /// <summary>
+     /// Timer callback; runs the currently scheduled job.
+     /// </summary>
+     /// <param name="o">Timer state object; unused.</param>
+     private void HandleCallback(object o)
+     {
+         this.methodToRun();
+     }
+ }
+}
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System;
+ using System.Reflection;
+
+ /// <summary>
+ /// Reflection helpers for creating instances by type name and reading non-public fields.
+ /// </summary>
+ internal static class ReflectionHelper
+ {
+     /// <summary>
+     /// Creates an instance of the named type using its parameterless constructor.
+     /// </summary>
+     /// <typeparam name="T">
+     /// The type the caller expects back. When <paramref name="className" /> contains "`1"
+     /// the generic arguments of <typeparamref name="T" /> are used to close the named generic type.
+     /// </typeparam>
+     /// <param name="className">Type name in the form accepted by <see cref="Type.GetType(string, bool)" />.</param>
+     /// <returns>
+     /// The new instance; null when <paramref name="className" /> is null or empty, or when the
+     /// created object is not a <typeparamref name="T" />.
+     /// </returns>
+     /// <exception cref="TypeLoadException">Thrown when the named type cannot be found.</exception>
+     public static T Instantiate<T>(string className)
+         where T : class
+     {
+         if (string.IsNullOrEmpty(className))
+         {
+             return default(T);
+         }
+
+         // throwOnError is set for both the plain and the generic case so that an unknown
+         // name is reported as such instead of surfacing later as a NullReferenceException.
+         Type type = Type.GetType(className, true);
+         if (className.Contains("`1"))
+         {
+             type = type.MakeGenericType(typeof(T).GetGenericArguments());
+         }
+
+         return Activator.CreateInstance(type) as T;
+     }
+
+     /// <summary>
+     /// Reads a non-public instance field of the given object.
+     /// </summary>
+     /// <typeparam name="T">Type the field value is cast to.</typeparam>
+     /// <param name="name">Name of the field.</param>
+     /// <param name="obj">Object to read the field from.</param>
+     /// <returns>The field value cast to <typeparamref name="T" />.</returns>
+     /// <exception cref="ArgumentNullException">Thrown when <paramref name="obj" /> is null.</exception>
+     /// <exception cref="MissingFieldException">
+     /// Thrown when the runtime type of <paramref name="obj" /> has no non-public instance field with that name.
+     /// </exception>
+     public static T GetInstanceField<T>(string name, object obj)
+         where T : class
+     {
+         if (obj == null)
+         {
+             throw new ArgumentNullException("obj");
+         }
+
+         Type type = obj.GetType();
+         FieldInfo info = type.GetField(name, BindingFlags.NonPublic | BindingFlags.Instance);
+         if (info == null)
+         {
+             throw new MissingFieldException(type.FullName, name);
+         }
+
+         return (T)info.GetValue(obj);
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+namespace Kafka.Client.Utils
+{
+ /// <summary>
+ /// Computes the ZooKeeper paths that belong to one consumer group.
+ /// </summary>
+ internal class ZKGroupDirs
+ {
+     // Root path under which every group path is built.
+     private const string ConsumersRoot = "/consumers";
+
+     /// <summary>
+     /// Builds the group and registry paths for the given group name.
+     /// </summary>
+     /// <param name="group">Name of the consumer group; appended to the root path as-is.</param>
+     public ZKGroupDirs(string group)
+     {
+         string groupDir = ConsumersRoot + "/" + group;
+         this.ConsumerGroupDir = groupDir;
+         this.ConsumerRegistryDir = groupDir + "/ids";
+     }
+
+     /// <summary>Gets the root path, "/consumers".</summary>
+     public string ConsumerDir
+     {
+         get { return ConsumersRoot; }
+     }
+
+     /// <summary>Gets the root path followed by "/" and the group name.</summary>
+     public string ConsumerGroupDir { get; private set; }
+
+     /// <summary>Gets the group path followed by "/ids".</summary>
+     public string ConsumerRegistryDir { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ /// <summary>
+ /// Extends <see cref="ZKGroupDirs" /> with the per-topic paths of a consumer group.
+ /// </summary>
+ internal class ZKGroupTopicDirs : ZKGroupDirs
+ {
+     /// <summary>
+     /// Builds the offset and owner paths for a topic inside the given group.
+     /// </summary>
+     /// <param name="group">Name of the consumer group, forwarded to the base class.</param>
+     /// <param name="topic">Topic name; appended to the paths as-is.</param>
+     public ZKGroupTopicDirs(string group, string topic)
+         : base(group)
+     {
+         string groupDir = this.ConsumerGroupDir;
+         this.ConsumerOffsetDir = groupDir + "/offsets/" + topic;
+         this.ConsumerOwnerDir = groupDir + "/owners/" + topic;
+     }
+
+     /// <summary>Gets the group path followed by "/offsets/" and the topic.</summary>
+     public string ConsumerOffsetDir { get; private set; }
+
+     /// <summary>Gets the group path followed by "/owners/" and the topic.</summary>
+     public string ConsumerOwnerDir { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Utils
+{
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Reflection;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.ZooKeeperIntegration;
+ using log4net;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Static helpers for creating, updating, deleting and reading ZooKeeper paths through an
+ /// <see cref="IZooKeeperClient" />.
+ /// </summary>
+ internal class ZkUtils
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     /// <summary>
+     /// Writes data to a path, creating the path (and its parent) when it does not exist yet.
+     /// </summary>
+     /// <param name="zkClient">Client used for all ZooKeeper calls.</param>
+     /// <param name="path">Path to write.</param>
+     /// <param name="data">Data to store at the path.</param>
+     internal static void UpdatePersistentPath(IZooKeeperClient zkClient, string path, string data)
+     {
+         try
+         {
+             zkClient.WriteData(path, data);
+         }
+         catch (KeeperException.NoNodeException)
+         {
+             CreateParentPath(zkClient, path);
+
+             try
+             {
+                 zkClient.CreatePersistent(path, data);
+             }
+             catch (KeeperException.NodeExistsException)
+             {
+                 // The node appeared between the failed write and the create: write to it instead.
+                 zkClient.WriteData(path, data);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Creates the parent of the given path as a persistent node.
+     /// </summary>
+     /// <param name="zkClient">Client used for the ZooKeeper call.</param>
+     /// <param name="path">
+     /// Path whose parent should exist. Nothing is done when the path contains no '/' or when
+     /// its only '/' is the first character.
+     /// </param>
+     internal static void CreateParentPath(IZooKeeperClient zkClient, string path)
+     {
+         int lastSeparator = path.LastIndexOf('/');
+
+         // An index of -1 (no separator) would make Substring throw; 0 means the parent is empty.
+         if (lastSeparator > 0)
+         {
+             // NOTE(review): the 'true' flag presumably asks the client to create missing
+             // ancestors as well - confirm against IZooKeeperClient.CreatePersistent.
+             zkClient.CreatePersistent(path.Substring(0, lastSeparator), true);
+         }
+     }
+
+     /// <summary>
+     /// Deletes a path; a path that no longer exists is logged and otherwise ignored.
+     /// </summary>
+     /// <param name="zkClient">Client used for the ZooKeeper call.</param>
+     /// <param name="path">Path to delete.</param>
+     internal static void DeletePath(IZooKeeperClient zkClient, string path)
+     {
+         try
+         {
+             zkClient.Delete(path);
+         }
+         catch (KeeperException.NoNodeException)
+         {
+             Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} deleted during connection loss; this is ok", path);
+         }
+     }
+
+     /// <summary>
+     /// Lists the partitions of each topic as sorted "child-index" strings.
+     /// </summary>
+     /// <param name="zkClient">Client used for all ZooKeeper calls.</param>
+     /// <param name="topics">Topics to look up.</param>
+     /// <returns>
+     /// For every topic, a sorted list with one entry "{child}-{i}" for each child node found
+     /// under the topic's path and each i from 0 up to the number stored in that child.
+     /// </returns>
+     internal static IDictionary<string, IList<string>> GetPartitionsForTopics(IZooKeeperClient zkClient, IEnumerable<string> topics)
+     {
+         var result = new Dictionary<string, IList<string>>();
+         foreach (string topic in topics)
+         {
+             string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+             var partList = new List<string>();
+             foreach (var broker in zkClient.GetChildrenParentMayNotExist(topicPath))
+             {
+                 // The stored count is machine-written data, so it is parsed independently of
+                 // the culture of the current thread.
+                 var numberOfParts = int.Parse(
+                     zkClient.ReadData<string>(topicPath + "/" + broker),
+                     CultureInfo.InvariantCulture);
+                 for (int i = 0; i < numberOfParts; i++)
+                 {
+                     partList.Add(broker + "-" + i);
+                 }
+             }
+
+             // NOTE(review): List<string>.Sort() uses the default, culture-sensitive string
+             // comparison. If this order has to match the order computed by other clients,
+             // an ordinal comparer may be required - confirm before changing.
+             partList.Sort();
+             result.Add(topic, partList);
+         }
+
+         return result;
+     }
+
+     /// <summary>
+     /// Creates an ephemeral path, tolerating an existing node that already holds the same data.
+     /// </summary>
+     /// <param name="zkClient">Client used for all ZooKeeper calls.</param>
+     /// <param name="path">Path to create.</param>
+     /// <param name="data">Data to store at the path.</param>
+     /// <exception cref="KeeperException.NodeExistsException">
+     /// Rethrown when the node already exists and its stored data is null or differs from <paramref name="data" />.
+     /// </exception>
+     /// <exception cref="KeeperException.NoNodeException">
+     /// Propagated when the existing node disappears before its data can be read; the caller has to handle this.
+     /// </exception>
+     internal static void CreateEphemeralPathExpectConflict(IZooKeeperClient zkClient, string path, string data)
+     {
+         try
+         {
+             CreateEphemeralPath(zkClient, path, data);
+         }
+         catch (KeeperException.NodeExistsException)
+         {
+             // A NoNodeException from this read means the node disappeared; it is deliberately
+             // left to propagate to the caller.
+             string storedData = zkClient.ReadData<string>(path);
+
+             if (storedData == null || storedData != data)
+             {
+                 Logger.InfoFormat(CultureInfo.CurrentCulture, "conflict in {0} data: {1} stored data: {2}", path, data, storedData);
+                 throw;
+             }
+
+             // otherwise, the creation succeeded, return normally
+             Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} exits with value {1} during connection loss; this is ok", path, data);
+         }
+     }
+
+     /// <summary>
+     /// Creates an ephemeral path, creating its parent first when the initial attempt reports a missing node.
+     /// </summary>
+     /// <param name="zkClient">Client used for all ZooKeeper calls.</param>
+     /// <param name="path">Path to create.</param>
+     /// <param name="data">Data to store at the path.</param>
+     internal static void CreateEphemeralPath(IZooKeeperClient zkClient, string path, string data)
+     {
+         try
+         {
+             zkClient.CreateEphemeral(path, data);
+         }
+         catch (KeeperException.NoNodeException)
+         {
+             ZkUtils.CreateParentPath(zkClient, path);
+             zkClient.CreateEphemeral(path, data);
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+ using Kafka.Client.Cfg;
+
+ /// <summary>
+ /// Base class for Kafka clients that can discover brokers automatically through ZooKeeper.
+ /// </summary>
+ public abstract class ZooKeeperAwareKafkaClientBase : KafkaClientBase
+ {
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZooKeeperAwareKafkaClientBase"/> class.
+     /// ZooKeeper support is switched on only when a configuration is supplied and its
+     /// ZkConnect value is neither null nor empty.
+     /// </summary>
+     /// <param name="config">The ZooKeeper configuration; may be null.</param>
+     protected ZooKeeperAwareKafkaClientBase(ZKConfig config)
+     {
+         bool hasConnectionString = false;
+         if (config != null)
+         {
+             hasConnectionString = !string.IsNullOrEmpty(config.ZkConnect);
+         }
+
+         this.IsZooKeeperEnabled = hasConnectionString;
+     }
+
+     /// <summary>
+     /// Gets a value indicating whether ZooKeeper based automatic broker discovery is enabled.
+     /// </summary>
+     /// <value>
+     /// <c>true</c> when the constructor received a non-empty ZkConnect value; otherwise, <c>false</c>.
+     /// </value>
+     protected bool IsZooKeeperEnabled { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ using System.Linq;
+ using log4net;
+
+ /// <summary>
+ /// Holds the handlers subscribed to ZooKeeper child-change events and dispatches to them.
+ /// </summary>
+ internal class ChildChangedEventItem
+ {
+     private readonly ILog logger;
+     private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> childChanged;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ChildChangedEventItem"/> class
+     /// with no subscribed handlers.
+     /// </summary>
+     /// <param name="logger">
+     /// The logger used when events are dispatched.
+     /// </param>
+     /// <remarks>
+     /// The logger is supplied from outside so that all event logs share one format.
+     /// </remarks>
+     public ChildChangedEventItem(ILog logger)
+     {
+         this.logger = logger;
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ChildChangedEventItem"/> class
+     /// and subscribes an initial handler.
+     /// </summary>
+     /// <param name="logger">
+     /// The logger used when events are dispatched.
+     /// </param>
+     /// <param name="handler">
+     /// The handler to subscribe.
+     /// </param>
+     /// <remarks>
+     /// The logger is supplied from outside so that all event logs share one format.
+     /// </remarks>
+     public ChildChangedEventItem(ILog logger, ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> handler)
+         : this(logger)
+     {
+         this.ChildChanged += handler;
+     }
+
+     /// <summary>
+     /// Occurs when znode children changes
+     /// </summary>
+     public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperChildChangedEventArgs> ChildChanged
+     {
+         add
+         {
+             // Remove first so that subscribing the same handler again does not register it twice.
+             this.childChanged -= value;
+             this.childChanged += value;
+         }
+
+         remove
+         {
+             this.childChanged -= value;
+         }
+     }
+
+     /// <summary>
+     /// Gets the number of currently subscribed handlers.
+     /// </summary>
+     public int Count
+     {
+         get
+         {
+             var subscribed = this.childChanged;
+             if (subscribed == null)
+             {
+                 return 0;
+             }
+
+             return subscribed.GetInvocationList().Length;
+         }
+     }
+
+     /// <summary>
+     /// Logs each subscribed handler's target at debug level, then invokes the handlers.
+     /// Does nothing when there are no subscribers.
+     /// </summary>
+     /// <param name="e">
+     /// The event data passed to every handler.
+     /// </param>
+     public void OnChildChanged(ZooKeeperChildChangedEventArgs e)
+     {
+         var subscribers = this.childChanged;
+         if (subscribers != null)
+         {
+             foreach (var subscriber in subscribers.GetInvocationList())
+             {
+                 this.logger.Debug(e + " sent to " + subscriber.Target);
+             }
+
+             subscribers(e);
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ using System.Linq;
+ using log4net;
+
+ /// <summary>
+ /// Holds the handlers subscribed to the data changed and data deleted events of a znode
+ /// and dispatches those events to them
+ /// </summary>
+ internal class DataChangedEventItem
+ {
+ private readonly ILog logger;
+ private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> dataChanged;
+ private ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> dataDeleted;
+
+ /// <summary>
+ /// Occurs when znode data changes
+ /// </summary>
+ /// <remarks>
+ /// The handler is removed before it is added, so re-subscribing an already
+ /// subscribed handler does not register it twice.
+ /// </remarks>
+ public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> DataChanged
+ {
+ add
+ {
+ this.dataChanged -= value;
+ this.dataChanged += value;
+ }
+
+ remove
+ {
+ this.dataChanged -= value;
+ }
+ }
+
+ /// <summary>
+ /// Occurs when znode data is deleted
+ /// </summary>
+ /// <remarks>
+ /// The handler is removed before it is added, so re-subscribing an already
+ /// subscribed handler does not register it twice.
+ /// </remarks>
+ public event ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> DataDeleted
+ {
+ add
+ {
+ this.dataDeleted -= value;
+ this.dataDeleted += value;
+ }
+
+ remove
+ {
+ this.dataDeleted -= value;
+ }
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="DataChangedEventItem"/> class.
+ /// </summary>
+ /// <param name="logger">
+ /// The logger used to trace dispatched events.
+ /// </param>
+ /// <remarks>
+ /// Should use external logger to keep same format of all event logs
+ /// </remarks>
+ public DataChangedEventItem(ILog logger)
+ {
+ this.logger = logger;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="DataChangedEventItem"/> class
+ /// and subscribes the given handlers.
+ /// </summary>
+ /// <param name="logger">
+ /// The logger used to trace dispatched events.
+ /// </param>
+ /// <param name="changedHandler">
+ /// The handler subscribed to <see cref="DataChanged"/>.
+ /// </param>
+ /// <param name="deletedHandler">
+ /// The handler subscribed to <see cref="DataDeleted"/>.
+ /// </param>
+ /// <remarks>
+ /// Should use external logger to keep same format of all event logs
+ /// </remarks>
+ public DataChangedEventItem(
+ ILog logger,
+ ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> changedHandler,
+ ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> deletedHandler)
+ : this(logger)
+ {
+ this.DataChanged += changedHandler;
+ this.DataDeleted += deletedHandler;
+ }
+
+ /// <summary>
+ /// Invokes subscribed handlers for ZooKeeper data changes event
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ public void OnDataChanged(ZooKeeperDataChangedEventArgs e)
+ {
+ // Passing the field by value hands the helper a snapshot of the current subscribers.
+ this.Dispatch(this.dataChanged, e);
+ }
+
+ /// <summary>
+ /// Invokes subscribed handlers for ZooKeeper data deletes event
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ public void OnDataDeleted(ZooKeeperDataChangedEventArgs e)
+ {
+ // Passing the field by value hands the helper a snapshot of the current subscribers.
+ this.Dispatch(this.dataDeleted, e);
+ }
+
+ /// <summary>
+ /// Gets the total count of subscribed handlers
+ /// </summary>
+ /// <remarks>
+ /// Sum of the handlers subscribed to <see cref="DataChanged"/> and <see cref="DataDeleted"/>.
+ /// </remarks>
+ public int TotalCount
+ {
+ get
+ {
+ return CountHandlers(this.dataChanged) + CountHandlers(this.dataDeleted);
+ }
+ }
+
+ /// <summary>
+ /// Counts the handlers held by a delegate snapshot
+ /// </summary>
+ /// <param name="handlers">
+ /// The delegate to inspect; may be null when nothing is subscribed.
+ /// </param>
+ /// <returns>
+ /// The number of entries in the invocation list, or 0 for a null delegate.
+ /// </returns>
+ private static int CountHandlers(ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> handlers)
+ {
+ // The parameter is a single read of the field, so the null check and the
+ // dereference cannot disagree when a handler is unsubscribed concurrently.
+ if (handlers == null)
+ {
+ return 0;
+ }
+
+ return handlers.GetInvocationList().Length;
+ }
+
+ /// <summary>
+ /// Logs every receiver of an event and then raises the event
+ /// </summary>
+ /// <param name="handlers">
+ /// The subscribed handlers; nothing happens when null.
+ /// </param>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ private void Dispatch(
+ ZooKeeperClient.ZooKeeperEventHandler<ZooKeeperDataChangedEventArgs> handlers,
+ ZooKeeperDataChangedEventArgs e)
+ {
+ if (handlers == null)
+ {
+ return;
+ }
+
+ foreach (var handler in handlers.GetInvocationList())
+ {
+ this.logger.Debug(e + " sent to " + handler.Target);
+ }
+
+ handlers(e);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Event data describing a change in the children of a znode
+ /// </summary>
+ internal class ZooKeeperChildChangedEventArgs : ZooKeeperEventArgs
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperChildChangedEventArgs"/> class.
+ /// </summary>
+ /// <param name="path">
+ /// The path of the znode whose children changed.
+ /// </param>
+ public ZooKeeperChildChangedEventArgs(string path)
+ : base(string.Concat("Children of ", path, " changed"))
+ {
+ this.Path = path;
+ }
+
+ /// <summary>
+ /// Gets the event type, which is always <see cref="ZooKeeperEventTypes.ChildChanged"/>
+ /// </summary>
+ public override ZooKeeperEventTypes Type
+ {
+ get { return ZooKeeperEventTypes.ChildChanged; }
+ }
+
+ /// <summary>
+ /// Gets the path given at construction
+ /// </summary>
+ public string Path { get; private set; }
+
+ /// <summary>
+ /// Gets or sets the current znode children
+ /// </summary>
+ /// <remarks>
+ /// Not set by the constructor.
+ /// </remarks>
+ public IList<string> Children { get; set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ /// <summary>
+ /// Contains znode data changed event data
+ /// </summary>
+ internal class ZooKeeperDataChangedEventArgs : ZooKeeperEventArgs
+ {
+ /// <summary>
+ /// The trailing word of the description passed to the base class; swapped for
+ /// "deleted" in <see cref="ToString"/> when the event reports a deletion.
+ /// </summary>
+ private const string ChangedWord = "changed";
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperDataChangedEventArgs"/> class.
+ /// </summary>
+ /// <param name="path">
+ /// The znode path.
+ /// </param>
+ public ZooKeeperDataChangedEventArgs(string path)
+ : base("Data of " + path + " changed")
+ {
+ this.Path = path;
+ }
+
+ /// <summary>
+ /// Gets the znode path
+ /// </summary>
+ public string Path { get; private set; }
+
+ /// <summary>
+ /// Gets or sets znode changed data.
+ /// </summary>
+ /// <remarks>
+ /// Null if data was deleted.
+ /// </remarks>
+ public string Data { get; set; }
+
+ /// <summary>
+ /// Gets the event type.
+ /// </summary>
+ /// <remarks>
+ /// Always <see cref="ZooKeeperEventTypes.DataChanged"/>, including for deletions.
+ /// </remarks>
+ public override ZooKeeperEventTypes Type
+ {
+ get
+ {
+ return ZooKeeperEventTypes.DataChanged;
+ }
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether data was deleted
+ /// </summary>
+ /// <remarks>
+ /// True when <see cref="Data"/> is null or an empty string.
+ /// </remarks>
+ public bool DataDeleted
+ {
+ get
+ {
+ return string.IsNullOrEmpty(this.Data);
+ }
+ }
+
+ /// <summary>
+ /// Gets string representation of event data
+ /// </summary>
+ /// <returns>
+ /// The base representation, with its final "changed" replaced by "deleted"
+ /// when <see cref="DataDeleted"/> is true.
+ /// </returns>
+ public override string ToString()
+ {
+ string text = base.ToString();
+ if (!this.DataDeleted)
+ {
+ return text;
+ }
+
+ // Replace only the last occurrence, i.e. the word appended by the constructor.
+ // Replacing every occurrence would also rewrite a znode path that happens to
+ // contain "changed" and so report the wrong path.
+ int index = text.LastIndexOf(ChangedWord, System.StringComparison.Ordinal);
+ if (index < 0)
+ {
+ return text;
+ }
+
+ return text.Substring(0, index) + "deleted" + text.Substring(index + ChangedWord.Length);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ using System;
+
+ /// <summary>
+ /// Common base for the event data classes raised for ZooKeeper events
+ /// </summary>
+ internal abstract class ZooKeeperEventArgs : EventArgs
+ {
+ private readonly string description;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperEventArgs"/> class.
+ /// </summary>
+ /// <param name="description">
+ /// The text embedded in the string representation of the event.
+ /// </param>
+ protected ZooKeeperEventArgs(string description)
+ {
+ this.description = description;
+ }
+
+ /// <summary>
+ /// Gets the event type; supplied by each derived class.
+ /// </summary>
+ public abstract ZooKeeperEventTypes Type { get; }
+
+ /// <summary>
+ /// Builds the string representation of the event
+ /// </summary>
+ /// <returns>
+ /// The description wrapped as <c>ZooKeeperEvent[description]</c>
+ /// </returns>
+ public override string ToString()
+ {
+ return string.Concat("ZooKeeperEvent[", this.description, "]");
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ /// <summary>
+ /// Event types reported through the Type property of ZooKeeper event data classes
+ /// </summary>
+ internal enum ZooKeeperEventTypes
+ {
+ /// <summary>
+ /// Unknown event type; also the default (zero) value.
+ /// NOTE(review): the identifier is spelled "Unknow"; renaming it would break existing references.
+ /// </summary>
+ Unknow = 0,
+
+ /// <summary>
+ /// Presumably a change of the ZooKeeper connection state (confirm against the raising code)
+ /// </summary>
+ StateChanged = 1,
+
+ /// <summary>
+ /// Presumably the creation of a new ZooKeeper session (confirm against the raising code)
+ /// </summary>
+ SessionCreated = 2,
+
+ /// <summary>
+ /// Children of a znode changed; reported by ZooKeeperChildChangedEventArgs
+ /// </summary>
+ ChildChanged = 3,
+
+ /// <summary>
+ /// Data of a znode changed; reported by ZooKeeperDataChangedEventArgs
+ /// </summary>
+ DataChanged = 4,
+ }
+}