You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [2/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Reflection;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Messages;
+ using log4net;
+
+ /// <summary>
+ /// An iterator that blocks until a value can be read from the supplied queue.
+ /// </summary>
+ /// <remarks>
+ /// Iteration ends when the iterator reads <c>ZookeeperConsumerConnector.ShutdownCommand</c> from the queue; adding that object to the queue triggers a shutdown.
+ /// </remarks>
+ internal class ConsumerIterator : IEnumerator<Message>
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+     private readonly BlockingCollection<FetchedDataChunk> channel;
+     private readonly int consumerTimeoutMs;
+     private PartitionTopicInfo currentTopicInfo;
+     private ConsumerIteratorState state = ConsumerIteratorState.NotReady;
+     private IEnumerator<Message> current;
+     private Message nextItem;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ConsumerIterator"/> class.
+     /// </summary>
+     /// <param name="channel">
+     /// The queue of fetched data chunks that messages are read from.
+     /// </param>
+     /// <param name="consumerTimeoutMs">
+     /// The consumer timeout in ms. A negative value makes the iterator block until a chunk is available.
+     /// </param>
+     public ConsumerIterator(BlockingCollection<FetchedDataChunk> channel, int consumerTimeoutMs)
+     {
+         this.channel = channel;
+         this.consumerTimeoutMs = consumerTimeoutMs;
+     }
+
+     /// <summary>
+     /// Gets the next message and marks it as consumed.
+     /// </summary>
+     /// <remarks>
+     /// Unlike a typical enumerator, reading this property is not idempotent: every read hands out
+     /// the buffered message, records its size on the owning <see cref="PartitionTopicInfo"/> and
+     /// forces the next read to compute a new message.
+     /// </remarks>
+     /// <returns>
+     /// The element in the collection at the current position of the enumerator.
+     /// </returns>
+     /// <exception cref="InvalidOperationException">
+     /// There is no further element, or the computed element was null.
+     /// </exception>
+     public Message Current
+     {
+         get
+         {
+             if (!this.MoveNext())
+             {
+                 throw new InvalidOperationException("No element");
+             }
+
+             // The buffered item is handed out now, so the next MoveNext must compute a fresh one.
+             this.state = ConsumerIteratorState.NotReady;
+             if (this.nextItem == null)
+             {
+                 throw new InvalidOperationException("Expected item but none found.");
+             }
+
+             this.currentTopicInfo.Consumed(MessageSet.GetEntrySize(this.nextItem));
+             return this.nextItem;
+         }
+     }
+
+     /// <summary>
+     /// Gets the current element in the collection.
+     /// </summary>
+     /// <returns>
+     /// The current element in the collection.
+     /// </returns>
+     object IEnumerator.Current
+     {
+         get { return this.Current; }
+     }
+
+     /// <summary>
+     /// Advances the enumerator to the next element of the collection.
+     /// </summary>
+     /// <returns>
+     /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
+     /// </returns>
+     /// <exception cref="InvalidOperationException">
+     /// A previous attempt to compute the next element failed.
+     /// </exception>
+     /// <exception cref="ConsumerTimeoutException">
+     /// No chunk arrived within the configured timeout. The iterator stays usable afterwards.
+     /// </exception>
+     public bool MoveNext()
+     {
+         switch (this.state)
+         {
+             case ConsumerIteratorState.Failed:
+                 throw new InvalidOperationException("Iterator is in failed state");
+             case ConsumerIteratorState.Done:
+                 return false;
+             case ConsumerIteratorState.Ready:
+                 return true;
+             default:
+                 return this.MaybeComputeNext();
+         }
+     }
+
+     /// <summary>
+     /// Resets the enumerator's state to NotReady.
+     /// </summary>
+     public void Reset()
+     {
+         this.state = ConsumerIteratorState.NotReady;
+     }
+
+     /// <summary>
+     /// Does nothing; present to satisfy <see cref="IDisposable"/>.
+     /// </summary>
+     public void Dispose()
+     {
+     }
+
+     /// <summary>
+     /// Computes the next item and updates the state accordingly.
+     /// </summary>
+     /// <returns>
+     /// true when an item was buffered; false when the shutdown command was received.
+     /// </returns>
+     private bool MaybeComputeNext()
+     {
+         // Pessimistically mark the iterator as failed; an unexpected exception from MakeNext leaves it that way.
+         this.state = ConsumerIteratorState.Failed;
+         this.nextItem = this.MakeNext();
+         if (this.state == ConsumerIteratorState.Done)
+         {
+             return false;
+         }
+
+         this.state = ConsumerIteratorState.Ready;
+         return true;
+     }
+
+     /// <summary>
+     /// Returns the next message, taking a new chunk from the queue when the current one is exhausted.
+     /// </summary>
+     /// <returns>
+     /// The next message, or null when the shutdown command was read from the queue.
+     /// </returns>
+     private Message MakeNext()
+     {
+         if (this.current == null || !this.current.MoveNext())
+         {
+             FetchedDataChunk found;
+             if (this.consumerTimeoutMs < 0)
+             {
+                 found = this.channel.Take();
+             }
+             else if (!this.channel.TryTake(out found, this.consumerTimeoutMs))
+             {
+                 Logger.Debug("Consumer iterator timing out...");
+
+                 // A timeout is not a failure of the iterator itself: undo the pessimistic
+                 // Failed state so that the caller can catch the exception and keep iterating.
+                 this.state = ConsumerIteratorState.NotReady;
+                 throw new ConsumerTimeoutException();
+             }
+
+             if (found.Equals(ZookeeperConsumerConnector.ShutdownCommand))
+             {
+                 Logger.Debug("Received the shutdown command");
+
+                 // The command is put back into the queue - presumably so that any other reader sees it too.
+                 this.channel.Add(found);
+                 return this.AllDone();
+             }
+
+             this.currentTopicInfo = found.TopicInfo;
+             if (this.currentTopicInfo.GetConsumeOffset() != found.FetchOffset)
+             {
+                 Logger.ErrorFormat(
+                     CultureInfo.CurrentCulture,
+                     "consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data",
+                     this.currentTopicInfo.GetConsumeOffset(),
+                     found.FetchOffset,
+                     this.currentTopicInfo);
+                 this.currentTopicInfo.ResetConsumeOffset(found.FetchOffset);
+             }
+
+             this.current = found.Messages.Messages.GetEnumerator();
+             this.current.MoveNext();
+         }
+
+         return this.current.Current;
+     }
+
+     /// <summary>
+     /// Marks the iteration as finished.
+     /// </summary>
+     /// <returns>
+     /// Always null.
+     /// </returns>
+     private Message AllDone()
+     {
+         this.state = ConsumerIteratorState.Done;
+         return null;
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ /// <summary>
+ /// The states a <see cref="ConsumerIterator"/> moves through while producing messages.
+ /// </summary>
+ internal enum ConsumerIteratorState
+ {
+     /// <summary>
+     /// The shutdown command was read; MoveNext returns false.
+     /// </summary>
+     Done = 0,
+
+     /// <summary>
+     /// A message has been computed and is waiting to be handed out.
+     /// </summary>
+     Ready = 1,
+
+     /// <summary>
+     /// No message is buffered; the next MoveNext has to compute one.
+     /// </summary>
+     NotReady = 2,
+
+     /// <summary>
+     /// Set while the next message is being computed; MoveNext throws if the iterator is left in this state.
+     /// </summary>
+     Failed = 3
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using Kafka.Client.Messages;
+
+ /// <summary>
+ /// A set of fetched messages together with the partition info they belong to and the offset they were fetched at.
+ /// </summary>
+ internal class FetchedDataChunk : IEquatable<FetchedDataChunk>
+ {
+     /// <summary>
+     /// Gets or sets the fetched messages.
+     /// </summary>
+     public BufferedMessageSet Messages { get; set; }
+
+     /// <summary>
+     /// Gets or sets the topic/partition info the messages were fetched for.
+     /// </summary>
+     public PartitionTopicInfo TopicInfo { get; set; }
+
+     /// <summary>
+     /// Gets or sets the offset the messages were fetched at.
+     /// </summary>
+     public long FetchOffset { get; set; }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="FetchedDataChunk"/> class.
+     /// </summary>
+     /// <param name="messages">The fetched messages.</param>
+     /// <param name="topicInfo">The topic/partition info the messages were fetched for.</param>
+     /// <param name="fetchOffset">The offset the messages were fetched at.</param>
+     public FetchedDataChunk(BufferedMessageSet messages, PartitionTopicInfo topicInfo, long fetchOffset)
+     {
+         this.Messages = messages;
+         this.TopicInfo = topicInfo;
+         this.FetchOffset = fetchOffset;
+     }
+
+     /// <summary>
+     /// Determines whether the given object is a <see cref="FetchedDataChunk"/> equal to this one.
+     /// </summary>
+     /// <param name="obj">The object to compare with.</param>
+     /// <returns>true if <paramref name="obj"/> is an equal chunk; otherwise false.</returns>
+     public override bool Equals(object obj)
+     {
+         return this.Equals(obj as FetchedDataChunk);
+     }
+
+     /// <summary>
+     /// Determines whether the given chunk has the same messages, topic info and fetch offset.
+     /// </summary>
+     /// <param name="other">The chunk to compare with; may be null.</param>
+     /// <returns>true if all three members are equal; false otherwise or when <paramref name="other"/> is null.</returns>
+     public bool Equals(FetchedDataChunk other)
+     {
+         // Guard against null so that the typed overload cannot throw NullReferenceException.
+         if (object.ReferenceEquals(other, null))
+         {
+             return false;
+         }
+
+         return this.Messages == other.Messages &&
+                this.TopicInfo == other.TopicInfo &&
+                this.FetchOffset == other.FetchOffset;
+     }
+
+     /// <summary>
+     /// Returns a hash code built from the same members that <see cref="Equals(FetchedDataChunk)"/> compares.
+     /// </summary>
+     /// <remarks>
+     /// The members are mutable, so the hash code changes when they are reassigned.
+     /// </remarks>
+     /// <returns>The hash code.</returns>
+     public override int GetHashCode()
+     {
+         unchecked
+         {
+             int hash = 17;
+             hash = (hash * 31) + (this.Messages == null ? 0 : this.Messages.GetHashCode());
+             hash = (hash * 31) + (this.TopicInfo == null ? 0 : this.TopicInfo.GetHashCode());
+             hash = (hash * 31) + this.FetchOffset.GetHashCode();
+             return hash;
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.ZooKeeperIntegration;
+ using log4net;
+
+ /// <summary>
+ /// Background thread that fetches data from a set of servers
+ /// </summary>
+ internal class Fetcher : IDisposable
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+     private readonly ConsumerConfig config;
+     private readonly IZooKeeperClient zkClient;
+     private FetcherRunnable[] fetcherWorkerObjects;
+     private volatile bool disposed;
+     private readonly object shuttingDownLock = new object();
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="Fetcher"/> class.
+     /// </summary>
+     /// <param name="config">
+     /// The consumer configuration.
+     /// </param>
+     /// <param name="zkClient">
+     /// The wrapper above ZooKeeper client.
+     /// </param>
+     public Fetcher(ConsumerConfig config, IZooKeeperClient zkClient)
+     {
+         this.config = config;
+         this.zkClient = zkClient;
+     }
+
+     /// <summary>
+     /// Asks all fetcher workers to stop and forgets them.
+     /// </summary>
+     private void Shutdown()
+     {
+         if (this.fetcherWorkerObjects == null)
+         {
+             return;
+         }
+
+         foreach (FetcherRunnable fetcherRunnable in this.fetcherWorkerObjects)
+         {
+             fetcherRunnable.Shutdown();
+         }
+
+         this.fetcherWorkerObjects = null;
+     }
+
+     /// <summary>
+     /// Stops any running fetchers, empties the given queues and starts one fetcher thread per broker.
+     /// </summary>
+     /// <param name="topicInfos">
+     /// The topic infos. When null, the running fetchers are stopped and nothing else happens.
+     /// </param>
+     /// <param name="cluster">
+     /// The cluster used to look up brokers by ID.
+     /// </param>
+     /// <param name="queuesToBeCleared">
+     /// The queues to be cleared; may be null.
+     /// </param>
+     /// <exception cref="ObjectDisposedException">The fetcher has already been disposed.</exception>
+     public void InitConnections(IEnumerable<PartitionTopicInfo> topicInfos, Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
+     {
+         this.EnsuresNotDisposed();
+         this.Shutdown();
+         if (topicInfos == null)
+         {
+             return;
+         }
+
+         if (queuesToBeCleared != null)
+         {
+             foreach (var queueToBeCleared in queuesToBeCleared)
+             {
+                 // TryTake never blocks, so a concurrent consumer emptying the queue
+                 // between the check and the removal cannot hang this thread.
+                 FetchedDataChunk discarded;
+                 while (queueToBeCleared.TryTake(out discarded))
+                 {
+                 }
+             }
+         }
+
+         //// re-arrange by broker id
+         var partitionTopicInfoMap = new Dictionary<int, List<PartitionTopicInfo>>();
+         foreach (var topicInfo in topicInfos)
+         {
+             List<PartitionTopicInfo> infosOfBroker;
+             if (!partitionTopicInfoMap.TryGetValue(topicInfo.BrokerId, out infosOfBroker))
+             {
+                 infosOfBroker = new List<PartitionTopicInfo>();
+                 partitionTopicInfoMap.Add(topicInfo.BrokerId, infosOfBroker);
+             }
+
+             infosOfBroker.Add(topicInfo);
+         }
+
+         //// open a new fetcher thread for each broker
+         this.fetcherWorkerObjects = new FetcherRunnable[partitionTopicInfoMap.Count];
+         int i = 0;
+         foreach (KeyValuePair<int, List<PartitionTopicInfo>> item in partitionTopicInfoMap)
+         {
+             Broker broker = cluster.GetBroker(item.Key);
+             var fetcherRunnable = new FetcherRunnable("FetcherRunnable-" + i, this.zkClient, this.config, broker, item.Value);
+             var threadStart = new ThreadStart(fetcherRunnable.Run);
+             var fetcherThread = new Thread(threadStart);
+             this.fetcherWorkerObjects[i] = fetcherRunnable;
+             fetcherThread.Start();
+             i++;
+         }
+     }
+
+     /// <summary>
+     /// Stops all fetcher workers. Calls after the first one do nothing.
+     /// </summary>
+     public void Dispose()
+     {
+         if (this.disposed)
+         {
+             return;
+         }
+
+         lock (this.shuttingDownLock)
+         {
+             if (this.disposed)
+             {
+                 return;
+             }
+
+             this.disposed = true;
+         }
+
+         try
+         {
+             this.Shutdown();
+         }
+         catch (Exception exc)
+         {
+             // Dispose must not throw; the failure is only logged.
+             Logger.Warn("Ignoring unexpected errors on closing", exc);
+         }
+     }
+
+     /// <summary>
+     /// Ensures that object was not disposed
+     /// </summary>
+     private void EnsuresNotDisposed()
+     {
+         if (this.disposed)
+         {
+             throw new ObjectDisposedException(this.GetType().Name);
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using log4net;
+
+ /// <summary>
+ /// Background thread worker class that is used to fetch data from a single broker
+ /// </summary>
+ internal class FetcherRunnable
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly string name;
+
+     private readonly IZooKeeperClient zkClient;
+
+     private readonly ConsumerConfig config;
+
+     private readonly Broker broker;
+
+     private readonly IList<PartitionTopicInfo> partitionTopicInfos;
+
+     private readonly IConsumer simpleConsumer;
+
+     // Written by the thread calling Shutdown and read by the fetch loop, hence volatile.
+     private volatile bool shouldStop;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="FetcherRunnable"/> class.
+     /// </summary>
+     /// <param name="name">The name used for this worker in log messages.</param>
+     /// <param name="zkClient">The ZooKeeper client used to store reset offsets.</param>
+     /// <param name="config">The consumer configuration.</param>
+     /// <param name="broker">The broker this worker reports in its log messages.</param>
+     /// <param name="partitionTopicInfos">The topic partitions to fetch.</param>
+     internal FetcherRunnable(string name, IZooKeeperClient zkClient, ConsumerConfig config, Broker broker, List<PartitionTopicInfo> partitionTopicInfos)
+     {
+         this.name = name;
+         this.zkClient = zkClient;
+         this.config = config;
+         this.broker = broker;
+         this.partitionTopicInfos = partitionTopicInfos;
+         this.simpleConsumer = new Consumer(this.config);
+     }
+
+     /// <summary>
+     /// Method to be used for starting a new thread. Fetches in a loop until <see cref="Shutdown"/> is called or an error occurs.
+     /// </summary>
+     internal void Run()
+     {
+         foreach (var partitionTopicInfo in this.partitionTopicInfos)
+         {
+             Logger.InfoFormat(
+                 CultureInfo.CurrentCulture,
+                 "{0} start fetching topic: {1} part: {2} offset: {3} from {4}:{5}",
+                 this.name,
+                 partitionTopicInfo.Topic,
+                 partitionTopicInfo.Partition.PartId,
+                 partitionTopicInfo.GetFetchOffset(),
+                 this.broker.Host,
+                 this.broker.Port);
+         }
+
+         try
+         {
+             while (!this.shouldStop)
+             {
+                 var requestList = new List<FetchRequest>();
+                 foreach (var partitionTopicInfo in this.partitionTopicInfos)
+                 {
+                     var singleRequest = new FetchRequest(partitionTopicInfo.Topic, partitionTopicInfo.Partition.PartId, partitionTopicInfo.GetFetchOffset(), this.config.FetchSize);
+                     requestList.Add(singleRequest);
+                 }
+
+                 // Building the joined request text is wasted work unless debug logging is on.
+                 if (Logger.IsDebugEnabled)
+                 {
+                     Logger.Debug("Fetch request: " + string.Join(", ", requestList.Select(x => x.ToString())));
+                 }
+
+                 var request = new MultiFetchRequest(requestList);
+                 var response = this.simpleConsumer.MultiFetch(request);
+                 int read = 0;
+
+                 // Responses are paired with the partitions by position.
+                 var items = this.partitionTopicInfos.Zip(
+                     response,
+                     (x, y) =>
+                     new Tuple<PartitionTopicInfo, BufferedMessageSet>(x, y));
+                 foreach (Tuple<PartitionTopicInfo, BufferedMessageSet> item in items)
+                 {
+                     BufferedMessageSet messages = item.Item2;
+                     PartitionTopicInfo info = item.Item1;
+                     try
+                     {
+                         bool done = false;
+                         if (messages.ErrorCode == ErrorMapping.OffsetOutOfRangeCode)
+                         {
+                             Logger.InfoFormat(CultureInfo.CurrentCulture, "offset {0} out of range", info.GetFetchOffset());
+                             //// see if we can fix this error
+                             var resetOffset = this.ResetConsumerOffsets(info.Topic, info.Partition);
+                             if (resetOffset >= 0)
+                             {
+                                 info.ResetFetchOffset(resetOffset);
+                                 info.ResetConsumeOffset(resetOffset);
+                                 done = true;
+                             }
+                         }
+
+                         if (!done)
+                         {
+                             read += info.Add(messages, info.GetFetchOffset());
+                         }
+                     }
+                     catch (Exception ex)
+                     {
+                         if (!this.shouldStop)
+                         {
+                             // info and ex are passed as arguments; concatenating info into the
+                             // format string would misplace the exception and break on braces.
+                             Logger.ErrorFormat(CultureInfo.CurrentCulture, "error in FetcherRunnable for {0}: {1}", info, ex);
+                         }
+
+                         throw;
+                     }
+                 }
+
+                 Logger.Info("Fetched bytes: " + read);
+                 if (read == 0)
+                 {
+                     Logger.DebugFormat(CultureInfo.CurrentCulture, "backing off {0} ms", this.config.BackOffIncrementMs);
+                     Thread.Sleep(this.config.BackOffIncrementMs);
+                 }
+             }
+         }
+         catch (Exception ex)
+         {
+             if (this.shouldStop)
+             {
+                 // Log the worker name; this class has no ToString override.
+                 Logger.InfoFormat(CultureInfo.CurrentCulture, "FetcherRunnable {0} interrupted", this.name);
+             }
+             else
+             {
+                 Logger.ErrorFormat(CultureInfo.CurrentCulture, "error in FetcherRunnable {0}", ex);
+             }
+         }
+
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "stopping fetcher {0} to host {1}", this.name, this.broker.Host);
+     }
+
+     /// <summary>
+     /// Signals the fetch loop to stop after its current iteration.
+     /// </summary>
+     internal void Shutdown()
+     {
+         this.shouldStop = true;
+     }
+
+     /// <summary>
+     /// Looks up the earliest or latest offset of a partition, as selected by the configured
+     /// auto offset reset, and stores it in ZooKeeper.
+     /// </summary>
+     /// <param name="topic">The topic.</param>
+     /// <param name="partition">The partition whose offset is reset.</param>
+     /// <returns>
+     /// The new offset, or -1 when the configured auto offset reset is neither smallest nor largest.
+     /// </returns>
+     private long ResetConsumerOffsets(string topic, Partition partition)
+     {
+         long offset;
+         switch (this.config.AutoOffsetReset)
+         {
+             case OffsetRequest.SmallestTime:
+                 offset = OffsetRequest.EarliestTime;
+                 break;
+             case OffsetRequest.LargestTime:
+                 offset = OffsetRequest.LatestTime;
+                 break;
+             default:
+                 return -1;
+         }
+
+         var request = new OffsetRequest(topic, partition.PartId, offset, 1);
+         var offsets = this.simpleConsumer.GetOffsetsBefore(request);
+         var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic);
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "updating partition {0} with {1} offset {2}", partition.Name, offset == OffsetRequest.EarliestTime ? "earliest" : "latest", offsets[0]);
+
+         // The stored value is machine-readable data, so it is formatted culture-independently.
+         ZkUtils.UpdatePersistentPath(this.zkClient, topicDirs.ConsumerOffsetDir + "/" + partition.Name, offsets[0].ToString(CultureInfo.InvariantCulture));
+
+         return offsets[0];
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System.Collections.Generic;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+
+ /// <summary>
+ /// The low-level API of consumer of Kafka messages
+ /// </summary>
+ /// <remarks>
+ /// Maintains a connection to a single broker and has a close correspondence
+ /// to the network requests sent to the server.
+ /// </remarks>
+ public interface IConsumer
+ {
+ /// <summary>
+ /// Gets the server to which the connection is to be established.
+ /// </summary>
+ string Host { get; }
+
+ /// <summary>
+ /// Gets the port to which the connection is to be established.
+ /// </summary>
+ int Port { get; }
+
+ /// <summary>
+ /// Fetches a set of messages from a topic.
+ /// </summary>
+ /// <param name="request">
+ /// Specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+ /// </param>
+ /// <returns>
+ /// A set of fetched messages.
+ /// </returns>
+ /// <remarks>
+ /// Offset is passed in on every request, allowing the user to maintain this metadata
+ /// however they choose.
+ /// </remarks>
+ BufferedMessageSet Fetch(FetchRequest request);
+
+ /// <summary>
+ /// Combines multiple fetch requests in one call.
+ /// </summary>
+ /// <param name="request">
+ /// The list of fetch requests.
+ /// </param>
+ /// <returns>
+ /// A list of sets of fetched messages.
+ /// NOTE(review): presumably one set per fetch request, in request order - confirm against the implementation.
+ /// </returns>
+ /// <remarks>
+ /// Offset is passed in on every request, allowing the user to maintain this metadata
+ /// however they choose.
+ /// </remarks>
+ IList<BufferedMessageSet> MultiFetch(MultiFetchRequest request);
+
+ /// <summary>
+ /// Gets a list of valid offsets (up to maxSize) before the given time.
+ /// </summary>
+ /// <param name="request">
+ /// The offset request.
+ /// </param>
+ /// <returns>
+ /// The list of offsets, in descending order.
+ /// </returns>
+ IList<long> GetOffsetsBefore(OffsetRequest request);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// The consumer high-level API, that hides the details of brokers from the consumer
+ /// It also maintains the state of what has been consumed.
+ /// </summary>
+ public interface IConsumerConnector : IDisposable
+ {
+ /// <summary>
+ /// Creates a list of message streams for each topic.
+ /// </summary>
+ /// <param name="topicCountDict">
+ /// A map from topic name to the number of streams to create for that topic.
+ /// </param>
+ /// <returns>
+ /// A map from topic name to the list of <see cref="KafkaMessageStream"/> created for that topic;
+ /// each stream is an enumerable of messages.
+ /// </returns>
+ IDictionary<string, IList<KafkaMessageStream>> CreateMessageStreams(IDictionary<string, int> topicCountDict);
+
+ /// <summary>
+ /// Commits the offsets of all messages consumed so far.
+ /// </summary>
+ void CommitOffsets();
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System.Collections;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using Kafka.Client.Messages;
+
+ /// <summary>
+ /// This class is a thread-safe IEnumerable of <see cref="Message"/> that can be enumerated to get messages.
+ /// </summary>
+ public class KafkaMessageStream : IEnumerable<Message>
+ {
+     private readonly BlockingCollection<FetchedDataChunk> queue;
+
+     private readonly int consumerTimeoutMs;
+
+     private readonly ConsumerIterator iterator;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="KafkaMessageStream"/> class.
+     /// </summary>
+     /// <param name="queue">The queue of fetched data chunks that backs the stream.</param>
+     /// <param name="consumerTimeoutMs">The consumer timeout in ms, passed on to the iterator.</param>
+     internal KafkaMessageStream(BlockingCollection<FetchedDataChunk> queue, int consumerTimeoutMs)
+     {
+         this.queue = queue;
+         this.consumerTimeoutMs = consumerTimeoutMs;
+         this.iterator = new ConsumerIterator(queue, consumerTimeoutMs);
+     }
+
+     /// <summary>
+     /// Returns the iterator over the messages of this stream.
+     /// </summary>
+     /// <remarks>
+     /// Every call returns the same iterator instance, created in the constructor.
+     /// </remarks>
+     /// <returns>The message iterator.</returns>
+     public IEnumerator<Message> GetEnumerator()
+     {
+         IEnumerator<Message> shared = this.iterator;
+         return shared;
+     }
+
+     /// <summary>
+     /// Returns the iterator over the messages of this stream.
+     /// </summary>
+     /// <returns>The message iterator.</returns>
+     IEnumerator IEnumerable.GetEnumerator()
+     {
+         return this.iterator;
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System.Collections.Concurrent;
+ using System.Globalization;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Messages;
+ using log4net;
+
+ /// <summary>
+ /// Represents a topic in a broker's partition.
+ /// </summary>
+ internal class PartitionTopicInfo
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly object consumedOffsetLock = new object();
+
+     private readonly object fetchedOffsetLock = new object();
+
+     private readonly BlockingCollection<FetchedDataChunk> chunkQueue;
+
+     private long consumedOffset;
+
+     private long fetchedOffset;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="PartitionTopicInfo"/> class.
+     /// </summary>
+     /// <param name="topic">
+     /// The topic.
+     /// </param>
+     /// <param name="brokerId">
+     /// The broker ID.
+     /// </param>
+     /// <param name="partition">
+     /// The broker's partition.
+     /// </param>
+     /// <param name="chunkQueue">
+     /// The chunk queue that fetched data is added to.
+     /// </param>
+     /// <param name="consumedOffset">
+     /// The consumed offset value.
+     /// </param>
+     /// <param name="fetchedOffset">
+     /// The fetched offset value.
+     /// </param>
+     /// <param name="fetchSize">
+     /// The fetch size.
+     /// </param>
+     public PartitionTopicInfo(
+         string topic,
+         int brokerId,
+         Partition partition,
+         BlockingCollection<FetchedDataChunk> chunkQueue,
+         long consumedOffset,
+         long fetchedOffset,
+         int fetchSize)
+     {
+         this.Topic = topic;
+         this.Partition = partition;
+         this.chunkQueue = chunkQueue;
+         this.BrokerId = brokerId;
+         this.consumedOffset = consumedOffset;
+         this.fetchedOffset = fetchedOffset;
+         this.FetchSize = fetchSize;
+         if (Logger.IsDebugEnabled)
+         {
+             Logger.DebugFormat(
+                 CultureInfo.CurrentCulture, "initial consumer offset of {0} is {1}", this, consumedOffset);
+             Logger.DebugFormat(
+                 CultureInfo.CurrentCulture, "initial fetch offset of {0} is {1}", this, fetchedOffset);
+         }
+     }
+
+     /// <summary>
+     /// Gets broker ID.
+     /// </summary>
+     public int BrokerId { get; private set; }
+
+     /// <summary>
+     /// Gets the fetch size.
+     /// </summary>
+     public int FetchSize { get; private set; }
+
+     /// <summary>
+     /// Gets the partition.
+     /// </summary>
+     public Partition Partition { get; private set; }
+
+     /// <summary>
+     /// Gets the topic.
+     /// </summary>
+     public string Topic { get; private set; }
+
+     /// <summary>
+     /// Records the given number of bytes as having been consumed
+     /// </summary>
+     /// <param name="messageSize">
+     /// The message size.
+     /// </param>
+     public void Consumed(int messageSize)
+     {
+         long newOffset;
+         lock (this.consumedOffsetLock)
+         {
+             this.consumedOffset += messageSize;
+             newOffset = this.consumedOffset;
+         }
+
+         if (Logger.IsDebugEnabled)
+         {
+             Logger.DebugFormat(
+                 CultureInfo.CurrentCulture, "updated consume offset of {0} to {1}", this, newOffset);
+         }
+     }
+
+     /// <summary>
+     /// Advances the fetch offset by the size of the given message set and queues the set for consumption.
+     /// </summary>
+     /// <param name="messages">
+     /// The fetched messages.
+     /// </param>
+     /// <param name="fetchOffset">
+     /// The offset the messages were fetched at; stored in the queued chunk.
+     /// </param>
+     /// <returns>
+     /// The size of the message set. Nothing is queued when the size is not positive.
+     /// </returns>
+     public int Add(BufferedMessageSet messages, long fetchOffset)
+     {
+         int size = messages.SetSize;
+         if (size > 0)
+         {
+             // Use the same lock as GetFetchOffset/ResetFetchOffset so that all
+             // accesses to fetchedOffset are serialized by one mechanism.
+             long newOffset;
+             lock (this.fetchedOffsetLock)
+             {
+                 this.fetchedOffset += size;
+                 newOffset = this.fetchedOffset;
+             }
+
+             if (Logger.IsDebugEnabled)
+             {
+                 Logger.Debug("Updated fetch offset of " + this + " to " + newOffset);
+             }
+
+             this.chunkQueue.Add(new FetchedDataChunk(messages, this, fetchOffset));
+         }
+
+         return size;
+     }
+
+     /// <summary>
+     /// Gets the current consume offset.
+     /// </summary>
+     /// <returns>The consume offset.</returns>
+     public long GetConsumeOffset()
+     {
+         lock (this.consumedOffsetLock)
+         {
+             return this.consumedOffset;
+         }
+     }
+
+     /// <summary>
+     /// Gets the current fetch offset.
+     /// </summary>
+     /// <returns>The fetch offset.</returns>
+     public long GetFetchOffset()
+     {
+         lock (this.fetchedOffsetLock)
+         {
+             return this.fetchedOffset;
+         }
+     }
+
+     /// <summary>
+     /// Replaces the consume offset with the given value.
+     /// </summary>
+     /// <param name="newConsumeOffset">The new consume offset.</param>
+     public void ResetConsumeOffset(long newConsumeOffset)
+     {
+         lock (this.consumedOffsetLock)
+         {
+             this.consumedOffset = newConsumeOffset;
+         }
+
+         if (Logger.IsDebugEnabled)
+         {
+             Logger.DebugFormat(
+                 CultureInfo.CurrentCulture, "reset consume offset of {0} to {1}", this, newConsumeOffset);
+         }
+     }
+
+     /// <summary>
+     /// Replaces the fetch offset with the given value.
+     /// </summary>
+     /// <param name="newFetchOffset">The new fetch offset.</param>
+     public void ResetFetchOffset(long newFetchOffset)
+     {
+         lock (this.fetchedOffsetLock)
+         {
+             this.fetchedOffset = newFetchOffset;
+         }
+
+         if (Logger.IsDebugEnabled)
+         {
+             Logger.DebugFormat(
+                 CultureInfo.CurrentCulture, "reset fetch offset of {0} to {1}", this, newFetchOffset);
+         }
+     }
+
+     /// <summary>
+     /// Returns the topic and the partition separated by a colon.
+     /// </summary>
+     /// <returns>The textual representation used in log messages.</returns>
+     public override string ToString()
+     {
+         return this.Topic + ":" + this.Partition;
+     }
+ }
+}
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Reflection;
+ using System.Text;
+ using System.Web.Script.Serialization;
+ using log4net;
+
+ /// <summary>
+ /// Pairs a consumer id with the number of consumer threads requested for each topic.
+ /// </summary>
+ internal class TopicCount
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly IDictionary<string, int> topicCountMap;
+     private readonly string consumerIdString;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="TopicCount"/> class.
+     /// </summary>
+     /// <param name="consumerIdString">The consumer id used as prefix of every generated thread id.</param>
+     /// <param name="topicCountMap">Map of topic name to the number of threads for that topic.</param>
+     public TopicCount(string consumerIdString, IDictionary<string, int> topicCountMap)
+     {
+         this.topicCountMap = topicCountMap;
+         this.consumerIdString = consumerIdString;
+     }
+
+     /// <summary>
+     /// Builds a <see cref="TopicCount"/> from a JSON object of the form produced by
+     /// <see cref="ToJsonString"/>.
+     /// </summary>
+     /// <param name="consumerIdString">The consumer id.</param>
+     /// <param name="json">The JSON text mapping topic names to counts.</param>
+     /// <returns>
+     /// The parsed instance. When the JSON cannot be parsed (the error is logged) or
+     /// deserializes to null, the instance holds an empty map instead of a null one,
+     /// so later enumeration does not fail with a <see cref="NullReferenceException"/>.
+     /// </returns>
+     public static TopicCount ConstructTopicCount(string consumerIdString, string json)
+     {
+         Dictionary<string, int> result = null;
+         var ser = new JavaScriptSerializer();
+         try
+         {
+             result = ser.Deserialize<Dictionary<string, int>>(json);
+         }
+         catch (Exception ex)
+         {
+             Logger.ErrorFormat(CultureInfo.CurrentCulture, "error parsing consumer json string {0}. {1}", json, ex);
+         }
+
+         if (result == null)
+         {
+             result = new Dictionary<string, int>();
+         }
+
+         return new TopicCount(consumerIdString, result);
+     }
+
+     /// <summary>
+     /// Generates the consumer thread ids for every topic.
+     /// </summary>
+     /// <returns>
+     /// A map of topic name to the list of ids <c>consumerId-0</c> .. <c>consumerId-(count - 1)</c>.
+     /// </returns>
+     public IDictionary<string, IList<string>> GetConsumerThreadIdsPerTopic()
+     {
+         var result = new Dictionary<string, IList<string>>();
+         foreach (KeyValuePair<string, int> item in this.topicCountMap)
+         {
+             var consumerSet = new List<string>();
+             for (int i = 0; i < item.Value; i++)
+             {
+                 consumerSet.Add(this.consumerIdString + "-" + i);
+             }
+
+             result.Add(item.Key, consumerSet);
+         }
+
+         return result;
+     }
+
+     /// <summary>
+     /// Compares by consumer id and by the contents of the topic map.
+     /// </summary>
+     /// <param name="obj">The object to compare with.</param>
+     /// <returns>
+     /// True when <paramref name="obj"/> is a <see cref="TopicCount"/> with the same consumer id
+     /// and the same topic/count entries.
+     /// </returns>
+     public override bool Equals(object obj)
+     {
+         var other = obj as TopicCount;
+         if (other == null)
+         {
+             return false;
+         }
+
+         // The maps are compared entry by entry: comparing the two dictionary references
+         // would report two instances with identical contents as different.
+         return this.consumerIdString == other.consumerIdString
+             && HaveSameEntries(this.topicCountMap, other.topicCountMap);
+     }
+
+     /// <summary>
+     /// Hash code consistent with <see cref="Equals(object)"/>.
+     /// </summary>
+     /// <returns>The hash of the consumer id, or 0 when the id is null.</returns>
+     public override int GetHashCode()
+     {
+         // Only the id participates: the map is supplied by the caller and may still change,
+         // while equal instances always share the same id.
+         return this.consumerIdString == null ? 0 : this.consumerIdString.GetHashCode();
+     }
+
+     /// <summary>
+     /// Serializes the topic map as a JSON object, e.g. <c>{ "topic1": 4,"topic2": 4 }</c>.
+     /// </summary>
+     /// <returns>The JSON text.</returns>
+     public string ToJsonString()
+     {
+         var sb = new StringBuilder();
+         sb.Append("{ ");
+         int i = 0;
+         foreach (KeyValuePair<string, int> entry in this.topicCountMap)
+         {
+             if (i > 0)
+             {
+                 sb.Append(",");
+             }
+
+             sb.Append("\"");
+             sb.Append(EscapeJson(entry.Key));
+             sb.Append("\": ");
+             sb.Append(entry.Value.ToString(CultureInfo.InvariantCulture));
+             i++;
+         }
+
+         sb.Append(" }");
+         return sb.ToString();
+     }
+
+     /// <summary>
+     /// Checks whether two maps contain exactly the same key/value pairs.
+     /// </summary>
+     private static bool HaveSameEntries(IDictionary<string, int> left, IDictionary<string, int> right)
+     {
+         if (object.ReferenceEquals(left, right))
+         {
+             return true;
+         }
+
+         if (left == null || right == null || left.Count != right.Count)
+         {
+             return false;
+         }
+
+         foreach (KeyValuePair<string, int> entry in left)
+         {
+             int otherValue;
+             if (!right.TryGetValue(entry.Key, out otherValue) || otherValue != entry.Value)
+             {
+                 return false;
+             }
+         }
+
+         return true;
+     }
+
+     /// <summary>
+     /// Escapes backslashes and double quotes so the value can be embedded in a JSON string.
+     /// </summary>
+     private static string EscapeJson(string value)
+     {
+         return value.Replace("\\", "\\\\").Replace("\"", "\\\"");
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Reflection;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using log4net;
+
+ /// <summary>
+ /// The consumer high-level API, that hides the details of brokers from the consumer.
+ /// It also maintains the state of what has been consumed.
+ /// </summary>
+ public class ZookeeperConsumerConnector : ZooKeeperAwareKafkaClientBase, IConsumerConnector
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     public static readonly int MaxNRetries = 4;
+
+     /// <summary>
+     /// Sentinel chunk placed on every queue by <see cref="SendShutdownToAllQueues"/> during disposal.
+     /// </summary>
+     internal static readonly FetchedDataChunk ShutdownCommand = new FetchedDataChunk(null, null, -1);
+
+     private readonly ConsumerConfig config;
+
+     private IZooKeeperClient zkClient;
+
+     private readonly object shuttingDownLock = new object();
+
+     private readonly bool enableFetcher;
+
+     private Fetcher fetcher;
+
+     private readonly KafkaScheduler scheduler = new KafkaScheduler();
+
+     private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry = new ConcurrentDictionary<string, IDictionary<Partition, PartitionTopicInfo>>();
+
+     private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues = new Dictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>();
+
+     private readonly object syncLock = new object();
+
+     private volatile bool disposed;
+
+     /// <summary>
+     /// Gets the consumer group ID.
+     /// </summary>
+     public string ConsumerGroup
+     {
+         get { return this.config.GroupId; }
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZookeeperConsumerConnector"/> class.
+     /// </summary>
+     /// <param name="config">
+     /// The consumer configuration. At the minimum, need to specify the group ID
+     /// of the consumer and the ZooKeeper connection string.
+     /// </param>
+     /// <param name="enableFetcher">
+     /// Indicates whether fetchers should be enabled
+     /// </param>
+     public ZookeeperConsumerConnector(ConsumerConfig config, bool enableFetcher)
+         : base(config)
+     {
+         this.config = config;
+         this.enableFetcher = enableFetcher;
+         this.ConnectZk();
+         this.CreateFetcher();
+
+         if (this.config.AutoCommit)
+         {
+             Logger.InfoFormat(CultureInfo.CurrentCulture, "starting auto committer every {0} ms", this.config.AutoCommitIntervalMs);
+             this.scheduler.ScheduleWithRate(this.AutoCommit, this.config.AutoCommitIntervalMs, this.config.AutoCommitIntervalMs);
+         }
+     }
+
+     /// <summary>
+     /// Commits the offsets of all messages consumed so far.
+     /// </summary>
+     /// <remarks>
+     /// A failure to write one partition's offset is logged as a warning and does not stop
+     /// the remaining partitions from being committed.
+     /// </remarks>
+     /// <exception cref="ObjectDisposedException">The connector has been disposed.</exception>
+     public void CommitOffsets()
+     {
+         this.EnsuresNotDisposed();
+         if (this.zkClient == null)
+         {
+             return;
+         }
+
+         foreach (KeyValuePair<string, IDictionary<Partition, PartitionTopicInfo>> topic in this.topicRegistry)
+         {
+             var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key);
+             foreach (KeyValuePair<Partition, PartitionTopicInfo> partition in topic.Value)
+             {
+                 var newOffset = partition.Value.GetConsumeOffset();
+                 try
+                 {
+                     // The offset is stored as data, so it is formatted culture-independently.
+                     ZkUtils.UpdatePersistentPath(
+                         this.zkClient,
+                         topicDirs.ConsumerOffsetDir + "/" + partition.Value.Partition.Name,
+                         newOffset.ToString(CultureInfo.InvariantCulture));
+                 }
+                 catch (Exception ex)
+                 {
+                     Logger.WarnFormat(CultureInfo.CurrentCulture, "exception during CommitOffsets: {0}", ex);
+
+                     // Do not report this offset as committed: the write did not succeed.
+                     continue;
+                 }
+
+                 if (Logger.IsDebugEnabled)
+                 {
+                     Logger.DebugFormat(CultureInfo.CurrentCulture, "Commited offset {0} for topic {1}", newOffset, partition);
+                 }
+             }
+         }
+     }
+
+     /// <summary>
+     /// Commits offsets and logs, rather than propagates, any failure of the commit.
+     /// This is the callback registered with the scheduler when auto commit is enabled.
+     /// </summary>
+     /// <exception cref="ObjectDisposedException">The connector has been disposed.</exception>
+     public void AutoCommit()
+     {
+         this.EnsuresNotDisposed();
+         try
+         {
+             this.CommitOffsets();
+         }
+         catch (Exception ex)
+         {
+             Logger.ErrorFormat(CultureInfo.CurrentCulture, "exception during AutoCommit: {0}", ex);
+         }
+     }
+
+     /// <summary>
+     /// Shuts the connector down: disposes the scheduler and the fetcher, sends the shutdown
+     /// command to all queues and disposes the ZooKeeper client.
+     /// </summary>
+     /// <param name="disposing">When false nothing is done.</param>
+     protected override void Dispose(bool disposing)
+     {
+         if (!disposing)
+         {
+             return;
+         }
+
+         if (this.disposed)
+         {
+             return;
+         }
+
+         lock (this.shuttingDownLock)
+         {
+             if (this.disposed)
+             {
+                 return;
+             }
+
+             Logger.Info("ZookeeperConsumerConnector shutting down");
+             this.disposed = true;
+         }
+
+         // Each step is isolated so that a failure in one of them cannot prevent the
+         // following ones (in particular the shutdown signal to the queues) from running.
+         RunShutdownStep(() =>
+         {
+             if (this.scheduler != null)
+             {
+                 this.scheduler.Dispose();
+             }
+         });
+
+         RunShutdownStep(() =>
+         {
+             if (this.fetcher != null)
+             {
+                 this.fetcher.Dispose();
+             }
+         });
+
+         RunShutdownStep(() => this.SendShutdownToAllQueues());
+
+         RunShutdownStep(() =>
+         {
+             if (this.zkClient != null)
+             {
+                 this.zkClient.Dispose();
+             }
+         });
+
+         Logger.Info("ZookeeperConsumerConnector shut down completed");
+     }
+
+     /// <summary>
+     /// Creates a list of message streams for each topic.
+     /// </summary>
+     /// <param name="topicCountDict">
+     /// The map of topic on number of streams
+     /// </param>
+     /// <returns>
+     /// The list of <see cref="KafkaMessageStream"/>, which are iterators over topic.
+     /// </returns>
+     /// <remarks>
+     /// Explicitly triggers load balancing for this consumer
+     /// </remarks>
+     public IDictionary<string, IList<KafkaMessageStream>> CreateMessageStreams(IDictionary<string, int> topicCountDict)
+     {
+         this.EnsuresNotDisposed();
+         return this.Consume(topicCountDict);
+     }
+
+     /// <summary>
+     /// Runs one shutdown step, logging and ignoring any exception it throws.
+     /// </summary>
+     /// <param name="step">The cleanup action to run.</param>
+     private static void RunShutdownStep(Action step)
+     {
+         try
+         {
+             step();
+         }
+         catch (Exception exc)
+         {
+             Logger.Debug("Ignoring unexpected errors on shutting down", exc);
+         }
+     }
+
+     /// <summary>
+     /// Creates the ZooKeeper client from the configuration and connects it.
+     /// </summary>
+     private void ConnectZk()
+     {
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "Connecting to zookeeper instance at {0}", this.config.ZkConnect);
+         this.zkClient = new ZooKeeperClient(this.config.ZkConnect, this.config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
+         this.zkClient.Connect();
+     }
+
+     /// <summary>
+     /// Creates the fetcher when fetching was enabled in the constructor.
+     /// </summary>
+     private void CreateFetcher()
+     {
+         if (this.enableFetcher)
+         {
+             this.fetcher = new Fetcher(this.config, this.zkClient);
+         }
+     }
+
+     /// <summary>
+     /// Registers this consumer in ZooKeeper, creates one queue and one stream per consumer
+     /// thread of every topic, subscribes the rebalance and session-expiry listeners and
+     /// triggers a rebalance.
+     /// </summary>
+     /// <param name="topicCountDict">The map of topic on number of streams.</param>
+     /// <returns>The created streams, grouped by topic.</returns>
+     /// <exception cref="ArgumentNullException"><paramref name="topicCountDict"/> is null.</exception>
+     private IDictionary<string, IList<KafkaMessageStream>> Consume(IDictionary<string, int> topicCountDict)
+     {
+         Logger.Debug("entering consume");
+
+         if (topicCountDict == null)
+         {
+             throw new ArgumentNullException("topicCountDict");
+         }
+
+         var dirs = new ZKGroupDirs(this.config.GroupId);
+         var result = new Dictionary<string, IList<KafkaMessageStream>>();
+
+         // DateTime.Millisecond only ranges over 0..999, which makes id collisions between
+         // consumers on the same machine likely; the tick count is used instead.
+         string consumerUuid = Environment.MachineName + "-" + DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);
+         string consumerIdString = this.config.GroupId + "_" + consumerUuid;
+         var topicCount = new TopicCount(consumerIdString, topicCountDict);
+
+         // listener to consumer and partition changes
+         var loadBalancerListener = new ZKRebalancerListener(
+             this.config,
+             consumerIdString,
+             this.topicRegistry,
+             this.zkClient,
+             this,
+             this.queues,
+             this.fetcher,
+             this.syncLock);
+         this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);
+         this.zkClient.Subscribe(dirs.ConsumerRegistryDir, loadBalancerListener);
+
+         //// create a queue per topic per consumer thread
+         var consumerThreadIdsPerTopicMap = topicCount.GetConsumerThreadIdsPerTopic();
+         foreach (var topic in consumerThreadIdsPerTopicMap.Keys)
+         {
+             var streamList = new List<KafkaMessageStream>();
+             foreach (string threadId in consumerThreadIdsPerTopicMap[topic])
+             {
+                 var stream = new BlockingCollection<FetchedDataChunk>(new ConcurrentQueue<FetchedDataChunk>());
+                 this.queues.Add(new Tuple<string, string>(topic, threadId), stream);
+                 streamList.Add(new KafkaMessageStream(stream, this.config.Timeout));
+             }
+
+             result.Add(topic, streamList);
+             Logger.DebugFormat(CultureInfo.CurrentCulture, "adding topic {0} and stream to map...", topic);
+
+             // register on broker partition path changes
+             string partitionPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+             this.zkClient.MakeSurePersistentPathExists(partitionPath);
+             this.zkClient.Subscribe(partitionPath, loadBalancerListener);
+         }
+
+         //// register listener for session expired event
+         this.zkClient.Subscribe(new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener, this));
+
+         //// explicitly trigger load balancing for this consumer
+         lock (this.syncLock)
+         {
+             loadBalancerListener.SyncedRebalance();
+         }
+
+         return result;
+     }
+
+     /// <summary>
+     /// Empties every queue and then places <see cref="ShutdownCommand"/> on it.
+     /// </summary>
+     private void SendShutdownToAllQueues()
+     {
+         foreach (var queue in this.queues)
+         {
+             Logger.Debug("Clearing up queue");
+
+             //// clear the queue; TryTake never blocks, whereas checking Count and then calling
+             //// Take could wait forever if another thread removed the last item in between
+             FetchedDataChunk discarded;
+             while (queue.Value.TryTake(out discarded))
+             {
+             }
+
+             queue.Value.Add(ShutdownCommand);
+             Logger.Debug("Cleared queue and sent shutdown command");
+         }
+     }
+
+     /// <summary>
+     /// Creates the ephemeral registration node of the consumer, holding its topic counts as JSON.
+     /// </summary>
+     /// <param name="dirs">The ZooKeeper directories of the consumer group.</param>
+     /// <param name="consumerIdString">The consumer id, used as the node name.</param>
+     /// <param name="topicCount">The topic counts written as node data.</param>
+     /// <exception cref="ObjectDisposedException">The connector has been disposed.</exception>
+     internal void RegisterConsumerInZk(ZKGroupDirs dirs, string consumerIdString, TopicCount topicCount)
+     {
+         this.EnsuresNotDisposed();
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "begin registering consumer {0} in ZK", consumerIdString);
+         ZkUtils.CreateEphemeralPathExpectConflict(this.zkClient, dirs.ConsumerRegistryDir + "/" + consumerIdString, topicCount.ToJsonString());
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "end registering consumer {0} in ZK", consumerIdString);
+     }
+
+     /// <summary>
+     /// Ensures that object was not disposed
+     /// </summary>
+     /// <exception cref="ObjectDisposedException">The connector has been disposed.</exception>
+     private void EnsuresNotDisposed()
+     {
+         if (this.disposed)
+         {
+             throw new ObjectDisposedException(this.GetType().Name);
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client.Exceptions
+{
+ /// <summary>
+ /// Exception type representing a consumer timeout.
+ /// NOTE(review): the code that throws it is not part of this file.
+ /// </summary>
+ public class ConsumerTimeoutException : Exception
+ {
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ConsumerTimeoutException"/> class.
+     /// </summary>
+     public ConsumerTimeoutException()
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ConsumerTimeoutException"/> class.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     public ConsumerTimeoutException(string message)
+         : base(message)
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ConsumerTimeoutException"/> class.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     /// <param name="innerException">The exception that caused this one.</param>
+     public ConsumerTimeoutException(string message, Exception innerException)
+         : base(message, innerException)
+     {
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Exceptions
+{
+ using System;
+
+ /// <summary>
+ /// Exception carrying the numeric error code returned by Kafka.
+ /// </summary>
+ public class KafkaException : Exception
+ {
+     /// <summary>
+     /// Code meaning that no error occurred.
+     /// </summary>
+     public const int NoError = 0;
+
+     /// <summary>
+     /// Code meaning that the requested offset was out of range.
+     /// </summary>
+     public const int OffsetOutOfRangeCode = 1;
+
+     /// <summary>
+     /// Code meaning that the message was invalid.
+     /// </summary>
+     public const int InvalidMessageCode = 2;
+
+     /// <summary>
+     /// Code meaning that the wrong partition was addressed.
+     /// </summary>
+     public const int WrongPartitionCode = 3;
+
+     /// <summary>
+     /// Code reported with the text "Invalid message size".
+     /// </summary>
+     public const int InvalidRetchSizeCode = 4;
+
+     /// <summary>
+     /// Initializes a new instance of the KafkaException class with the <see cref="NoError"/> code.
+     /// </summary>
+     public KafkaException()
+     {
+         this.ErrorCode = NoError;
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the KafkaException class whose message is derived from the code.
+     /// </summary>
+     /// <param name="errorCode">The error code generated by a request to Kafka.</param>
+     public KafkaException(int errorCode)
+         : base(GetMessage(errorCode))
+     {
+         this.ErrorCode = errorCode;
+     }
+
+     /// <summary>
+     /// Gets the error code that was sent from Kafka.
+     /// </summary>
+     public int ErrorCode { get; private set; }
+
+     /// <summary>
+     /// Translates a Kafka error code into the exception message.
+     /// </summary>
+     /// <param name="errorCode">The error code from Kafka.</param>
+     /// <returns>The text for a known code, otherwise "Unknown error".</returns>
+     private static string GetMessage(int errorCode)
+     {
+         switch (errorCode)
+         {
+             case OffsetOutOfRangeCode:
+                 return "Offset out of range";
+             case InvalidMessageCode:
+                 return "Invalid message";
+             case WrongPartitionCode:
+                 return "Wrong partition";
+             case InvalidRetchSizeCode:
+                 return "Invalid message size";
+             default:
+                 return "Unknown error";
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,8 @@
+namespace Kafka.Client.Exceptions
+{
+ using System;
+
+ /// <summary>
+ /// Exception type representing a message whose size is too large.
+ /// NOTE(review): the code that throws it, and the limit it checks, are not part of this file.
+ /// </summary>
+ public class MessageSizeTooLargeException : Exception
+ {
+     /// <summary>
+     /// Initializes a new instance of the <see cref="MessageSizeTooLargeException"/> class.
+     /// </summary>
+     public MessageSizeTooLargeException()
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="MessageSizeTooLargeException"/> class.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     public MessageSizeTooLargeException(string message)
+         : base(message)
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="MessageSizeTooLargeException"/> class.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     /// <param name="innerException">The exception that caused this one.</param>
+     public MessageSizeTooLargeException(string message, Exception innerException)
+         : base(message, innerException)
+     {
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client.Exceptions
+{
+ /// <summary>
+ /// Exception type for failures of the ZooKeeper rebalancer.
+ /// NOTE(review): the code that throws it is not part of this file.
+ /// </summary>
+ public class ZKRebalancerException : Exception
+ {
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZKRebalancerException"/> class.
+     /// </summary>
+     public ZKRebalancerException()
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZKRebalancerException"/> class.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     public ZKRebalancerException(string message)
+         : base(message)
+     {
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZKRebalancerException"/> class,
+     /// keeping the original failure as the inner exception.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     /// <param name="innerException">The exception that caused this one.</param>
+     public ZKRebalancerException(string message, Exception innerException)
+         : base(message, innerException)
+     {
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,28 @@
+namespace Kafka.Client.Exceptions
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ /// <summary>
+ /// Serializable exception type for ZooKeeper related failures.
+ /// </summary>
+ [Serializable]
+ public class ZooKeeperException : Exception
+ {
+     /// <summary>
+     /// Creates the exception without a message.
+     /// </summary>
+     public ZooKeeperException() { }
+
+     /// <summary>
+     /// Creates the exception with a message.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     public ZooKeeperException(string message) : base(message) { }
+
+     /// <summary>
+     /// Creates the exception with a message and the exception that caused it.
+     /// </summary>
+     /// <param name="message">The message describing the error.</param>
+     /// <param name="exc">The causing exception, exposed as the inner exception.</param>
+     public ZooKeeperException(string message, Exception exc) : base(message, exc) { }
+
+     /// <summary>
+     /// Creates the exception from serialized data.
+     /// </summary>
+     /// <param name="info">The serialized object data.</param>
+     /// <param name="context">The source or destination of the serialized stream.</param>
+     protected ZooKeeperException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,17 @@
+namespace Kafka.Client.Exceptions
+{
+ using System;
+
+ /// <summary>
+ /// Exception reporting that no connection to the ZooKeeper server was made within the timeout.
+ /// </summary>
+ public class ZooKeeperTimeoutException : Exception
+ {
+     // Common beginning of both messages; the timeout (or a placeholder) is appended to it.
+     private const string MessagePrefix = "Unable to connect to zookeeper server within timeout: ";
+
+     /// <summary>
+     /// Creates the exception for the case where the timeout value is not known.
+     /// </summary>
+     public ZooKeeperTimeoutException()
+         : base(MessagePrefix + "unknown value")
+     {
+     }
+
+     /// <summary>
+     /// Creates the exception with the timeout value included in the message.
+     /// </summary>
+     /// <param name="connectionTimeout">
+     /// The connection timeout that elapsed. NOTE(review): the unit is not stated in this file.
+     /// </param>
+     public ZooKeeperTimeoutException(int connectionTimeout)
+         : base(MessagePrefix + connectionTimeout)
+     {
+     }
+ }
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj Wed Sep 21 19:17:19 2011
@@ -1,69 +1,230 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <PropertyGroup>
- <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
- <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>8.0.30703</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
- <ProjectGuid>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</ProjectGuid>
- <OutputType>Library</OutputType>
- <AppDesignerFolder>Properties</AppDesignerFolder>
- <RootNamespace>Kafka.Client</RootNamespace>
- <AssemblyName>Kafka.Client</AssemblyName>
- <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
- <FileAlignment>512</FileAlignment>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>bin\Debug\</OutputPath>
- <DefineConstants>DEBUG;TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
- <DebugType>pdbonly</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>bin\Release\</OutputPath>
- <DefineConstants>TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <ItemGroup>
- <Reference Include="System" />
- <Reference Include="System.Core" />
- <Reference Include="System.Xml.Linq" />
- <Reference Include="System.Data.DataSetExtensions" />
- <Reference Include="Microsoft.CSharp" />
- <Reference Include="System.Data" />
- <Reference Include="System.Xml" />
- </ItemGroup>
- <ItemGroup>
- <Compile Include="AbstractRequest.cs" />
- <Compile Include="Consumer.cs" />
- <Compile Include="KafkaException.cs" />
- <Compile Include="RequestContext.cs" />
- <Compile Include="Request\FetchRequest.cs" />
- <Compile Include="Request\MultiFetchRequest.cs" />
- <Compile Include="Request\MultiProducerRequest.cs" />
- <Compile Include="Request\OffsetRequest.cs" />
- <Compile Include="Request\ProducerRequest.cs" />
- <Compile Include="Util\Crc32.cs" />
- <Compile Include="KafkaConnection.cs" />
- <Compile Include="Message.cs" />
- <Compile Include="Producer.cs" />
- <Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="RequestType.cs" />
- <Compile Include="Util\BitWorks.cs" />
- </ItemGroup>
- <ItemGroup />
- <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
- <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
- Other similar extension points exist, see Microsoft.Common.targets.
- <Target Name="BeforeBuild">
- </Target>
- <Target Name="AfterBuild">
- </Target>
- -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Kafka.Client</RootNamespace>
+ <AssemblyName>Kafka.Client</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+ <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+ <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+ <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+ <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+ <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+ <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+ <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+ <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+ <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+ <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+ <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+ <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+ <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+ <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+ <CodeContractsCustomRewriterAssembly />
+ <CodeContractsCustomRewriterClass />
+ <CodeContractsLibPaths />
+ <CodeContractsExtraRewriteOptions />
+ <CodeContractsExtraAnalysisOptions />
+ <CodeContractsBaseLineFile />
+ <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+ <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+ <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+ <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+ <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Integration|AnyCPU'">
+ <DebugSymbols>true</DebugSymbols>
+ <OutputPath>bin\Integration\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <DebugType>full</DebugType>
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <ErrorReport>prompt</ErrorReport>
+ <CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets>
+ <CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
+ <CodeAnalysisFailOnMissingRules>true</CodeAnalysisFailOnMissingRules>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ <CodeContractsEnableRuntimeChecking>True</CodeContractsEnableRuntimeChecking>
+ <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+ <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+ <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+ <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+ <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+ <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+ <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+ <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+ <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+ <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+ <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+ <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+ <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+ <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+ <CodeContractsCustomRewriterAssembly />
+ <CodeContractsCustomRewriterClass />
+ <CodeContractsLibPaths />
+ <CodeContractsExtraRewriteOptions />
+ <CodeContractsExtraAnalysisOptions />
+ <CodeContractsBaseLineFile />
+ <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+ <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+ <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+ <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net">
+ <HintPath>..\..\..\lib\log4Net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Configuration" />
+ <Reference Include="System.Core" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Web.Extensions" />
+ <Reference Include="ZooKeeperNet">
+ <HintPath>..\..\..\lib\zookeeper\ZooKeeperNet.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Cfg\AsyncProducerConfig.cs" />
+ <Compile Include="Cfg\BrokerPartitionInfo.cs" />
+ <Compile Include="Cfg\BrokerPartitionInfoCollection.cs" />
+ <Compile Include="Cfg\Consumer.cs" />
+ <Compile Include="Cfg\ConsumerConfig.cs" />
+ <Compile Include="Cfg\IAsyncProducerConfigShared.cs" />
+ <Compile Include="Cfg\ISyncProducerConfigShared.cs" />
+ <Compile Include="Cfg\KafkaClientConfiguration.cs" />
+ <Compile Include="Cfg\KafkaServer.cs" />
+ <Compile Include="Cfg\ProducerConfig.cs" />
+ <Compile Include="Cfg\ZooKeeperServers.cs" />
+ <Compile Include="Cluster\Cluster.cs" />
+ <Compile Include="Cluster\Partition.cs" />
+ <Compile Include="Consumers\Consumer.cs" />
+ <Compile Include="Consumers\ConsumerIterator.cs" />
+ <Compile Include="Consumers\ConsumerIteratorState.cs" />
+ <Compile Include="Consumers\FetchedDataChunk.cs" />
+ <Compile Include="Consumers\Fetcher.cs" />
+ <Compile Include="Consumers\FetcherRunnable.cs" />
+ <Compile Include="Consumers\IConsumer.cs" />
+ <Compile Include="Consumers\IConsumerConnector.cs" />
+ <Compile Include="Consumers\KafkaMessageStream.cs" />
+ <Compile Include="Consumers\PartitionTopicInfo.cs" />
+ <Compile Include="Consumers\TopicCount.cs" />
+ <Compile Include="Consumers\ZookeeperConsumerConnector.cs" />
+ <Compile Include="Exceptions\ConsumerTimeoutException.cs" />
+ <Compile Include="Exceptions\MessageSizeTooLargeException.cs" />
+ <Compile Include="Exceptions\ZKRebalancerException.cs" />
+ <Compile Include="Exceptions\ZooKeeperException.cs" />
+ <Compile Include="Exceptions\ZooKeeperTimeoutException.cs" />
+ <Compile Include="KafkaConnection.cs">
+ <SubType>Code</SubType>
+ </Compile>
+ <Compile Include="Exceptions\KafkaException.cs">
+ <SubType>Code</SubType>
+ </Compile>
+ <Compile Include="Messages\BoundedBuffer.cs" />
+ <Compile Include="Producers\Async\AsyncProducerPool.cs" />
+ <Compile Include="Producers\Async\MessageSent.cs" />
+ <Compile Include="Producers\Producer.StrMsg.cs" />
+ <Compile Include="Producers\Sync\SyncProducerPool.cs" />
+ <Compile Include="Serialization\StringEncoder.cs" />
+ <Compile Include="Serialization\IWritable.cs" />
+ <Compile Include="Producers\ProducerTypes.cs" />
+ <Compile Include="Cfg\SyncProducerConfig.cs" />
+ <Compile Include="Cfg\ZKConfig.cs" />
+ <Compile Include="Cluster\Broker.cs" />
+ <Compile Include="Producers\Partitioning\ConfigBrokerPartitionInfo.cs" />
+ <Compile Include="Producers\Partitioning\DefaultPartitioner.cs" />
+ <Compile Include="Producers\Partitioning\ZKBrokerPartitionInfo.cs" />
+ <Compile Include="KafkaClientBase.cs" />
+ <Compile Include="Serialization\DefaultEncoder.cs" />
+ <Compile Include="Serialization\KafkaBinaryReader.cs" />
+ <Compile Include="Serialization\KafkaBinaryWriter.cs" />
+ <Compile Include="Utils\ErrorMapping.cs" />
+ <Compile Include="Utils\Extensions.cs" />
+ <Compile Include="Utils\Guard.cs" />
+ <Compile Include="Utils\KafkaScheduler.cs" />
+ <Compile Include="Utils\ZKGroupDirs.cs" />
+ <Compile Include="Utils\ZKGroupTopicDirs.cs" />
+ <Compile Include="Utils\ZkUtils.cs" />
+ <Compile Include="ZooKeeperAwareKafkaClientBase.cs" />
+ <Compile Include="Messages\MessageSet.cs" />
+ <Compile Include="Messages\BufferedMessageSet.cs" />
+ <Compile Include="Producers\Async\AsyncProducer.cs" />
+ <Compile Include="Producers\Async\ICallbackHandler.cs" />
+ <Compile Include="Producers\Partitioning\IBrokerPartitionInfo.cs" />
+ <Compile Include="Producers\Partitioning\IPartitioner.cs" />
+ <Compile Include="Producers\Async\IAsyncProducer.cs" />
+ <Compile Include="Producers\IProducer.cs" />
+ <Compile Include="Producers\IProducerPool.cs" />
+ <Compile Include="Producers\Sync\ISyncProducer.cs" />
+ <Compile Include="Producers\ProducerPool.cs" />
+ <Compile Include="Producers\ProducerPoolData.cs" />
+ <Compile Include="Producers\Sync\SyncProducer.cs" />
+ <Compile Include="Requests\AbstractRequest.cs" />
+ <Compile Include="Messages\Message.cs" />
+ <Compile Include="Producers\ProducerData.cs" />
+ <Compile Include="RequestContext.cs" />
+ <Compile Include="Requests\FetchRequest.cs" />
+ <Compile Include="Requests\MultiFetchRequest.cs" />
+ <Compile Include="Requests\MultiProducerRequest.cs" />
+ <Compile Include="Requests\OffsetRequest.cs" />
+ <Compile Include="Requests\ProducerRequest.cs" />
+ <Compile Include="Serialization\IEncoder.cs" />
+ <Compile Include="Utils\Crc32Hasher.cs" />
+ <Compile Include="Producers\Producer.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Requests\RequestTypes.cs" />
+ <Compile Include="Utils\BitWorks.cs" />
+ <Compile Include="Utils\ReflectionHelper.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\BrokerTopicsListener.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ChildChangedEventItem.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\DataChangedEventItem.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperChildChangedEventArgs.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperDataChangedEventArgs.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperEventTypes.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperDataListener.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperStateListener.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperEventArgs.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperSessionCreatedEventArgs.cs" />
+ <Compile Include="ZooKeeperIntegration\Events\ZooKeeperStateChangedEventArgs.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\IZooKeeperChildListener.cs" />
+ <Compile Include="ZooKeeperIntegration\IZooKeeperConnection.cs" />
+ <Compile Include="ZooKeeperIntegration\IZooKeeperClient.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\ZKRebalancerListener.cs" />
+ <Compile Include="ZooKeeperIntegration\Listeners\ZKSessionExpireListener.cs" />
+ <Compile Include="ZooKeeperIntegration\ZooKeeperConnection.cs" />
+ <Compile Include="ZooKeeperIntegration\ZooKeeperClient.cs" />
+ <Compile Include="ZooKeeperIntegration\ZooKeeperClient.Watcher.cs">
+ <DependentUpon>ZooKeeperClient.cs</DependentUpon>
+ </Compile>
+ <Compile Include="ZooKeeperIntegration\IZooKeeperSerializer.cs" />
+ <Compile Include="ZooKeeperIntegration\ZooKeeperStringSerializer.cs" />
+ </ItemGroup>
+ <ItemGroup />
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
</Project>
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+ using System;
+
+ /// <summary>
+ /// Base class for all Kafka clients; implements <see cref="IDisposable"/> by delegating to the abstract <c>Dispose(bool)</c> overload.
+ /// </summary>
+ public abstract class KafkaClientBase : IDisposable
+ {
+ /// <summary>
+ /// Releases all managed and unmanaged resources by calling <c>Dispose(true)</c>, then suppresses finalization of this instance.
+ /// </summary>
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Releases the resources held by the client; every derived class must provide the implementation.
+ /// </summary>
+ /// <param name="disposing">
+ /// Indicates whether managed resources should be released; the parameterless <c>Dispose()</c> always passes <c>true</c>.
+ /// </param>
+ protected abstract void Dispose(bool disposing);
+ }
+}