You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC
svn commit: r1185772 [2/4] - in
/incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/
Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/
Kafka.Client/Messages/ Kafka.Client/Producers/
Kafka.Client/Producers/Async/ Kafka.Client/...
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -39,6 +39,7 @@ namespace Kafka.Client.Producers.Async
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IDictionary<int, IAsyncProducer> asyncProducers;
+ private volatile bool disposed;
/// <summary>
/// Factory method used to instantiating asynchronous producer pool
@@ -56,7 +57,7 @@ namespace Kafka.Client.Producers.Async
/// Instantiated asynchronous producer pool
/// </returns>
public static AsyncProducerPool<TData> CreateAsyncPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
ICallbackHandler cbkHandler)
{
@@ -76,7 +77,7 @@ namespace Kafka.Client.Producers.Async
/// Instantiated asynchronous producer pool
/// </returns>
public static AsyncProducerPool<TData> CreateAsyncPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer)
{
return new AsyncProducerPool<TData>(config, serializer);
@@ -101,7 +102,7 @@ namespace Kafka.Client.Producers.Async
/// Should be used for testing purpose only
/// </remarks>
private AsyncProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
IDictionary<int, IAsyncProducer> asyncProducers,
ICallbackHandler cbkHandler)
@@ -123,7 +124,7 @@ namespace Kafka.Client.Producers.Async
/// The callback invoked after new broker is added.
/// </param>
private AsyncProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
ICallbackHandler cbkHandler)
: this(config, serializer, new Dictionary<int, IAsyncProducer>(), cbkHandler)
@@ -139,12 +140,12 @@ namespace Kafka.Client.Producers.Async
/// <param name="serializer">
/// The serializer.
/// </param>
- private AsyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+ private AsyncProducerPool(ProducerConfiguration config, IEncoder<TData> serializer)
: this(
config,
serializer,
new Dictionary<int, IAsyncProducer>(),
- ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+ ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
{
}
@@ -159,7 +160,8 @@ namespace Kafka.Client.Producers.Async
/// </remarks>
public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
{
- Guard.Assert<ArgumentNullException>(() => poolData != null);
+ this.EnsuresNotDisposed();
+ Guard.NotNull(poolData, "poolData");
Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
x => x.BidPid.BrokerId, x => x)
.ToDictionary(x => x.Key, x => x.ToList());
@@ -184,14 +186,10 @@ namespace Kafka.Client.Producers.Async
/// <param name="broker">The broker informations.</param>
public override void AddProducer(Broker broker)
{
- Guard.Assert<ArgumentNullException>(() => broker != null);
- var asyncConfig = new AsyncProducerConfig
+ this.EnsuresNotDisposed();
+ Guard.NotNull(broker, "broker");
+ var asyncConfig = new AsyncProducerConfiguration(this.Config, broker.Id, broker.Host, broker.Port)
{
- Host = broker.Host,
- Port = broker.Port,
- QueueTime = this.Config.QueueTime,
- QueueSize = this.Config.QueueSize,
- BatchSize = this.Config.BatchSize,
SerializerClass = this.Config.SerializerClass
};
var asyncProducer = new AsyncProducer(asyncConfig, this.CallbackHandler);
@@ -203,5 +201,24 @@ namespace Kafka.Client.Producers.Async
broker.Port);
this.asyncProducers.Add(broker.Id, asyncProducer);
}
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ foreach (var asyncProducer in this.asyncProducers.Values)
+ {
+ asyncProducer.Dispose();
+ }
+ }
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
namespace Kafka.Client.Producers.Async
{
+ using System;
using System.Collections.Generic;
using Kafka.Client.Messages;
using Kafka.Client.Requests;
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers.Async
/// <summary>
/// Sends messages encapsulated in request to Kafka server asynchronously
/// </summary>
- public interface IAsyncProducer
+ public interface IAsyncProducer : IDisposable
{
/// <summary>
/// Sends request to Kafka server asynchronously
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
namespace Kafka.Client.Producers
{
+ using System;
using System.Collections.Generic;
using Kafka.Client.Cluster;
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers
/// Pool of producers used by producer high-level API
/// </summary>
/// <typeparam name="TData">The type of the data.</typeparam>
- internal interface IProducerPool<TData>
+ internal interface IProducerPool<TData> : IDisposable
{
/// <summary>
/// Selects either a synchronous or an asynchronous producer, for
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs Tue Oct 18 17:52:13 2011
@@ -32,16 +32,16 @@ namespace Kafka.Client.Producers.Partiti
/// </remarks>
internal class ConfigBrokerPartitionInfo : IBrokerPartitionInfo
{
- private readonly ProducerConfig config;
+ private readonly ProducerConfiguration config;
private IDictionary<int, Broker> brokers;
/// <summary>
/// Initializes a new instance of the <see cref="ConfigBrokerPartitionInfo"/> class.
/// </summary>
/// <param name="config">The config.</param>
- public ConfigBrokerPartitionInfo(ProducerConfig config)
+ public ConfigBrokerPartitionInfo(ProducerConfiguration config)
{
- Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.NotNull(config, "config");
this.config = config;
this.InitializeBrokers();
}
@@ -67,7 +67,7 @@ namespace Kafka.Client.Producers.Partiti
/// <remarks>Partition ID would be allways 0</remarks>
public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+ Guard.NotNullNorEmpty(topic, "topic");
var partitions = new SortedSet<Partition>();
foreach (var item in this.brokers)
{
@@ -108,13 +108,11 @@ namespace Kafka.Client.Producers.Partiti
}
this.brokers = new Dictionary<int, Broker>();
- string[] brokersInfoList = this.config.BrokerPartitionInfo.Split(',');
- foreach (string item in brokersInfoList)
+ foreach (var item in this.config.Brokers)
{
- var parts = item.Split(':');
- int id = int.Parse(parts[0], CultureInfo.InvariantCulture);
- int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
- this.brokers.Add(id, new Broker(id, parts[1], parts[1], port));
+ this.brokers.Add(
+ item.BrokerId,
+ new Broker(item.BrokerId, item.Host, item.Host, item.Port));
}
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs Tue Oct 18 17:52:13 2011
@@ -41,7 +41,7 @@ namespace Kafka.Client.Producers.Partiti
/// </remarks>
public int Partition(TKey key, int numPartitions)
{
- Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+ Guard.Greater(numPartitions, 0, "numPartitions");
return key == null
? Randomizer.Next(numPartitions)
: key.GetHashCode() % numPartitions;
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs Tue Oct 18 17:52:13 2011
@@ -18,6 +18,7 @@
namespace Kafka.Client.Producers.Partitioning
{
using System;
+ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
@@ -66,8 +67,8 @@ namespace Kafka.Client.Producers.Partiti
/// </summary>
/// <param name="config">The config.</param>
/// <param name="callback">The callback invoked when new broker is added.</param>
- public ZKBrokerPartitionInfo(ZKConfig config, Action<int, string, int> callback)
- : this(new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ public ZKBrokerPartitionInfo(ProducerConfiguration config, Action<int, string, int> callback)
+ : this(new ZooKeeperClient(config.ZooKeeper.ZkConnect, config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
{
this.callback = callback;
}
@@ -93,7 +94,7 @@ namespace Kafka.Client.Producers.Partiti
/// </returns>
public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+ Guard.NotNullNorEmpty(topic, "topic");
this.EnsuresNotDisposed();
SortedSet<Partition> brokerPartitions = null;
@@ -101,11 +102,15 @@ namespace Kafka.Client.Producers.Partiti
{
brokerPartitions = this.topicBrokerPartitions[topic];
}
+ else
+ {
+ this.topicBrokerPartitions.Add(topic, null);
+ }
if (brokerPartitions == null || brokerPartitions.Count == 0)
{
var numBrokerPartitions = this.BootstrapWithExistingBrokers(topic);
- this.topicBrokerPartitions.Add(topic, numBrokerPartitions);
+ this.topicBrokerPartitions[topic] = numBrokerPartitions;
return numBrokerPartitions;
}
@@ -168,7 +173,7 @@ namespace Kafka.Client.Producers.Partiti
return;
}
- this.brokers = new Dictionary<int, Broker>();
+ this.brokers = new ConcurrentDictionary<int, Broker>();
IList<string> brokerIds = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
foreach (var brokerId in brokerIds)
{
@@ -191,7 +196,7 @@ namespace Kafka.Client.Producers.Partiti
return;
}
- this.topicBrokerPartitions = new Dictionary<string, SortedSet<Partition>>();
+ this.topicBrokerPartitions = new ConcurrentDictionary<string, SortedSet<Partition>>();
this.zkclient.MakeSurePersistentPathExists(ZooKeeperClient.DefaultBrokerTopicsPath);
IList<string> topics = this.zkclient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerTopicsPath);
foreach (string topic in topics)
@@ -306,7 +311,7 @@ namespace Kafka.Client.Producers.Partiti
/// </remarks>
public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
{
- Guard.Assert<ArgumentNullException>(() => args != null);
+ Guard.NotNull(args, "args");
Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
this.EnsuresNotDisposed();
@@ -322,7 +327,7 @@ namespace Kafka.Client.Producers.Partiti
/// </remarks>
public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
{
- Guard.Assert<ArgumentNullException>(() => args != null);
+ Guard.NotNull(args, "args");
this.EnsuresNotDisposed();
Logger.Debug("ZK expired; release old list of broker partitions for topics ");
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs Tue Oct 18 17:52:13 2011
@@ -40,7 +40,7 @@ namespace Kafka.Client.Producers
/// <remarks>
/// Should be used for testing purpose only.
/// </remarks>
- internal Producer(ProducerConfig config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
+ internal Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IProducerPool<Message> producerPool, bool populateProducerPool)
: base(config, partitioner, producerPool, populateProducerPool)
{
}
@@ -53,7 +53,7 @@ namespace Kafka.Client.Producers
/// Can be used when all config parameters will be specified through the config object
/// and will be instantiated via reflection
/// </remarks>
- public Producer(ProducerConfig config)
+ public Producer(ProducerConfiguration config)
: base(config)
{
}
@@ -71,7 +71,7 @@ namespace Kafka.Client.Producers
/// Can be used to provide pre-instantiated objects for all config parameters
/// that would otherwise be instantiated via reflection.
/// </remarks>
- public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
+ public Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IEncoder<Message> encoder, ICallbackHandler callbackHandler)
: base(config, partitioner, encoder, callbackHandler)
{
}
@@ -88,7 +88,7 @@ namespace Kafka.Client.Producers
/// Can be used to provide pre-instantiated objects for all config parameters
/// that would otherwise be instantiated via reflection.
/// </remarks>
- public Producer(ProducerConfig config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
+ public Producer(ProducerConfiguration config, IPartitioner<string> partitioner, IEncoder<Message> encoder)
: base(config, partitioner, encoder)
{
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs Tue Oct 18 17:52:13 2011
@@ -41,13 +41,13 @@ namespace Kafka.Client.Producers
/// Provides serialization of data through a user-specified encoder, zookeeper based automatic broker discovery
/// and software load balancing through an optionally user-specified partitioner
/// </remarks>
- public class Producer<TKey, TData> : ZooKeeperAwareKafkaClientBase, IProducer<TKey, TData>
+ public class Producer<TKey, TData> : KafkaClientBase, IProducer<TKey, TData>
where TKey : class
where TData : class
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private static readonly Random Randomizer = new Random();
- private readonly ProducerConfig config;
+ private readonly ProducerConfiguration config;
private readonly IProducerPool<TData> producerPool;
private readonly IPartitioner<TKey> partitioner;
private readonly bool populateProducerPool;
@@ -67,19 +67,19 @@ namespace Kafka.Client.Producers
/// Should be used for testing purpose only.
/// </remarks>
internal Producer(
- ProducerConfig config,
+ ProducerConfiguration config,
IPartitioner<TKey> partitioner,
IProducerPool<TData> producerPool,
bool populateProducerPool = true)
- : base(config)
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- Guard.Assert<ArgumentNullException>(() => producerPool != null);
+ Guard.NotNull(config, "config");
+ Guard.NotNull(producerPool, "producerPool");
+
this.config = config;
this.partitioner = partitioner ?? new DefaultPartitioner<TKey>();
this.populateProducerPool = populateProducerPool;
this.producerPool = producerPool;
- if (this.IsZooKeeperEnabled)
+ if (this.config.IsZooKeeperEnabled)
{
this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback);
}
@@ -107,13 +107,13 @@ namespace Kafka.Client.Producers
/// Can be used when all config parameters will be specified through the config object
/// and will be instantiated via reflection
/// </remarks>
- public Producer(ProducerConfig config)
+ public Producer(ProducerConfiguration config)
: this(
config,
ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass),
ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass)))
{
- Guard.Assert<ArgumentNullException>(() => config != null);
+ Guard.NotNull(config, "config");
}
/// <summary>
@@ -131,7 +131,7 @@ namespace Kafka.Client.Producers
/// that would otherwise be instantiated via reflection.
/// </remarks>
public Producer(
- ProducerConfig config,
+ ProducerConfiguration config,
IPartitioner<TKey> partitioner,
IEncoder<TData> encoder,
ICallbackHandler callbackHandler)
@@ -140,8 +140,8 @@ namespace Kafka.Client.Producers
partitioner,
ProducerPool<TData>.CreatePool(config, encoder, callbackHandler))
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- Guard.Assert<ArgumentNullException>(() => encoder != null);
+ Guard.NotNull(config, "config");
+ Guard.NotNull(encoder, "encoder");
}
/// <summary>
@@ -157,7 +157,7 @@ namespace Kafka.Client.Producers
/// that would otherwise be instantiated via reflection.
/// </remarks>
public Producer(
- ProducerConfig config,
+ ProducerConfiguration config,
IPartitioner<TKey> partitioner,
IEncoder<TData> encoder)
: this(
@@ -165,8 +165,8 @@ namespace Kafka.Client.Producers
partitioner,
ProducerPool<TData>.CreatePool(config, encoder, null))
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- Guard.Assert<ArgumentNullException>(() => encoder != null);
+ Guard.NotNull(config, "config");
+ Guard.NotNull(encoder, "encoder");
}
/// <summary>
@@ -176,8 +176,9 @@ namespace Kafka.Client.Producers
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
public void Send(IEnumerable<ProducerData<TKey, TData>> data)
{
- Guard.Assert<ArgumentNullException>(() => data != null);
- Guard.Assert<ArgumentException>(() => data.Count() > 0);
+ Guard.NotNull(data, "data");
+ Guard.Greater(data.Count(), 0, "data");
+
this.EnsuresNotDisposed();
var poolRequests = new List<ProducerPoolData<TData>>();
foreach (var dataItem in data)
@@ -197,10 +198,11 @@ namespace Kafka.Client.Producers
/// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
public void Send(ProducerData<TKey, TData> data)
{
- Guard.Assert<ArgumentNullException>(() => data != null);
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(data.Topic));
- Guard.Assert<ArgumentNullException>(() => data.Data != null);
- Guard.Assert<ArgumentException>(() => data.Data.Count() > 0);
+ Guard.NotNull(data, "data");
+ Guard.NotNullNorEmpty(data.Topic, "data.Topic");
+ Guard.NotNull(data.Data, "data.Data");
+ Guard.Greater(data.Data.Count(), 0, "data.Data");
+
this.EnsuresNotDisposed();
this.Send(new[] { data });
}
@@ -233,6 +235,11 @@ namespace Kafka.Client.Producers
{
this.brokerPartitionInfo.Dispose();
}
+
+ if (this.producerPool != null)
+ {
+ this.producerPool.Dispose();
+ }
}
catch (Exception exc)
{
@@ -249,8 +256,8 @@ namespace Kafka.Client.Producers
/// <param name="port">The broker port.</param>
private void Callback(int bid, string host, int port)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(host));
- Guard.Assert<ArgumentOutOfRangeException>(() => port > 0);
+ Guard.NotNullNorEmpty(host, "host");
+ Guard.Greater(port, 0, "port");
if (this.populateProducerPool)
{
@@ -270,7 +277,7 @@ namespace Kafka.Client.Producers
/// <returns>Partition Id</returns>
private int GetPartitionId(TKey key, int numPartitions)
{
- Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+ Guard.Greater(numPartitions, 0, "numPartitions");
return key == null
? Randomizer.Next(numPartitions)
: this.partitioner.Partition(key, numPartitions);
@@ -297,7 +304,7 @@ namespace Kafka.Client.Producers
int partitionId = this.GetPartitionId(dataItem.Key, totalNumPartitions);
Partition brokerIdPartition = brokerPartitions.ToList()[partitionId];
Broker brokerInfo = this.brokerPartitionInfo.GetBrokerInfo(brokerIdPartition.BrokerId);
- if (this.IsZooKeeperEnabled)
+ if (this.config.IsZooKeeperEnabled)
{
Logger.DebugFormat(
CultureInfo.CurrentCulture,
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -33,6 +33,8 @@ namespace Kafka.Client.Producers
internal abstract class ProducerPool<TData> : IProducerPool<TData>
where TData : class
{
+ protected bool Disposed { get; set; }
+
/// <summary>
/// Factory method used to instantiating either,
/// synchronous or asynchronous, producer pool based on configuration.
@@ -46,7 +48,7 @@ namespace Kafka.Client.Producers
/// <returns>
/// Instantiated either, synchronous or asynchronous, producer pool
/// </returns>
- public static ProducerPool<TData> CreatePool(ProducerConfig config, IEncoder<TData> serializer)
+ public static ProducerPool<TData> CreatePool(ProducerConfiguration config, IEncoder<TData> serializer)
{
if (config.ProducerType == ProducerTypes.Async)
{
@@ -78,7 +80,7 @@ namespace Kafka.Client.Producers
/// Instantiated either, synchronous or asynchronous, producer pool
/// </returns>
public static ProducerPool<TData> CreatePool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
ICallbackHandler cbkHandler)
{
@@ -104,11 +106,11 @@ namespace Kafka.Client.Producers
/// Should be used for testing purpose only
/// </remarks>
protected ProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer)
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- Guard.Assert<ArgumentNullException>(() => serializer != null);
+ Guard.NotNull(config, "config");
+ Guard.NotNull(serializer, "serializer");
this.Config = config;
this.Serializer = serializer;
@@ -127,19 +129,19 @@ namespace Kafka.Client.Producers
/// The callback invoked after new broker is added.
/// </param>
protected ProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
ICallbackHandler callbackHandler)
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- Guard.Assert<ArgumentNullException>(() => serializer != null);
+ Guard.NotNull(config, "config");
+ Guard.NotNull(serializer, "serializer");
this.Config = config;
this.Serializer = serializer;
this.CallbackHandler = callbackHandler;
}
- protected ProducerConfig Config { get; private set; }
+ protected ProducerConfiguration Config { get; private set; }
protected IEncoder<TData> Serializer { get; private set; }
@@ -162,7 +164,9 @@ namespace Kafka.Client.Producers
/// </remarks>
public void Send(ProducerPoolData<TData> poolData)
{
- Guard.Assert<ArgumentNullException>(() => poolData != null);
+ this.EnsuresNotDisposed();
+ Guard.NotNull(poolData, "poolData");
+
this.Send(new[] { poolData });
}
@@ -176,5 +180,27 @@ namespace Kafka.Client.Producers
/// Used for multi-topic request
/// </remarks>
public abstract void Send(IEnumerable<ProducerPoolData<TData>> poolData);
+
+ /// <summary>
+ /// Releases all unmanaged and managed resources
+ /// </summary>
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected abstract void Dispose(bool disposing);
+
+ /// <summary>
+ /// Ensures that object was not disposed
+ /// </summary>
+ protected void EnsuresNotDisposed()
+ {
+ if (this.Disposed)
+ {
+ throw new ObjectDisposedException(this.GetType().Name);
+ }
+ }
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -17,6 +17,7 @@
namespace Kafka.Client.Producers.Sync
{
+ using System;
using System.Collections.Generic;
using Kafka.Client.Messages;
using Kafka.Client.Requests;
@@ -24,7 +25,7 @@ namespace Kafka.Client.Producers.Sync
/// <summary>
/// Sends messages encapsulated in request to Kafka server synchronously
/// </summary>
- public interface ISyncProducer
+ public interface ISyncProducer : IDisposable
{
/// <summary>
/// Constructs producer request and sends it to given broker partition synchronously
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -30,15 +30,14 @@ namespace Kafka.Client.Producers.Sync
/// </summary>
public class SyncProducer : ISyncProducer
{
- private readonly SyncProducerConfig config;
+ private readonly KafkaConnection connection;
+
+ private volatile bool disposed;
/// <summary>
/// Gets producer config
/// </summary>
- public SyncProducerConfig Config
- {
- get { return config; }
- }
+ public SyncProducerConfiguration Config { get; private set; }
/// <summary>
/// Initializes a new instance of the <see cref="SyncProducer"/> class.
@@ -46,10 +45,15 @@ namespace Kafka.Client.Producers.Sync
/// <param name="config">
/// The producer config.
/// </param>
- public SyncProducer(SyncProducerConfig config)
+ public SyncProducer(SyncProducerConfiguration config)
{
- Guard.Assert<ArgumentNullException>(() => config != null);
- this.config = config;
+ Guard.NotNull(config, "config");
+ this.Config = config;
+ this.connection = new KafkaConnection(
+ this.Config.Host,
+ this.Config.Port,
+ config.BufferSize,
+ config.SocketTimeout);
}
/// <summary>
@@ -66,14 +70,13 @@ namespace Kafka.Client.Producers.Sync
/// </param>
public void Send(string topic, int partition, IEnumerable<Message> messages)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
- Guard.Assert<ArgumentNullException>(() => messages != null);
- Guard.Assert<ArgumentNullException>(
- () => messages.All(x => x != null));
+ Guard.NotNullNorEmpty(topic, "topic");
+ Guard.NotNull(messages, "messages");
+ Guard.AllNotNull(messages, "messages.items");
Guard.Assert<ArgumentOutOfRangeException>(
() => messages.All(
x => x.PayloadSize <= this.Config.MaxMessageSize));
-
+ this.EnsuresNotDisposed();
this.Send(new ProducerRequest(topic, partition, messages));
}
@@ -85,11 +88,8 @@ namespace Kafka.Client.Producers.Sync
/// </param>
public void Send(ProducerRequest request)
{
- Guard.Assert<ArgumentNullException>(() => request != null);
- using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
- {
- conn.Write(request);
- }
+ this.EnsuresNotDisposed();
+ this.connection.Write(request);
}
/// <summary>
@@ -100,7 +100,7 @@ namespace Kafka.Client.Producers.Sync
/// </param>
public void MultiSend(IEnumerable<ProducerRequest> requests)
{
- Guard.Assert<ArgumentNullException>(() => requests != null);
+ Guard.NotNull(requests, "requests");
Guard.Assert<ArgumentNullException>(
() => requests.All(
x => x != null && x.MessageSet != null && x.MessageSet.Messages != null));
@@ -108,11 +108,47 @@ namespace Kafka.Client.Producers.Sync
() => requests.All(
x => x.MessageSet.Messages.All(
y => y != null && y.PayloadSize <= this.Config.MaxMessageSize)));
-
+ this.EnsuresNotDisposed();
var multiRequest = new MultiProducerRequest(requests);
- using (var conn = new KafkaConnection(this.config.Host, this.config.Port))
+ this.connection.Write(multiRequest);
+ }
+
+ /// <summary>
+ /// Releases all unmanaged and managed resources
+ /// </summary>
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ if (this.connection != null)
+ {
+ this.connection.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Ensures that object was not disposed
+ /// </summary>
+ private void EnsuresNotDisposed()
+ {
+ if (this.disposed)
{
- conn.Write(multiRequest);
+ throw new ObjectDisposedException(this.GetType().Name);
}
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs Tue Oct 18 17:52:13 2011
@@ -40,6 +40,7 @@ namespace Kafka.Client.Producers.Sync
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IDictionary<int, ISyncProducer> syncProducers;
+ private volatile bool disposed;
/// <summary>
/// Factory method used to instantiating synchronous producer pool
@@ -53,7 +54,7 @@ namespace Kafka.Client.Producers.Sync
/// <returns>
/// Instantiated synchronous producer pool
/// </returns>
- public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer)
+ public static SyncProducerPool<TData> CreateSyncPool(ProducerConfiguration config, IEncoder<TData> serializer)
{
return new SyncProducerPool<TData>(config, serializer);
}
@@ -73,7 +74,7 @@ namespace Kafka.Client.Producers.Sync
/// <returns>
/// Instantiated synchronous producer pool
/// </returns>
- public static SyncProducerPool<TData> CreateSyncPool(ProducerConfig config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
+ public static SyncProducerPool<TData> CreateSyncPool(ProducerConfiguration config, IEncoder<TData> serializer, ICallbackHandler callbackHandler)
{
return new SyncProducerPool<TData>(config, serializer, callbackHandler);
}
@@ -97,7 +98,7 @@ namespace Kafka.Client.Producers.Sync
/// Should be used for testing purpose only
/// </remarks>
private SyncProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
IDictionary<int, ISyncProducer> syncProducers,
ICallbackHandler cbkHandler)
@@ -122,7 +123,7 @@ namespace Kafka.Client.Producers.Sync
/// Should be used for testing purpose only
/// </remarks>
private SyncProducerPool(
- ProducerConfig config,
+ ProducerConfiguration config,
IEncoder<TData> serializer,
ICallbackHandler cbkHandler)
: this(config, serializer, new Dictionary<int, ISyncProducer>(), cbkHandler)
@@ -138,12 +139,12 @@ namespace Kafka.Client.Producers.Sync
/// <param name="serializer">
/// The serializer.
/// </param>
- private SyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+ private SyncProducerPool(ProducerConfiguration config, IEncoder<TData> serializer)
: this(
config,
serializer,
new Dictionary<int, ISyncProducer>(),
- ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+ ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
{
}
@@ -158,7 +159,8 @@ namespace Kafka.Client.Producers.Sync
/// </remarks>
public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
{
- Guard.Assert<ArgumentNullException>(() => poolData != null);
+ this.EnsuresNotDisposed();
+ Guard.NotNull(poolData, "poolData");
Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
x => x.BidPid.BrokerId, x => x)
.ToDictionary(x => x.Key, x => x.ToList());
@@ -188,15 +190,10 @@ namespace Kafka.Client.Producers.Sync
/// <param name="broker">The broker informations.</param>
public override void AddProducer(Broker broker)
{
- Guard.Assert<ArgumentNullException>(() => broker != null);
- var syncConfig = new SyncProducerConfig
- {
- Host = broker.Host,
- Port = broker.Port,
- BufferSize = this.Config.BufferSize,
- ConnectTimeout = this.Config.ConnectTimeout,
- ReconnectInterval = this.Config.ReconnectInterval
- };
+ this.EnsuresNotDisposed();
+ Guard.NotNull(broker, "broker");
+
+ var syncConfig = new SyncProducerConfiguration(this.Config, broker.Id, broker.Host, broker.Port);
var syncProducer = new SyncProducer(syncConfig);
Logger.InfoFormat(
CultureInfo.CurrentCulture,
@@ -206,5 +203,24 @@ namespace Kafka.Client.Producers.Sync
broker.Port);
this.syncProducers.Add(broker.Id, syncProducer);
}
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ if (this.disposed)
+ {
+ return;
+ }
+
+ this.disposed = true;
+ foreach (var syncProducer in this.syncProducers.Values)
+ {
+ syncProducer.Dispose();
+ }
+ }
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs Tue Oct 18 17:52:13 2011
@@ -116,7 +116,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(MemoryStream output)
{
- Guard.Assert<ArgumentNullException>(() => output != null);
+ Guard.NotNull(output, "output");
using (var writer = new KafkaBinaryWriter(output))
{
@@ -134,7 +134,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(KafkaBinaryWriter writer)
{
- Guard.Assert<ArgumentNullException>(() => writer != null);
+ Guard.NotNull(writer, "writer");
writer.WriteTopic(this.Topic, DefaultEncoding);
writer.Write(this.Partition);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs Tue Oct 18 17:52:13 2011
@@ -51,7 +51,7 @@ namespace Kafka.Client.Requests
/// <param name="requests">Requests to package up and batch.</param>
public MultiFetchRequest(IList<FetchRequest> requests)
{
- Guard.Assert<ArgumentNullException>(() => requests != null);
+ Guard.NotNull(requests, "requests");
ConsumerRequests = requests;
int length = GetRequestLength(requests, DefaultEncoding);
this.RequestBuffer = new BoundedBuffer(length);
@@ -79,7 +79,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(MemoryStream output)
{
- Guard.Assert<ArgumentNullException>(() => output != null);
+ Guard.NotNull(output, "output");
using (var writer = new KafkaBinaryWriter(output))
{
@@ -98,7 +98,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(KafkaBinaryWriter writer)
{
- Guard.Assert<ArgumentNullException>(() => writer != null);
+ Guard.NotNull(writer, "writer");
foreach (var consumerRequest in ConsumerRequests)
{
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs Tue Oct 18 17:52:13 2011
@@ -35,7 +35,7 @@ namespace Kafka.Client.Requests
public static int GetBufferLength(IEnumerable<ProducerRequest> requests)
{
- Guard.Assert<ArgumentNullException>(() => requests != null);
+ Guard.NotNull(requests, "requests");
return DefaultRequestSizeSize
+ DefaultRequestIdSize
@@ -51,7 +51,8 @@ namespace Kafka.Client.Requests
/// </param>
public MultiProducerRequest(IEnumerable<ProducerRequest> requests)
{
- Guard.Assert<ArgumentNullException>(() => requests != null);
+ Guard.NotNull(requests, "requests");
+
int length = GetBufferLength(requests);
ProducerRequests = requests;
this.RequestBuffer = new BoundedBuffer(length);
@@ -79,7 +80,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(MemoryStream output)
{
- Guard.Assert<ArgumentNullException>(() => output != null);
+ Guard.NotNull(output, "output");
using (var writer = new KafkaBinaryWriter(output))
{
@@ -97,7 +98,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(KafkaBinaryWriter writer)
{
- Guard.Assert<ArgumentNullException>(() => writer != null);
+ Guard.NotNull(writer, "writer");
writer.Write((short)this.ProducerRequests.Count());
foreach (var request in ProducerRequests)
@@ -108,33 +109,21 @@ namespace Kafka.Client.Requests
public override string ToString()
{
- using (var reader = new KafkaBinaryReader(this.RequestBuffer))
- {
- return ParseFrom(reader, (int)this.RequestBuffer.Length);
- }
- }
-
- public static string ParseFrom(KafkaBinaryReader reader, int count)
- {
- Guard.Assert<ArgumentNullException>(() => reader != null);
-
var sb = new StringBuilder();
sb.Append("Request size: ");
- sb.Append(reader.ReadInt32());
+ sb.Append(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
sb.Append(", RequestId: ");
- short reqId = reader.ReadInt16();
- sb.Append(reqId);
+ sb.Append(this.RequestTypeId);
sb.Append("(");
- sb.Append((RequestTypes)reqId);
+ sb.Append((RequestTypes)this.RequestTypeId);
sb.Append("), Single Requests: {");
int i = 1;
- while (reader.BaseStream.Position != reader.BaseStream.Length)
+ foreach (var request in ProducerRequests)
{
sb.Append("Request ");
sb.Append(i);
sb.Append(" {");
- int msgSize = 0;
- sb.Append(ProducerRequest.ParseFrom(reader, msgSize));
+ sb.Append(request.ToString());
sb.AppendLine("} ");
i++;
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs Tue Oct 18 17:52:13 2011
@@ -107,7 +107,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(System.IO.MemoryStream output)
{
- Guard.Assert<ArgumentNullException>(() => output != null);
+ Guard.NotNull(output, "output");
using (var writer = new KafkaBinaryWriter(output))
{
@@ -125,7 +125,7 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(KafkaBinaryWriter writer)
{
- Guard.Assert<ArgumentNullException>(() => writer != null);
+ Guard.NotNull(writer, "writer");
writer.WriteTopic(this.Topic, DefaultEncoding);
writer.Write(this.Partition);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs Tue Oct 18 17:52:13 2011
@@ -35,7 +35,6 @@ namespace Kafka.Client.Requests
public const byte DefaultPartitionSize = 4;
public const byte DefaultSetSizeSize = 4;
public const byte DefaultHeaderSize = DefaultRequestSizeSize + DefaultTopicSizeSize + DefaultPartitionSize + DefaultRequestIdSize + DefaultSetSizeSize;
- public const short DefaultTopicLengthIfNonePresent = 2;
public static int GetRequestLength(string topic, int messegesSize, string encoding = DefaultEncoding)
{
@@ -45,7 +44,8 @@ namespace Kafka.Client.Requests
public ProducerRequest(string topic, int partition, BufferedMessageSet messages)
{
- Guard.Assert<ArgumentNullException>(() => messages != null);
+ Guard.NotNull(messages, "messages");
+
int length = GetRequestLength(topic, messages.SetSize);
this.RequestBuffer = new BoundedBuffer(length);
this.Topic = topic;
@@ -91,7 +91,8 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(MemoryStream output)
{
- Guard.Assert<ArgumentNullException>(() => output != null);
+ Guard.NotNull(output, "output");
+
using (var writer = new KafkaBinaryWriter(output))
{
writer.Write(this.RequestBuffer.Capacity - DefaultRequestSizeSize);
@@ -108,7 +109,8 @@ namespace Kafka.Client.Requests
/// </param>
public void WriteTo(KafkaBinaryWriter writer)
{
- Guard.Assert<ArgumentNullException>(() => writer != null);
+ Guard.NotNull(writer, "writer");
+
writer.WriteTopic(this.Topic, DefaultEncoding);
writer.Write(this.Partition);
writer.Write(this.MessageSet.SetSize);
@@ -117,39 +119,22 @@ namespace Kafka.Client.Requests
public override string ToString()
{
- using (var reader = new KafkaBinaryReader(this.RequestBuffer))
- {
- return ParseFrom(reader, this.TotalSize);
- }
- }
-
- public static string ParseFrom(KafkaBinaryReader reader, int count, bool skipReqInfo = false)
- {
- Guard.Assert<ArgumentNullException>(() => reader != null);
var sb = new StringBuilder();
-
- if (!skipReqInfo)
- {
- sb.Append("Request size: ");
- sb.Append(reader.ReadInt32());
- sb.Append(", RequestId: ");
- short reqId = reader.ReadInt16();
- sb.Append(reqId);
- sb.Append("(");
- sb.Append((RequestTypes)reqId);
- sb.Append(")");
- }
-
+ sb.Append("Request size: ");
+ sb.Append(this.TotalSize);
+ sb.Append(", RequestId: ");
+ sb.Append(this.RequestTypeId);
+ sb.Append("(");
+ sb.Append((RequestTypes)this.RequestTypeId);
+ sb.Append(")");
sb.Append(", Topic: ");
- string topic = reader.ReadTopic(DefaultEncoding);
- sb.Append(topic);
+ sb.Append(this.Topic);
sb.Append(", Partition: ");
- sb.Append(reader.ReadInt32());
+ sb.Append(this.Partition);
sb.Append(", Set size: ");
- sb.Append(reader.ReadInt32());
- int size = count - DefaultHeaderSize - GetTopicLength(topic);
+ sb.Append(this.MessageSet.SetSize);
sb.Append(", Set {");
- sb.Append(BufferedMessageSet.ParseFrom(reader, size));
+ sb.Append(this.MessageSet.ToString());
sb.Append("}");
return sb.ToString();
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs Tue Oct 18 17:52:13 2011
@@ -20,6 +20,7 @@ namespace Kafka.Client.Serialization
using System.IO;
using System.Net;
using System.Text;
+ using System.Net.Sockets;
/// <summary>
/// Reads data from underlying stream using big endian bytes order for primitive types
@@ -47,7 +48,10 @@ namespace Kafka.Client.Serialization
/// </param>
protected override void Dispose(bool disposing)
{
- this.BaseStream.Position = 0;
+ if (this.BaseStream.CanSeek)
+ {
+ this.BaseStream.Position = 0;
+ }
}
/// <summary>
@@ -127,5 +131,18 @@ namespace Kafka.Client.Serialization
Encoding encoder = Encoding.GetEncoding(encoding);
return encoder.GetString(bytes);
}
+
+ public bool DataAvailabe
+ {
+ get
+ {
+ if (this.BaseStream is NetworkStream)
+ {
+ return ((NetworkStream)this.BaseStream).DataAvailable;
+ }
+
+ return this.BaseStream.Length != this.BaseStream.Position;
+ }
+ }
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs Tue Oct 18 17:52:13 2011
@@ -18,29 +18,53 @@
namespace Kafka.Client.Utils
{
using System;
+ using System.Collections;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text.RegularExpressions;
internal static class Guard
{
- /// <summary>
- /// Checks whether given expression is true. Throws <see cref="InvalidOperationException" /> if not.
- /// </summary>
- /// <param name="assertion">
- /// The assertion.
- /// </param>
- /// <exception cref="InvalidOperationException">
- /// Thrown when condition is not met.
- /// </exception>
- public static void Assert(Expression<Func<bool>> assertion)
+ public static void NotNull(object parameter, string paramName)
{
- var compiled = assertion.Compile();
- var evaluatedValue = compiled();
- if (!evaluatedValue)
+ if (parameter == null)
+ {
+ throw new ArgumentNullException(paramName);
+ }
+ }
+
+ public static void Count(ICollection parameter, int length, string paramName)
+ {
+ if (parameter.Count != length)
+ {
+ throw new ArgumentOutOfRangeException(paramName, parameter.Count, string.Empty);
+ }
+ }
+
+ public static void Greater(int parameter, int expected, string paramName)
+ {
+ if (parameter <= expected)
+ {
+ throw new ArgumentOutOfRangeException(paramName, parameter, string.Empty);
+ }
+ }
+
+ public static void NotNullNorEmpty(string parameter, string paramName)
+ {
+ if (string.IsNullOrEmpty(parameter))
+ {
+ throw new ArgumentException("Given string is empty", paramName);
+ }
+ }
+
+ public static void AllNotNull(IEnumerable parameter, string paramName)
+ {
+ foreach (var par in parameter)
{
- throw new InvalidOperationException(
- string.Format("'{0}' is not met.", Normalize(assertion.ToString())));
+ if (par == null)
+ {
+ throw new ArgumentNullException(paramName);
+ }
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs Tue Oct 18 17:52:13 2011
@@ -25,23 +25,21 @@ namespace Kafka.Client.Utils
public static T Instantiate<T>(string className)
where T : class
{
- Type t1;
object o1;
if (string.IsNullOrEmpty(className))
{
return default(T);
}
- if (className.Contains("`1"))
+ Type t1 = Type.GetType(className, true);
+ if (t1.IsGenericType)
{
- t1 = Type.GetType(className);
var t2 = typeof(T).GetGenericArguments();
var t3 = t1.MakeGenericType(t2);
o1 = Activator.CreateInstance(t3);
return o1 as T;
}
- t1 = Type.GetType(className, true);
o1 = Activator.CreateInstance(t1);
return o1 as T;
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs Tue Oct 18 17:52:13 2011
@@ -28,7 +28,7 @@ namespace Kafka.Client
/// Initializes a new instance of the <see cref="ZooKeeperAwareKafkaClientBase"/> class.
/// </summary>
/// <param name="config">The config.</param>
- protected ZooKeeperAwareKafkaClientBase(ZKConfig config)
+ protected ZooKeeperAwareKafkaClientBase(ZooKeeperConfiguration config)
{
this.IsZooKeeperEnabled = config != null && !string.IsNullOrEmpty(config.ZkConnect);
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs Tue Oct 18 17:52:13 2011
@@ -34,7 +34,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// <remarks>
/// Used for testing purpose
/// </remarks>
- int IdleTime { get; }
+ int? IdleTime { get; }
/// <summary>
/// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs Tue Oct 18 17:52:13 2011
@@ -77,9 +77,9 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void HandleChildChange(ZooKeeperChildChangedEventArgs e)
{
- Guard.Assert<ArgumentNullException>(() => e != null);
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(e.Path));
- Guard.Assert<ArgumentNullException>(() => e.Children != null);
+ Guard.NotNull(e, "e");
+ Guard.NotNullNorEmpty(e.Path, "e.Path");
+ Guard.NotNull(e.Children, "e.Children");
lock (this.syncLock)
{
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs Tue Oct 18 17:52:13 2011
@@ -49,7 +49,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
private readonly object syncLock;
- private readonly ConsumerConfig config;
+ private readonly ConsumerConfiguration config;
private readonly IZooKeeperClient zkClient;
@@ -60,7 +60,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
private readonly ZookeeperConsumerConnector zkConsumerConnector;
internal ZKRebalancerListener(
- ConsumerConfig config,
+ ConsumerConfiguration config,
string consumerIdString,
IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry,
IZooKeeperClient zkClient,
@@ -106,7 +106,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
//// release all partitions, reset state and retry
this.ReleasePartitionOwnership();
this.ResetState();
- Thread.Sleep(config.ZkSyncTimeMs);
+ Thread.Sleep(config.ZooKeeper.ZkSyncTimeMs);
}
}
@@ -124,9 +124,9 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
{
- Guard.Assert<ArgumentNullException>(() => args != null);
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(args.Path));
- Guard.Assert<ArgumentNullException>(() => args.Children != null);
+ Guard.NotNull(args, "args");
+ Guard.NotNullNorEmpty(args.Path, "args.Path");
+ Guard.NotNull(args.Children, "args.Children");
SyncedRebalance();
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs Tue Oct 18 17:52:13 2011
@@ -58,7 +58,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
{
- Guard.Assert<ArgumentNullException>(() => args != null);
+ Guard.NotNull(args, "args");
Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
}
@@ -72,7 +72,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
{
- Guard.Assert<ArgumentNullException>(() => args != null);
+ Guard.NotNull(args, "args");
Logger.InfoFormat(
CultureInfo.CurrentCulture,
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs Tue Oct 18 17:52:13 2011
@@ -209,7 +209,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public bool WaitUntilConnected(int connectionTimeout)
{
- Guard.Assert<ArgumentOutOfRangeException>(() => connectionTimeout > 0);
+ Guard.Greater(connectionTimeout, 0, "connectionTimeout");
+
this.EnsuresNotDisposed();
if (this.eventWorker != null && this.eventWorker == Thread.CurrentThread)
{
@@ -250,7 +251,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public T RetryUntilConnected<T>(Func<T> callback)
{
- Guard.Assert<ArgumentNullException>(() => callback != null);
+ Guard.NotNull(callback, "callback");
+
this.EnsuresNotDisposed();
if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
{
@@ -290,7 +292,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public bool Exists(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
@@ -311,7 +313,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public bool Exists(string path, bool watch)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
@@ -332,7 +334,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public IList<string> GetChildren(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
@@ -353,7 +355,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public IList<string> GetChildren(string path, bool watch)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
@@ -375,7 +377,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public int CountChildren(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
@@ -413,7 +415,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
public T ReadData<T>(string path, Stat stats, bool watch)
where T : class
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
byte[] bytes = this.RetryUntilConnected(
@@ -443,7 +445,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public T ReadData<T>(string path, Stat stats) where T : class
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
@@ -461,7 +463,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void WriteData(string path, object data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.WriteData(path, data, -1);
@@ -484,7 +486,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void WriteData(string path, object data, int expectedVersion)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
byte[] bytes = this.serializer.Serialize(data);
@@ -507,7 +509,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public bool Delete(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
@@ -536,7 +538,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public bool DeleteRecursive(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
IList<string> children;
@@ -568,7 +570,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void MakeSurePersistentPathExists(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
if (!this.Exists(path))
@@ -588,7 +590,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public IList<string> GetChildrenParentMayNotExist(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
@@ -616,7 +618,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
public T ReadData<T>(string path)
where T : class
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.ReadData<T>(path, false);
@@ -671,8 +673,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void CreatePersistent(string path, bool createParents)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
{
@@ -710,8 +711,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void CreatePersistent(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
this.CreatePersistent(path, false);
}
@@ -730,8 +731,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void CreatePersistent(string path, object data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
this.Create(path, data, CreateMode.Persistent);
}
@@ -753,8 +754,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public string CreatePersistentSequential(string path, object data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
return this.Create(path, data, CreateMode.PersistentSequential);
}
@@ -796,8 +797,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void CreateEphemeral(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
this.Create(path, null, CreateMode.Ephemeral);
}
@@ -815,8 +816,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </remarks>
public void CreateEphemeral(string path, object data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
this.Create(path, data, CreateMode.Ephemeral);
}
@@ -837,8 +838,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public string CreateEphemeralSequential(string path, object data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
return this.Create(path, data, CreateMode.EphemeralSequential);
}
@@ -861,8 +862,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
public T ReadData<T>(string path, bool returnNullIfPathNotExists)
where T : class
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
-
+ Guard.NotNullNorEmpty(path, "path");
+ this.EnsuresNotDisposed();
try
{
return this.ReadData<T>(path, null);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs Tue Oct 18 17:52:13 2011
@@ -130,7 +130,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void Delete(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.Client.Delete(path, -1);
@@ -150,7 +150,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public bool Exists(string path, bool watch)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Client.Exists(path, true) != null;
@@ -173,7 +173,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public string Create(string path, byte[] data, CreateMode mode)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Client.Create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
@@ -193,7 +193,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public IList<string> GetChildren(string path, bool watch)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Client.GetChildren(path, watch);
@@ -216,7 +216,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public byte[] ReadData(string path, Stat stats, bool watch)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Client.GetData(path, watch, stats);
@@ -233,7 +233,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void WriteData(string path, byte[] data)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.WriteData(path, data, -1);
@@ -253,7 +253,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </param>
public void WriteData(string path, byte[] data, int version)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.Client.SetData(path, data, version);
@@ -270,7 +270,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public long GetCreateTime(string path)
{
- Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
Stat stats = this.Client.Exists(path, false);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs Tue Oct 18 17:52:13 2011
@@ -48,7 +48,7 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public byte[] Serialize(object obj)
{
- Guard.Assert<ArgumentNullException>(() => obj != null);
+ Guard.NotNull(obj, "obj");
return Encoding.UTF8.GetBytes(obj.ToString());
}
@@ -63,8 +63,8 @@ namespace Kafka.Client.ZooKeeperIntegrat
/// </returns>
public object Deserialize(byte[] bytes)
{
- Guard.Assert<ArgumentNullException>(() => bytes != null);
- Guard.Assert<ArgumentException>(() => bytes.Count() > 0);
+ Guard.NotNull(bytes, "bytes");
+ Guard.Greater(bytes.Count(), 0, "bytes");
return bytes == null ? null : Encoding.UTF8.GetString(bytes);
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config Tue Oct 18 17:52:13 2011
@@ -26,7 +26,7 @@
</configSections>
<kafkaClientConfiguration>
<kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
- <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="10000" fetchSize="307200" backOffIncrementMs="2000"/>
+ <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
<brokerPartitionInfos>
<add id="0" address="192.168.1.39" port="9092" />
<add id="1" address="192.168.1.39" port="9101" />
@@ -34,4 +34,4 @@
</brokerPartitionInfos>
<zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
</kafkaClientConfiguration>
-</configuration>
\ No newline at end of file
+</configuration>