You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [4/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Reflection;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using log4net;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Fetch broker info like ID, host, port and number of partitions from ZooKeeper.
+ /// </summary>
+ /// <remarks>
+ /// Used when zookeeper based auto partition discovery is enabled
+ /// </remarks>
+ internal class ZKBrokerPartitionInfo : IBrokerPartitionInfo, IZooKeeperStateListener
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+ private readonly Action<int, string, int> callback;
+ private IDictionary<int, Broker> brokers;
+ private IDictionary<string, SortedSet<Partition>> topicBrokerPartitions;
+ private readonly IZooKeeperClient zkclient;
+ private readonly BrokerTopicsListener brokerTopicsListener;
+ private volatile bool disposed;
+ private readonly object shuttingDownLock = new object();
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class.
+ /// </summary>
+ /// <param name="zkclient">The wrapper above ZooKeeper client.</param>
+ public ZKBrokerPartitionInfo(IZooKeeperClient zkclient)
+ {
+ this.zkclient = zkclient;
+ this.zkclient.Connect();
+ this.InitializeBrokers();
+ this.InitializeTopicBrokerPartitions();
+ this.brokerTopicsListener = new BrokerTopicsListener(this.zkclient, this.topicBrokerPartitions, this.brokers, this.callback);
+ this.RegisterListeners();
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class.
+ /// </summary>
+ /// <param name="config">The config.</param>
+ /// <param name="callback">The callback invoked when new broker is added.</param>
+ public ZKBrokerPartitionInfo(ZKConfig config, Action<int, string, int> callback)
+ : this(new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ this.callback = callback;
+ }
+
+ /// <summary>
+ /// Gets a mapping from broker ID to the host and port for all brokers
+ /// </summary>
+ /// <returns>
+ /// Mapping from broker ID to the host and port for all brokers
+ /// </returns>
+ public IDictionary<int, Broker> GetAllBrokerInfo()
+ {
+ this.EnsuresNotDisposed();
+ return this.brokers;
+ }
+
+ /// <summary>
+ /// Gets a mapping from broker ID to partition IDs
+ /// </summary>
+ /// <param name="topic">The topic for which this information is to be returned</param>
+ /// <returns>
+ /// Mapping from broker ID to partition IDs
+ /// </returns>
+ public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+
+ this.EnsuresNotDisposed();
+ SortedSet<Partition> brokerPartitions = null;
+ if (this.topicBrokerPartitions.ContainsKey(topic))
+ {
+ brokerPartitions = this.topicBrokerPartitions[topic];
+ }
+
+ if (brokerPartitions == null || brokerPartitions.Count == 0)
+ {
+ var numBrokerPartitions = this.BootstrapWithExistingBrokers(topic);
+ this.topicBrokerPartitions.Add(topic, numBrokerPartitions);
+ return numBrokerPartitions;
+ }
+
+ return brokerPartitions;
+ }
+
+ /// <summary>
+ /// Gets the host and port information for the broker identified by the given broker ID
+ /// </summary>
+ /// <param name="brokerId">The broker ID.</param>
+ /// <returns>
+ /// Host and port of broker
+ /// </returns>
+ public Broker GetBrokerInfo(int brokerId)
+ {
+ this.EnsuresNotDisposed();
+ return this.brokers.ContainsKey(brokerId) ? this.brokers[brokerId] : null;
+ }
+
+ /// <summary>
+ /// Closes underlying connection to ZooKeeper
+ /// </summary>
+ public void Dispose()
+ {
+ if (this.disposed)
+ {
+ return;
+ }
+
+ lock (this.shuttingDownLock)
+ {
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ }
+
+ try
+ {
+ if (this.zkclient != null)
+ {
+ this.zkclient.Dispose();
+ }
+ }
+ catch (Exception exc)
+ {
+ Logger.Warn("Ignoring unexpected errors on closing", exc);
+ }
+ }
+
+ /// <summary>
+ /// Initializes the list of brokers.
+ /// </summary>
+ private void InitializeBrokers()
+ {
+ if (this.brokers != null)
+ {
+ return;
+ }
+
+ this.brokers = new Dictionary<int, Broker>();
+ IList<string> brokerIds = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
+ foreach (var brokerId in brokerIds)
+ {
+ string path = ZooKeeperClient.DefaultBrokerIdsPath + "/" + brokerId;
+ int id = int.Parse(brokerId, CultureInfo.InvariantCulture);
+ var info = this.zkclient.ReadData<string>(path, null);
+ string[] parts = info.Split(':');
+ int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
+ this.brokers.Add(id, new Broker(id, parts[0], parts[1], port));
+ }
+ }
+
+ /// <summary>
+ /// Initializes the topic - broker's partitions mappings.
+ /// </summary>
+ private void InitializeTopicBrokerPartitions()
+ {
+ if (this.topicBrokerPartitions != null)
+ {
+ return;
+ }
+
+ this.topicBrokerPartitions = new Dictionary<string, SortedSet<Partition>>();
+ this.zkclient.MakeSurePersistentPathExists(ZooKeeperClient.DefaultBrokerTopicsPath);
+ IList<string> topics = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerTopicsPath);
+ foreach (string topic in topics)
+ {
+ string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+ IList<string> brokersPerTopic = this.zkclient.GetChildrenParentMayNotExist(brokerTopicPath);
+ var brokerPartitions = new SortedDictionary<int, int>();
+ foreach (string brokerId in brokersPerTopic)
+ {
+ string path = brokerTopicPath + "/" + brokerId;
+ var numPartitionsPerBrokerAndTopic = this.zkclient.ReadData<string>(path);
+ brokerPartitions.Add(int.Parse(brokerId, CultureInfo.InvariantCulture), int.Parse(numPartitionsPerBrokerAndTopic, CultureInfo.CurrentCulture));
+ }
+
+ var brokerParts = new SortedSet<Partition>();
+ foreach (var brokerPartition in brokerPartitions)
+ {
+ for (int i = 0; i < brokerPartition.Value; i++)
+ {
+ var bidPid = new Partition(brokerPartition.Key, i);
+ brokerParts.Add(bidPid);
+ }
+ }
+
+ this.topicBrokerPartitions.Add(topic, brokerParts);
+ }
+ }
+
+ /// <summary>
+ /// Add the all available brokers with default one partition for new topic, so all of the brokers
+ /// participate in hosting this topic
+ /// </summary>
+ /// <param name="topic">The new topic.</param>
+ /// <returns>Default partitions for new broker</returns>
+ /// <remarks>
+ /// Since we do not have the in formation about number of partitions on these brokers, just assume single partition
+ /// just pick partition 0 from each broker as a candidate
+ /// </remarks>
+ private SortedSet<Partition> BootstrapWithExistingBrokers(string topic)
+ {
+ Logger.Debug("Currently, no brokers are registered under topic: " + topic);
+ Logger.Debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default "
+ + "number of partitions = 1");
+ var numBrokerPartitions = new SortedSet<Partition>();
+ var allBrokers = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
+ Logger.Debug("List of all brokers currently registered in zookeeper -> " + string.Join(", ", allBrokers));
+ foreach (var broker in allBrokers)
+ {
+ numBrokerPartitions.Add(new Partition(int.Parse(broker, CultureInfo.InvariantCulture), 0));
+ }
+
+ Logger.Debug("Adding following broker id, partition id for NEW topic: " + topic + " -> " + string.Join(", ", numBrokerPartitions));
+ return numBrokerPartitions;
+ }
+
+ /// <summary>
+ /// Registers the listeners under several path in ZooKeeper
+ /// to keep related data structures updated.
+ /// </summary>
+ /// <remarks>
+ /// Watch on following path:
+ /// /broker/topics
+ /// /broker/topics/[topic]
+ /// /broker/ids
+ /// </remarks>
+ private void RegisterListeners()
+ {
+ this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, this.brokerTopicsListener);
+ Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerTopicsPath);
+ foreach (string topic in this.topicBrokerPartitions.Keys)
+ {
+ string path = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+ this.zkclient.Subscribe(path, this.brokerTopicsListener);
+ Logger.Debug("Registering listener on path: " + path);
+ }
+
+ this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, this.brokerTopicsListener);
+ Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerIdsPath);
+
+ this.zkclient.Subscribe(this);
+ Logger.Debug("Registering listener on state changed event");
+ }
+
+ /// <summary>
+ /// Resets the related data structures
+ /// </summary>
+ private void Reset()
+ {
+ this.topicBrokerPartitions = null;
+ this.brokers = null;
+ this.InitializeBrokers();
+ this.InitializeTopicBrokerPartitions();
+ }
+
+ /// <summary>
+ /// Ensures that object was not disposed
+ /// </summary>
+ private void EnsuresNotDisposed()
+ {
+ if (this.disposed)
+ {
+ throw new ObjectDisposedException(this.GetType().Name);
+ }
+ }
+
+ /// <summary>
+ /// Called when the ZooKeeper connection state has changed.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+ /// <remarks>
+ /// Do nothing, since zkclient will do reconnect for us.
+ /// </remarks>
+ public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+ {
+ Guard.Assert<ArgumentNullException>(() => args != null);
+ Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
+
+ this.EnsuresNotDisposed();
+ Logger.Debug("Handle state change: do nothing, since zkclient will do reconnect for us.");
+ }
+
+ /// <summary>
+ /// Called after the ZooKeeper session has expired and a new session has been created.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+ /// <remarks>
+ /// We would have to re-create any ephemeral nodes here.
+ /// </remarks>
+ public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+ {
+ Guard.Assert<ArgumentNullException>(() => args != null);
+
+ this.EnsuresNotDisposed();
+ Logger.Debug("ZK expired; release old list of broker partitions for topics ");
+ this.Reset();
+ this.brokerTopicsListener.ResetState();
+ foreach (var topic in this.topicBrokerPartitions.Keys)
+ {
+ this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic, this.brokerTopicsListener);
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Producers.Partitioning;
+ using Kafka.Client.Serialization;
+
+ /// <summary>
+ /// High-level Producer API that exposes all the producer functionality to the client
+ /// using <see cref="System.String" /> as type of key and <see cref="Message" /> as type of data
+ /// </summary>
+ public class Producer : Producer<string, Message>
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<String>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="producerPool">Pool of producers, one per broker.</param>
+ /// <param name="populateProducerPool">if set to <c>true</c>, producers should be populated.</param>
+ /// <remarks>
+ /// Should be used for testing purpose only.
+ /// </remarks>
+ internal Producer(ProducerConfig config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
+ : base(config, partitioner, producerPool, populateProducerPool)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <remarks>
+ /// Can be used when all config parameters will be specified through the config object
+ /// and will be instantiated via reflection
+ /// </remarks>
+ public Producer(ProducerConfig config)
+ : base(config)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<String>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="encoder">The encoder that implements <see cref="IEncoder<Message>" /></param>
+ /// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used
+ /// to supply callback invoked when sending asynchronous request is completed.</param>
+ /// <remarks>
+ /// Can be used to provide pre-instantiated objects for all config parameters
+ /// that would otherwise be instantiated via reflection.
+ /// </remarks>
+ public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
+ : base(config, partitioner, encoder, callbackHandler)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="encoder">The encoder that implements <see cref="IEncoder<Message>" />
+ /// </param>
+ /// <remarks>
+ /// Can be used to provide pre-instantiated objects for all config parameters
+ /// that would otherwise be instantiated via reflection.
+ /// </remarks>
+ public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
+ : base(config, partitioner, encoder)
+ {
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Reflection;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Producers.Partitioning;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+ using log4net;
+
+ /// <summary>
+ /// High-level Producer API that exposes all the producer functionality to the client
+ /// </summary>
+ /// <typeparam name="TKey">The type of the key.</typeparam>
+ /// <typeparam name="TData">The type of the data.</typeparam>
+ /// <remarks>
+ /// Provides serialization of data through a user-specified encoder, zookeeper based automatic broker discovery
+ /// and software load balancing through an optionally user-specified partitioner
+ /// </remarks>
+ public class Producer<TKey, TData> : ZooKeeperAwareKafkaClientBase, IProducer<TKey, TData>
+ where TKey : class
+ where TData : class
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+ private static readonly Random Randomizer = new Random();
+ private readonly ProducerConfig config;
+ private readonly IProducerPool<TData> producerPool;
+ private readonly IPartitioner<TKey> partitioner;
+ private readonly bool populateProducerPool;
+ private readonly IBrokerPartitionInfo brokerPartitionInfo;
+ private volatile bool disposed;
+ private readonly object shuttingDownLock = new object();
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="producerPool">Pool of producers, one per broker.</param>
+ /// <param name="populateProducerPool">if set to <c>true</c>, producers should be populated.</param>
+ /// <remarks>
+ /// Should be used for testing purpose only.
+ /// </remarks>
+ internal Producer(
+ ProducerConfig config,
+ IPartitioner<TKey> partitioner,
+ IProducerPool<TData> producerPool,
+ bool populateProducerPool = true)
+ : base(config)
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.Assert<ArgumentNullException>(() => producerPool != null);
+ this.config = config;
+ this.partitioner = partitioner ?? new DefaultPartitioner<TKey>();
+ this.populateProducerPool = populateProducerPool;
+ this.producerPool = producerPool;
+ if (this.IsZooKeeperEnabled)
+ {
+ this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback);
+ }
+ else
+ {
+ this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(this.config);
+ }
+
+ if (this.populateProducerPool)
+ {
+ IDictionary<int, Broker> allBrokers = this.brokerPartitionInfo.GetAllBrokerInfo();
+ foreach (var broker in allBrokers)
+ {
+ this.producerPool.AddProducer(
+ new Broker(broker.Key, broker.Value.Host, broker.Value.Host, broker.Value.Port));
+ }
+ }
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <remarks>
+ /// Can be used when all config parameters will be specified through the config object
+ /// and will be instantiated via reflection
+ /// </remarks>
+ public Producer(ProducerConfig config)
+ : this(
+ config,
+ ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass),
+ ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass)))
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="encoder">The encoder that implements <see cref="IEncoder<TData>" />
+ /// used to convert an object of type TData to <see cref="Message" />.</param>
+ /// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used
+ /// to supply callback invoked when sending asynchronous request is completed.</param>
+ /// <remarks>
+ /// Can be used to provide pre-instantiated objects for all config parameters
+ /// that would otherwise be instantiated via reflection.
+ /// </remarks>
+ public Producer(
+ ProducerConfig config,
+ IPartitioner<TKey> partitioner,
+ IEncoder<TData> encoder,
+ ICallbackHandler callbackHandler)
+ : this(
+ config,
+ partitioner,
+ ProducerPool<TData>.CreatePool(config, encoder, callbackHandler))
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.Assert<ArgumentNullException>(() => encoder != null);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class.
+ /// </summary>
+ /// <param name="config">The config object.</param>
+ /// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" />
+ /// used to supply a custom partitioning strategy based on the message key.</param>
+ /// <param name="encoder">The encoder that implements <see cref="IEncoder<TData>" />
+ /// used to convert an object of type TData to <see cref="Message" />.</param>
+ /// <remarks>
+ /// Can be used to provide pre-instantiated objects for all config parameters
+ /// that would otherwise be instantiated via reflection.
+ /// </remarks>
+ public Producer(
+ ProducerConfig config,
+ IPartitioner<TKey> partitioner,
+ IEncoder<TData> encoder)
+ : this(
+ config,
+ partitioner,
+ ProducerPool<TData>.CreatePool(config, encoder, null))
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.Assert<ArgumentNullException>(() => encoder != null);
+ }
+
+ /// <summary>
+ /// Sends the data to a multiple topics, partitioned by key, using either the
+ /// synchronous or the asynchronous producer.
+ /// </summary>
+ /// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
+ public void Send(IEnumerable<ProducerData<TKey, TData>> data)
+ {
+ Guard.Assert<ArgumentNullException>(() => data != null);
+ Guard.Assert<ArgumentException>(() => data.Count() > 0);
+ this.EnsuresNotDisposed();
+ var poolRequests = new List<ProducerPoolData<TData>>();
+ foreach (var dataItem in data)
+ {
+ Partition partition = this.GetPartition(dataItem);
+ var poolRequest = new ProducerPoolData<TData>(dataItem.Topic, partition, dataItem.Data);
+ poolRequests.Add(poolRequest);
+ }
+
+ this.producerPool.Send(poolRequests);
+ }
+
+ /// <summary>
+ /// Sends the data to a single topic, partitioned by key, using either the
+ /// synchronous or the asynchronous producer.
+ /// </summary>
+ /// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
+ public void Send(ProducerData<TKey, TData> data)
+ {
+ Guard.Assert<ArgumentNullException>(() => data != null);
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(data.Topic));
+ Guard.Assert<ArgumentNullException>(() => data.Data != null);
+ Guard.Assert<ArgumentException>(() => data.Data.Count() > 0);
+ this.EnsuresNotDisposed();
+ this.Send(new[] { data });
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ if (this.disposed)
+ {
+ return;
+ }
+
+ lock (this.shuttingDownLock)
+ {
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ }
+
+ try
+ {
+ if (this.brokerPartitionInfo != null)
+ {
+ this.brokerPartitionInfo.Dispose();
+ }
+ }
+ catch (Exception exc)
+ {
+ Logger.Warn("Ignoring unexpected errors on closing", exc);
+ }
+ }
+
+ /// <summary>
+ /// Callback to add a new producer to the producer pool.
+ /// Used by <see cref="ZKBrokerPartitionInfo" /> on registration of new broker in ZooKeeper
+ /// </summary>
+ /// <param name="bid">The broker Id.</param>
+ /// <param name="host">The broker host address.</param>
+ /// <param name="port">The broker port.</param>
+ private void Callback(int bid, string host, int port)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(host));
+ Guard.Assert<ArgumentOutOfRangeException>(() => port > 0);
+
+ if (this.populateProducerPool)
+ {
+ this.producerPool.AddProducer(new Broker(bid, host, host, port));
+ }
+ else
+ {
+ Logger.Debug("Skipping the callback since populating producers is off");
+ }
+ }
+
+ /// <summary>
+ /// Retrieves the partition id based on key using given partitioner or select random partition if key is null
+ /// </summary>
+ /// <param name="key">The partition key.</param>
+ /// <param name="numPartitions">The total number of available partitions.</param>
+ /// <returns>Partition Id</returns>
+ private int GetPartitionId(TKey key, int numPartitions)
+ {
+ Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+ return key == null
+ ? Randomizer.Next(numPartitions)
+ : this.partitioner.Partition(key, numPartitions);
+ }
+
+ /// <summary>
+ /// Gets the partition for topic.
+ /// </summary>
+ /// <param name="dataItem">The producer data object that encapsulates the topic, key and message data.</param>
+ /// <returns>Partition for topic</returns>
+ private Partition GetPartition(ProducerData<TKey, TData> dataItem)
+ {
+ Logger.DebugFormat(
+ CultureInfo.CurrentCulture,
+ "Getting the number of broker partitions registered for topic: {0}",
+ dataItem.Topic);
+ SortedSet<Partition> brokerPartitions = this.brokerPartitionInfo.GetBrokerPartitionInfo(dataItem.Topic);
+ int totalNumPartitions = brokerPartitions.Count;
+ Logger.DebugFormat(
+ CultureInfo.CurrentCulture,
+ "Broker partitions registered for topic: {0} = {1}",
+ dataItem.Topic,
+ totalNumPartitions);
+ int partitionId = this.GetPartitionId(dataItem.Key, totalNumPartitions);
+ Partition brokerIdPartition = brokerPartitions.ToList()[partitionId];
+ Broker brokerInfo = this.brokerPartitionInfo.GetBrokerInfo(brokerIdPartition.BrokerId);
+ if (this.IsZooKeeperEnabled)
+ {
+ Logger.DebugFormat(
+ CultureInfo.CurrentCulture,
+ "Sending message to broker {0}:{1} on partition {2}",
+ brokerInfo.Host,
+ brokerInfo.Port,
+ brokerIdPartition.PartId);
+ return new Partition(brokerIdPartition.BrokerId, brokerIdPartition.PartId);
+ }
+
+ Logger.DebugFormat(
+ CultureInfo.CurrentCulture,
+ "Sending message to broker {0}:{1} on a randomly chosen partition",
+ brokerInfo.Host,
+ brokerInfo.Port);
+ return new Partition(brokerIdPartition.BrokerId, ProducerRequest.RandomPartition);
+ }
+
+ /// <summary>
+ /// Ensures that object was not disposed
+ /// </summary>
+ private void EnsuresNotDisposed()
+ {
+ if (this.disposed)
+ {
+ throw new ObjectDisposedException(this.GetType().Name);
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Encapsulates data to be send on topic
+ /// </summary>
+ /// <typeparam name="TKey">
+ /// Type of partitioning key
+ /// </typeparam>
+ /// <typeparam name="TData">
+ /// Type of data
+ /// </typeparam>
+ public class ProducerData<TKey, TData>
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="key">
+ /// The partitioning key.
+ /// </param>
+ /// <param name="data">
+ /// The list of data to send on the same topic.
+ /// </param>
+ public ProducerData(string topic, TKey key, IEnumerable<TData> data)
+ : this(topic, data)
+ {
+ this.Key = key;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="data">
+ /// The list of data to send on the same topic.
+ /// </param>
+ public ProducerData(string topic, IEnumerable<TData> data)
+ {
+ this.Topic = topic;
+ this.Data = data;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerData{TKey,TData}"/> class.
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="data">
+ /// The data to send on the topic.
+ /// </param>
+ public ProducerData(string topic, TData data)
+ : this(topic, new[] { data })
+ {
+ }
+
+ /// <summary>
+ /// Gets topic.
+ /// </summary>
+ public string Topic { get; private set; }
+
+ /// <summary>
+ /// Gets the partitioning key.
+ /// </summary>
+ public TKey Key { get; private set; }
+
+ /// <summary>
+ /// Gets the data.
+ /// </summary>
+ public IEnumerable<TData> Data { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ using System;
+ using System.Collections.Generic;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Producers.Sync;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+ /// <summary>
+ /// The base for all classes that represents pool of producers used by high-level API
+ /// </summary>
+ /// <typeparam name="TData">The type of the data.</typeparam>
+ internal abstract class ProducerPool<TData> : IProducerPool<TData>
+ where TData : class
+ {
+ /// <summary>
+ /// Factory method used to instantiating either,
+ /// synchronous or asynchronous, producer pool based on configuration.
+ /// </summary>
+ /// <param name="config">
+ /// The producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <returns>
+ /// Instantiated either, synchronous or asynchronous, producer pool
+ /// </returns>
+ public static ProducerPool<TData> CreatePool(ProducerConfig config, IEncoder<TData> serializer)
+ {
+ if (config.ProducerType == ProducerTypes.Async)
+ {
+ return AsyncProducerPool<TData>.CreateAsyncPool(config, serializer);
+ }
+
+ if (config.ProducerType == ProducerTypes.Sync)
+ {
+ return SyncProducerPool<TData>.CreateSyncPool(config, serializer);
+ }
+
+ throw new InvalidOperationException("Not supported producer type " + config.ProducerType);
+ }
+
+ /// <summary>
+ /// Factory method used to instantiating either,
+ /// synchronous or asynchronous, producer pool based on configuration.
+ /// </summary>
+ /// <param name="config">
+ /// The producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <param name="cbkHandler">
+ /// The callback invoked after new broker is added.
+ /// </param>
+ /// <returns>
+ /// Instantiated either, synchronous or asynchronous, producer pool
+ /// </returns>
+ public static ProducerPool<TData> CreatePool(
+ ProducerConfig config,
+ IEncoder<TData> serializer,
+ ICallbackHandler cbkHandler)
+ {
+ if (config.ProducerType == ProducerTypes.Async)
+ {
+ return AsyncProducerPool<TData>.CreateAsyncPool(config, serializer, cbkHandler);
+ }
+
+ if (config.ProducerType == ProducerTypes.Sync)
+ {
+ return SyncProducerPool<TData>.CreateSyncPool(config, serializer, cbkHandler);
+ }
+
+ throw new InvalidOperationException("Not supported producer type " + config.ProducerType);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerPool<TData>"/> class.
+ /// </summary>
+ /// <param name="config">The config.</param>
+ /// <param name="serializer">The serializer.</param>
+ /// <remarks>
+ /// Should be used for testing purpose only
+ /// </remarks>
+ protected ProducerPool(
+ ProducerConfig config,
+ IEncoder<TData> serializer)
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.Assert<ArgumentNullException>(() => serializer != null);
+
+ this.Config = config;
+ this.Serializer = serializer;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerPool<TData>"/> class.
+ /// </summary>
+ /// <param name="config">
+ /// The config.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <param name="callbackHandler">
+ /// The callback invoked after new broker is added.
+ /// </param>
+ protected ProducerPool(
+ ProducerConfig config,
+ IEncoder<TData> serializer,
+ ICallbackHandler callbackHandler)
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.Assert<ArgumentNullException>(() => serializer != null);
+
+ this.Config = config;
+ this.Serializer = serializer;
+ this.CallbackHandler = callbackHandler;
+ }
+
+ protected ProducerConfig Config { get; private set; }
+
+ protected IEncoder<TData> Serializer { get; private set; }
+
+ protected ICallbackHandler CallbackHandler { get; private set; }
+
+ /// <summary>
+ /// Add a new producer, either synchronous or asynchronous, to the pool
+ /// </summary>
+ /// <param name="broker">The broker informations.</param>
+ public abstract void AddProducer(Broker broker);
+
+ /// <summary>
+ /// Selects either a synchronous or an asynchronous producer, for
+ /// the specified broker id and calls the send API on the selected
+ /// producer to publish the data to the specified broker partition.
+ /// </summary>
+ /// <param name="poolData">The producer pool request object.</param>
+ /// <remarks>
+ /// Used for single-topic request
+ /// </remarks>
+ public void Send(ProducerPoolData<TData> poolData)
+ {
+ Guard.Assert<ArgumentNullException>(() => poolData != null);
+ this.Send(new[] { poolData });
+ }
+
+ /// <summary>
+ /// Selects either a synchronous or an asynchronous producer, for
+ /// the specified broker id and calls the send API on the selected
+ /// producer to publish the data to the specified broker partition.
+ /// </summary>
+ /// <param name="poolData">The producer pool request object.</param>
+ /// <remarks>
+ /// Used for multi-topic request
+ /// </remarks>
+ public abstract void Send(IEnumerable<ProducerPoolData<TData>> poolData);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ using System.Collections.Generic;
+ using Kafka.Client.Cluster;
+
+ /// <summary>
+ /// Encapsulates data to be send on chosen partition
+ /// </summary>
+ /// <typeparam name="TData">
+ /// Type of data
+ /// </typeparam>
+ internal class ProducerPoolData<TData>
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ProducerPoolData{TData}"/> class.
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="bidPid">
+ /// The chosen partition.
+ /// </param>
+ /// <param name="data">
+ /// The data.
+ /// </param>
+ public ProducerPoolData(string topic, Partition bidPid, IEnumerable<TData> data)
+ {
+ this.Topic = topic;
+ this.BidPid = bidPid;
+ this.Data = data;
+ }
+
+ /// <summary>
+ /// Gets the topic.
+ /// </summary>
+ public string Topic { get; private set; }
+
+ /// <summary>
+ /// Gets the chosen partition.
+ /// </summary>
+ public Partition BidPid { get; private set; }
+
+ /// <summary>
+ /// Gets the data.
+ /// </summary>
+ public IEnumerable<TData> Data { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+ /// <summary>
+ /// Type of producer
+ /// </summary>
+ public enum ProducerTypes
+ {
+ Unknow = 0,
+ Sync = 1,
+ Async = 2
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+ using System.Collections.Generic;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+
+ /// <summary>
+ /// Sends messages encapsulated in request to Kafka server synchronously
+ /// </summary>
+ public interface ISyncProducer
+ {
+ /// <summary>
+ /// Constructs producer request and sends it to given broker partition synchronously
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="partition">
+ /// The partition.
+ /// </param>
+ /// <param name="messages">
+ /// The list of messages messages.
+ /// </param>
+ void Send(string topic, int partition, IEnumerable<Message> messages);
+
+ /// <summary>
+ /// Sends request to Kafka server synchronously
+ /// </summary>
+ /// <param name="request">
+ /// The request.
+ /// </param>
+ void Send(ProducerRequest request);
+
+ /// <summary>
+ /// Sends the data to a multiple topics on Kafka server synchronously
+ /// </summary>
+ /// <param name="requests">
+ /// The requests.
+ /// </param>
+ void MultiSend(IEnumerable<ProducerRequest> requests);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Utils;
+
+ /// <summary>
+ /// Sends messages encapsulated in request to Kafka server synchronously
+ /// </summary>
+ public class SyncProducer : ISyncProducer
+ {
+ private readonly SyncProducerConfig config;
+
+ /// <summary>
+ /// Gets producer config
+ /// </summary>
+ public SyncProducerConfig Config
+ {
+ get { return config; }
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SyncProducer"/> class.
+ /// </summary>
+ /// <param name="config">
+ /// The producer config.
+ /// </param>
+ public SyncProducer(SyncProducerConfig config)
+ {
+ Guard.Assert<ArgumentNullException>(() => config != null);
+ this.config = config;
+ }
+
+ /// <summary>
+ /// Constructs producer request and sends it to given broker partition synchronously
+ /// </summary>
+ /// <param name="topic">
+ /// The topic.
+ /// </param>
+ /// <param name="partition">
+ /// The partition.
+ /// </param>
+ /// <param name="messages">
+ /// The list of messages messages.
+ /// </param>
+ public void Send(string topic, int partition, IEnumerable<Message> messages)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+ Guard.Assert<ArgumentNullException>(() => messages != null);
+ Guard.Assert<ArgumentNullException>(
+ () => messages.All(x => x != null));
+ Guard.Assert<ArgumentOutOfRangeException>(
+ () => messages.All(
+ x => x.PayloadSize <= this.Config.MaxMessageSize));
+
+ this.Send(new ProducerRequest(topic, partition, messages));
+ }
+
+ /// <summary>
+ /// Sends request to Kafka server synchronously
+ /// </summary>
+ /// <param name="request">
+ /// The request.
+ /// </param>
+ public void Send(ProducerRequest request)
+ {
+ Guard.Assert<ArgumentNullException>(() => request != null);
+ using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+ {
+ conn.Write(request);
+ }
+ }
+
+ /// <summary>
+ /// Sends the data to a multiple topics on Kafka server synchronously
+ /// </summary>
+ /// <param name="requests">
+ /// The requests.
+ /// </param>
+ public void MultiSend(IEnumerable<ProducerRequest> requests)
+ {
+ Guard.Assert<ArgumentNullException>(() => requests != null);
+ Guard.Assert<ArgumentNullException>(
+ () => requests.All(
+ x => x != null && x.MessageSet != null && x.MessageSet.Messages != null));
+ Guard.Assert<ArgumentNullException>(
+ () => requests.All(
+ x => x.MessageSet.Messages.All(
+ y => y != null && y.PayloadSize <= this.Config.MaxMessageSize)));
+
+ var multiRequest = new MultiProducerRequest(requests);
+ using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+ {
+ conn.Write(multiRequest);
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Sync
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Reflection;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+ using log4net;
+
+ /// <summary>
+ /// Pool of synchronous producers used by high-level API
+ /// </summary>
+ /// <typeparam name="TData">The type of the data.</typeparam>
+ internal class SyncProducerPool<TData> : ProducerPool<TData>
+ where TData : class
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+ private readonly IDictionary<int, ISyncProducer> syncProducers;
+
+ /// <summary>
+ /// Factory method used to instantiating synchronous producer pool
+ /// </summary>
+ /// <param name="config">
+ /// The synchronous producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <returns>
+ /// Instantiated synchronous producer pool
+ /// </returns>
+ public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer)
+ {
+ return new SyncProducerPool<TData>(config, serializer);
+ }
+
+ /// <summary>
+ /// Factory method used to instantiating synchronous producer pool
+ /// </summary>
+ /// <param name="config">
+ /// The synchronous producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <param name="callbackHandler">
+ /// The callback invoked after new broker is added.
+ /// </param>
+ /// <returns>
+ /// Instantiated synchronous producer pool
+ /// </returns>
+ public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
+ {
+ return new SyncProducerPool<TData>(config, serializer, callbackHandler);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class.
+ /// </summary>
+ /// <param name="config">
+ /// The synchronous producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <param name="syncProducers">
+ /// The list of synchronous producers.
+ /// </param>
+ /// <param name="cbkHandler">
+ /// The callback invoked after new broker is added.
+ /// </param>
+ /// <remarks>
+ /// Should be used for testing purpose only
+ /// </remarks>
+ private SyncProducerPool(
+ ProducerConfig config,
+ IEncoder<TData> serializer,
+ IDictionary<int, ISyncProducer> syncProducers,
+ ICallbackHandler cbkHandler)
+ : base(config, serializer, cbkHandler)
+ {
+ this.syncProducers = syncProducers;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class.
+ /// </summary>
+ /// <param name="config">
+ /// The synchronous producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ /// <param name="cbkHandler">
+ /// The callback invoked after new broker is added.
+ /// </param>
+ /// <remarks>
+ /// Should be used for testing purpose only
+ /// </remarks>
+ private SyncProducerPool(
+ ProducerConfig config,
+ IEncoder<TData> serializer,
+ ICallbackHandler cbkHandler)
+ : this(config, serializer, new Dictionary<int, ISyncProducer>(), cbkHandler)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SyncProducerPool{TData}"/> class.
+ /// </summary>
+ /// <param name="config">
+ /// The synchronous producer pool configuration.
+ /// </param>
+ /// <param name="serializer">
+ /// The serializer.
+ /// </param>
+ private SyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+ : this(
+ config,
+ serializer,
+ new Dictionary<int, ISyncProducer>(),
+ ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+ {
+ }
+
+ /// <summary>
+ /// Selects a synchronous producer, for
+ /// the specified broker id and calls the send API on the selected
+ /// producer to publish the data to the specified broker partition.
+ /// </summary>
+ /// <param name="poolData">The producer pool request object.</param>
+ /// <remarks>
+ /// Used for multi-topic request
+ /// </remarks>
+ public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
+ {
+ Guard.Assert<ArgumentNullException>(() => poolData != null);
+ Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
+ x => x.BidPid.BrokerId, x => x)
+ .ToDictionary(x => x.Key, x => x.ToList());
+ foreach (var broker in distinctBrokers)
+ {
+ Logger.DebugFormat(CultureInfo.CurrentCulture, "Fetching sync producer for broker id: {0}", broker.Key);
+ ISyncProducer producer = this.syncProducers[broker.Key];
+ IEnumerable<ProducerRequest> requests = broker.Value.Select(x => new ProducerRequest(
+ x.Topic,
+ x.BidPid.PartId,
+ new BufferedMessageSet(x.Data.Select(y => this.Serializer.ToMessage(y)))));
+ Logger.DebugFormat(CultureInfo.CurrentCulture, "Sending message to broker {0}", broker.Key);
+ if (requests.Count() > 1)
+ {
+ producer.MultiSend(requests);
+ }
+ else
+ {
+ producer.Send(requests.First());
+ }
+ }
+ }
+
+ /// <summary>
+ /// Add a new synchronous producer to the pool
+ /// </summary>
+ /// <param name="broker">The broker informations.</param>
+ public override void AddProducer(Broker broker)
+ {
+ Guard.Assert<ArgumentNullException>(() => broker != null);
+ var syncConfig = new SyncProducerConfig
+ {
+ Host = broker.Host,
+ Port = broker.Port,
+ BufferSize = this.Config.BufferSize,
+ ConnectTimeout = this.Config.ConnectTimeout,
+ ReconnectInterval = this.Config.ReconnectInterval
+ };
+ var syncProducer = new SyncProducer(syncConfig);
+ Logger.InfoFormat(
+ CultureInfo.CurrentCulture,
+ "Creating sync producer for broker id = {0} at {1}:{2}",
+ broker.Id,
+ broker.Host,
+ broker.Port);
+ this.syncProducers.Add(broker.Id, syncProducer);
+ }
+ }
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,18 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyTitle("Kafka.Client")]
-[assembly: AssemblyDescription("")]
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Microsoft")]
-[assembly: AssemblyProduct("Kafka.Client")]
-[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
-[assembly: AssemblyTrademark("")]
-[assembly: AssemblyCulture("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("93d702e5-9998-49a8-8c16-5b04b3ba55c1")]
-
-// Version information for an assembly consists of the following four values:
-//
-// Major Version
-// Minor Version
-// Build Number
-// Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+using System;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+[assembly: AssemblyTitle("Kafka.Client")]
+[assembly: AssemblyDescription(".NET Client for Kafka")]
+[assembly: AssemblyCompany("ExactTarget")]
+[assembly: AssemblyProduct("Kafka.Client")]
+[assembly: AssemblyCopyright("Copyright © ExactTarget 2011")]
+
+[assembly: ComVisible(false)]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
+[assembly: InternalsVisibleTo("Kafka.Client.Tests")]
+[assembly: InternalsVisibleTo("Kafka.Client.IntegrationTests")]
+[assembly: CLSCompliant(true)]
+
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,53 @@
-using System.Net.Sockets;
-
-namespace Kafka.Client
-{
- /// <summary>
- /// The context of a request made to Kafka.
- /// </summary>
- /// <typeparam name="T">
- /// Must be of type <see cref="AbstractRequest"/> and represents the type of request
- /// sent to Kafka.
- /// </typeparam>
- public class RequestContext<T> where T : AbstractRequest
- {
- /// <summary>
- /// Initializes a new instance of the RequestContext class.
- /// </summary>
- /// <param name="networkStream">The network stream that sent the message.</param>
- /// <param name="request">The request sent over the stream.</param>
- public RequestContext(NetworkStream networkStream, T request)
- {
- NetworkStream = networkStream;
- Request = request;
- }
-
- /// <summary>
- /// Gets the <see cref="NetworkStream"/> instance of the request.
- /// </summary>
- public NetworkStream NetworkStream { get; private set; }
-
- /// <summary>
- /// Gets the <see cref="FetchRequest"/> or <see cref="ProducerRequest"/> object
- /// associated with the <see cref="RequestContext"/>.
- /// </summary>
- public T Request { get; private set; }
- }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+ using System.Net.Sockets;
+ using Kafka.Client.Requests;
+
+ /// <summary>
+ /// The context of a request made to Kafka.
+ /// </summary>
+ /// <typeparam name="T">
+ /// Must be of type <see cref="AbstractRequest"/> and represents the type of request
+ /// sent to Kafka.
+ /// </typeparam>
+ public class RequestContext<T> where T : AbstractRequest
+ {
+ /// <summary>
+ /// Initializes a new instance of the RequestContext class.
+ /// </summary>
+ /// <param name="networkStream">The network stream that sent the message.</param>
+ /// <param name="request">The request sent over the stream.</param>
+ public RequestContext(NetworkStream networkStream, T request)
+ {
+ NetworkStream = networkStream;
+ Request = request;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="NetworkStream"/> instance of the request.
+ /// </summary>
+ public NetworkStream NetworkStream { get; private set; }
+
+ /// <summary>
+ /// Gets the <see cref="FetchRequest"/> or <see cref="ProducerRequest"/> object
+ /// associated with the <see cref="RequestContext"/>.
+ /// </summary>
+ public T Request { get; private set; }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System.IO;
+ using System.Text;
+
+ /// <summary>
+ /// Base request to make to Kafka.
+ /// </summary>
+ public abstract class AbstractRequest
+ {
+ public const string DefaultEncoding = "UTF-8";
+ public const byte DefaultRequestSizeSize = 4;
+ public const byte DefaultRequestIdSize = 2;
+ public const short DefaultTopicLengthIfNonePresent = 2;
+
+ /// <summary>
+ /// Gets or sets the topic to publish to.
+ /// </summary>
+ public string Topic { get; set; }
+
+ /// <summary>
+ /// Gets or sets the partition to publish to.
+ /// </summary>
+ public int Partition { get; set; }
+
+ public MemoryStream RequestBuffer { get; protected set; }
+
+ public abstract RequestTypes RequestType { get; }
+
+ protected short RequestTypeId
+ {
+ get
+ {
+ return (short)this.RequestType;
+ }
+ }
+
+ protected static short GetTopicLength(string topic, string encoding = DefaultEncoding)
+ {
+ Encoding encoder = Encoding.GetEncoding(encoding);
+ return string.IsNullOrEmpty(topic) ? DefaultTopicLengthIfNonePresent : (short)encoder.GetByteCount(topic);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System;
+ using System.Globalization;
+ using System.IO;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+ /// <summary>
+ /// Constructs a request to send to Kafka.
+ /// </summary>
+ public class FetchRequest : AbstractRequest, IWritable
+ {
+ /// <summary>
+ /// Maximum size.
+ /// </summary>
+ private static readonly int DefaultMaxSize = 1048576;
+ public const byte DefaultTopicSizeSize = 2;
+ public const byte DefaultPartitionSize = 4;
+ public const byte DefaultOffsetSize = 8;
+ public const byte DefaultMaxSizeSize = 4;
+ public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultOffsetSize + DefaultMaxSizeSize;
+ public const byte DefaultHeaderAsPartOfMultirequestSize = DefaultTopicSizeSize + DefaultPartitionSize + DefaultOffsetSize + DefaultMaxSizeSize;
+
+ public static int GetRequestLength(string topic, string encoding = DefaultEncoding)
+ {
+ short topicLength = GetTopicLength(topic, encoding);
+ return topicLength + DefaultHeaderSize;
+ }
+
+ public static int GetRequestAsPartOfMultirequestLength(string topic, string encoding = DefaultEncoding)
+ {
+ short topicLength = GetTopicLength(topic, encoding);
+ return topicLength + DefaultHeaderAsPartOfMultirequestSize;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the FetchRequest class.
+ /// </summary>
+ public FetchRequest()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the FetchRequest class.
+ /// </summary>
+ /// <param name="topic">The topic to publish to.</param>
+ /// <param name="partition">The partition to publish to.</param>
+ /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+ public FetchRequest(string topic, int partition, long offset)
+ : this(topic, partition, offset, DefaultMaxSize)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the FetchRequest class.
+ /// </summary>
+ /// <param name="topic">The topic to publish to.</param>
+ /// <param name="partition">The partition to publish to.</param>
+ /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+ /// <param name="maxSize">The maximum size.</param>
+ public FetchRequest(string topic, int partition, long offset, int maxSize)
+ {
+ Topic = topic;
+ Partition = partition;
+ Offset = offset;
+ MaxSize = maxSize;
+
+ int length = GetRequestLength(topic, DefaultEncoding);
+ this.RequestBuffer = new BoundedBuffer(length);
+ this.WriteTo(this.RequestBuffer);
+ }
+
+ /// <summary>
+ /// Gets or sets the offset to request.
+ /// </summary>
+ public long Offset { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maximum size to pass in the request.
+ /// </summary>
+ public int MaxSize { get; set; }
+
+ public override RequestTypes RequestType
+ {
+ get
+ {
+ return RequestTypes.Fetch;
+ }
+ }
+
+ /// <summary>
+ /// Writes content into given stream
+ /// </summary>
+ /// <param name="output">
+ /// The output stream.
+ /// </param>
+ public void WriteTo(MemoryStream output)
+ {
+ Guard.Assert<ArgumentNullException>(() => output != null);
+
+ using (var writer = new KafkaBinaryWriter(output))
+ {
+ writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+ writer.Write(this.RequestTypeId);
+ this.WriteTo(writer);
+ }
+ }
+
+ /// <summary>
+ /// Writes content into given writer
+ /// </summary>
+ /// <param name="writer">
+ /// The writer.
+ /// </param>
+ public void WriteTo(KafkaBinaryWriter writer)
+ {
+ Guard.Assert<ArgumentNullException>(() => writer != null);
+
+ writer.WriteTopic(this.Topic, DefaultEncoding);
+ writer.Write(this.Partition);
+ writer.Write(this.Offset);
+ writer.Write(this.MaxSize);
+ }
+
+ public override string ToString()
+ {
+ return String.Format(
+ CultureInfo.CurrentCulture,
+ "topic: {0}, part: {1}, offset: {2}, maxSize: {3}",
+ this.Topic,
+ this.Partition,
+ this.Offset,
+ this.MaxSize);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Requests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Serialization;
+ using Kafka.Client.Utils;
+
+ /// <summary>
+ /// Constructs a multi-consumer request to send to Kafka.
+ /// </summary>
+ public class MultiFetchRequest : AbstractRequest, IWritable
+ {
+ public const byte DefaultNumberOfRequestsSize = 2;
+
+ public const byte DefaultHeaderSize =
+ DefaultRequestSizeSize + DefaultRequestIdSize + DefaultNumberOfRequestsSize;
+
+ public static int GetRequestLength(IList<FetchRequest> requests, string encoding = DefaultEncoding)
+ {
+ int requestsLength = 0;
+ foreach (var request in requests)
+ {
+ requestsLength += FetchRequest.GetRequestAsPartOfMultirequestLength(request.Topic, encoding);
+ }
+
+ return requestsLength + DefaultHeaderSize;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the MultiFetchRequest class.
+ /// </summary>
+ /// <param name="requests">Requests to package up and batch.</param>
+ public MultiFetchRequest(IList<FetchRequest> requests)
+ {
+ Guard.Assert<ArgumentNullException>(() => requests != null);
+ ConsumerRequests = requests;
+ int length = GetRequestLength(requests, DefaultEncoding);
+ this.RequestBuffer = new BoundedBuffer(length);
+ this.WriteTo(this.RequestBuffer);
+ }
+
+ /// <summary>
+ /// Gets or sets the consumer requests to be batched into this multi-request.
+ /// </summary>
+ public IList<FetchRequest> ConsumerRequests { get; set; }
+
+ public override RequestTypes RequestType
+ {
+ get
+ {
+ return RequestTypes.MultiFetch;
+ }
+ }
+
+ /// <summary>
+ /// Writes content into given stream
+ /// </summary>
+ /// <param name="output">
+ /// The output stream.
+ /// </param>
+ public void WriteTo(MemoryStream output)
+ {
+ Guard.Assert<ArgumentNullException>(() => output != null);
+
+ using (var writer = new KafkaBinaryWriter(output))
+ {
+ writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
+ writer.Write(this.RequestTypeId);
+ writer.Write((short)this.ConsumerRequests.Count);
+ this.WriteTo(writer);
+ }
+ }
+
+ /// <summary>
+ /// Writes content into given writer
+ /// </summary>
+ /// <param name="writer">
+ /// The writer.
+ /// </param>
+ public void WriteTo(KafkaBinaryWriter writer)
+ {
+ Guard.Assert<ArgumentNullException>(() => writer != null);
+
+ foreach (var consumerRequest in ConsumerRequests)
+ {
+ consumerRequest.WriteTo(writer);
+ }
+ }
+ }
+}