You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [1/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Author: junrao
Date: Wed Sep 21 19:17:19 2011
New Revision: 1173797

URL: http://svn.apache.org/viewvc?rev=1173797&view=rev
Log:
enhancements to .Net; patched by Eric Hauser; KAFKA-85

Added:
    incubator/kafka/trunk/clients/csharp/lib/StyleCop/
    incubator/kafka/trunk/clients/csharp/lib/StyleCop/Microsoft.StyleCop.Targets
    incubator/kafka/trunk/clients/csharp/lib/StyleCop/Settings.StyleCop
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/AsyncProducerConfig.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfoCollection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/Consumer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ConsumerConfig.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaClientConfiguration.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaServer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ProducerConfig.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/SyncProducerConfig.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZKConfig.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZooKeeperServers.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Broker.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Cluster.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Partition.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIteratorState.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetchedDataChunk.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumerConnector.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/KafkaMessageStream.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/PartitionTopicInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/TopicCount.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/KafkaException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/MessageSizeTooLargeException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZKRebalancerException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ZooKeeperTimeoutException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaClientBase.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BoundedBuffer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/ICallbackHandler.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/MessageSent.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IBrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IPartitioner.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerData.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPoolData.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerTypes.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/AbstractRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/RequestTypes.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/DefaultEncoder.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IEncoder.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/IWritable.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryWriter.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/StringEncoder.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/BitWorks.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Crc32Hasher.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ErrorMapping.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Extensions.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/KafkaScheduler.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupDirs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZKGroupTopicDirs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ZkUtils.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ChildChangedEventItem.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/DataChangedEventItem.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperChildChangedEventArgs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperDataChangedEventArgs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventArgs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperEventTypes.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs
Modified:
    incubator/kafka/trunk/clients/csharp/LICENSE
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs

Modified: incubator/kafka/trunk/clients/csharp/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/LICENSE?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/LICENSE (original)
+++ incubator/kafka/trunk/clients/csharp/LICENSE Wed Sep 21 19:17:19 2011
@@ -187,7 +187,7 @@ APPENDIX: How to apply the Apache Licens
    same "printed page" as the copyright notice for easier
    identification within third-party archives.
 
-Copyright [yyyy] [name of copyright owner]
+Copyright 2011 LinkedIn
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -199,4 +199,4 @@ Unless required by applicable law or agr
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
-limitations under the License.
+limitations under the License.
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/lib/StyleCop/Microsoft.StyleCop.Targets
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/lib/StyleCop/Microsoft.StyleCop.Targets?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/lib/StyleCop/Microsoft.StyleCop.Targets (added)
+++ incubator/kafka/trunk/clients/csharp/lib/StyleCop/Microsoft.StyleCop.Targets Wed Sep 21 19:17:19 2011
@@ -0,0 +1,109 @@
+<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <!-- Specify where tasks are implemented. -->
+  <UsingTask AssemblyFile="Microsoft.StyleCop.dll" TaskName="StyleCopTask"/>
+
+  <PropertyGroup>
+    <BuildDependsOn>$(BuildDependsOn);StyleCop</BuildDependsOn>
+    <RebuildDependsOn>StyleCopForceFullAnalysis;$(RebuildDependsOn)</RebuildDependsOn>
+  </PropertyGroup>
+
+  <!-- Define StyleCopForceFullAnalysis property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisForceFullAnalysis)' != '') and ('$(StyleCopForceFullAnalysis)' == '')">
+    <StyleCopForceFullAnalysis>$(SourceAnalysisForceFullAnalysis)</StyleCopForceFullAnalysis>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopForceFullAnalysis)' == ''">
+    <StyleCopForceFullAnalysis>false</StyleCopForceFullAnalysis>
+  </PropertyGroup>
+
+  <!-- Define StyleCopCacheResults property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisCacheResults)' != '') and ('$(StyleCopCacheResults)' == '')">
+    <StyleCopCacheResults>$(SourceAnalysisCacheResults)</StyleCopCacheResults>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopCacheResults)' == ''">
+    <StyleCopCacheResults>true</StyleCopCacheResults>
+  </PropertyGroup>
+
+  <!-- Define StyleCopTreatErrorsAsWarnings property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisTreatErrorsAsWarnings)' != '') and ('$(StyleCopTreatErrorsAsWarnings)' == '')">
+    <StyleCopTreatErrorsAsWarnings>$(SourceAnalysisTreatErrorsAsWarnings)</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopTreatErrorsAsWarnings)' == ''">
+    <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+
+  <!-- Define StyleCopEnabled property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisEnabled)' != '') and ('$(StyleCopEnabled)' == '')">
+    <StyleCopEnabled>$(SourceAnalysisEnabled)</StyleCopEnabled>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopEnabled)' == ''">
+    <StyleCopEnabled>true</StyleCopEnabled>
+  </PropertyGroup>
+
+  <!-- Define StyleCopOverrideSettingsFile property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisOverrideSettingsFile)' != '') and ('$(StyleCopOverrideSettingsFile)' == '')">
+    <StyleCopOverrideSettingsFile>$(SourceAnalysisOverrideSettingsFile)</StyleCopOverrideSettingsFile>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopOverrideSettingsFile)' == ''">
+    <StyleCopOverrideSettingsFile> </StyleCopOverrideSettingsFile>
+  </PropertyGroup>
+
+  <!-- Define StyleCopOutputFile property. -->
+  <PropertyGroup Condition="('$(SourceAnalysisOutputFile)' != '') and ('$(StyleCopOutputFile)' == '')">
+    <StyleCopOutputFile>$(SourceAnalysisOutputFile)</StyleCopOutputFile>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(StyleCopOutputFile)' == ''">
+    <StyleCopOutputFile>$(IntermediateOutputPath)StyleCopViolations.xml</StyleCopOutputFile>
+  </PropertyGroup>
+
+  <!-- Define all new properties which do not need to have both StyleCop and SourceAnalysis variations. -->
+  <PropertyGroup>
+    <!-- Specifying 0 will cause StyleCop to use the default violation count limit.
+         Specifying any positive number will cause StyleCop to use that number as the violation count limit.
+         Specifying any negative number will cause StyleCop to allow any number of violations without limit. -->
+    <StyleCopMaxViolationCount Condition="'$(StyleCopMaxViolationCount)' == ''">0</StyleCopMaxViolationCount>
+  </PropertyGroup>
+
+  <!-- Define target: StyleCopForceFullAnalysis. Sets the StyleCopForceFullAnalysis property to true; it is listed first in RebuildDependsOn, so a Rebuild passes ForceFullAnalysis=true to the StyleCop task. -->
+  <Target Name="StyleCopForceFullAnalysis">
+    <CreateProperty Value="true">
+      <Output TaskParameter="Value" PropertyName="StyleCopForceFullAnalysis" />
+    </CreateProperty>
+  </Target>
+
+  <!-- Define target: StyleCop. Appended to BuildDependsOn, skipped when StyleCopEnabled is 'false'; collects the files to analyze, runs StyleCopTask and registers its outputs for Clean. -->
+  <Target Name="StyleCop" Condition="'$(StyleCopEnabled)' != 'false'">
+    <!-- Determine what files should be checked. Take all Compile items, but exclude those that have
+        set ExcludeFromStyleCop=true or ExcludeFromSourceAnalysis=true. -->
+    <CreateItem Include="@(Compile)" Condition="('%(Compile.ExcludeFromStyleCop)' != 'true') and ('%(Compile.ExcludeFromSourceAnalysis)' != 'true')">
+      <Output TaskParameter="Include" ItemName="StyleCopFiles"/>
+    </CreateItem>
+
+    <Message Text="Forcing full StyleCop reanalysis." Condition="'$(StyleCopForceFullAnalysis)' == 'true'" Importance="Low" />
+
+    <Message Text="Analyzing @(StyleCopFiles)" Importance="Low" />
+
+    <!-- Run the StyleCop MSBuild task. -->
+    <StyleCopTask
+      ProjectFullPath="$(MSBuildProjectFile)"
+      SourceFiles="@(StyleCopFiles)"
+      AdditionalAddinPaths="@(StyleCopAdditionalAddinPaths)"
+      ForceFullAnalysis="$(StyleCopForceFullAnalysis)"
+      DefineConstants="$(DefineConstants)"
+      TreatErrorsAsWarnings="$(StyleCopTreatErrorsAsWarnings)"
+      CacheResults="$(StyleCopCacheResults)"
+      OverrideSettingsFile="$(StyleCopOverrideSettingsFile)"
+      OutputFile="$(StyleCopOutputFile)"
+      MaxViolationCount="$(StyleCopMaxViolationCount)"
+            />
+
+    <!-- Make the violations output file cleanable by adding it to FileWrites. -->
+    <CreateItem Include="$(StyleCopOutputFile)">
+      <Output TaskParameter="Include" ItemName="FileWrites"/>
+    </CreateItem>
+
+    <!-- Add the StyleCop.Cache file to FileWrites as well (only when StyleCopCacheResults is 'true') so it can be cleaned up on a Build Clean. -->
+    <CreateItem Include="StyleCop.Cache" Condition="'$(StyleCopCacheResults)' == 'true'">
+      <Output TaskParameter="Include" ItemName="FileWrites"/>
+    </CreateItem>
+  </Target>
+</Project>

Added: incubator/kafka/trunk/clients/csharp/lib/StyleCop/Settings.StyleCop
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/lib/StyleCop/Settings.StyleCop?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/lib/StyleCop/Settings.StyleCop (added)
+++ incubator/kafka/trunk/clients/csharp/lib/StyleCop/Settings.StyleCop Wed Sep 21 19:17:19 2011
@@ -0,0 +1,32 @@
+<StyleCopSettings Version="4.3">
+  <Parsers>
+    <Parser ParserId="Microsoft.StyleCop.CSharp.CsParser">
+      <ParserSettings>
+        <CollectionProperty Name="GeneratedFileFilters">
+          <Value>\.g\.cs$</Value>
+          <Value>\.generated\.cs$</Value>
+          <Value>\.g\.i\.cs$</Value>
+        </CollectionProperty>
+      </ParserSettings>
+    </Parser>
+  </Parsers>
+  <Analyzers>
+    <Analyzer AnalyzerId="Microsoft.StyleCop.CSharp.NamingRules">
+      <AnalyzerSettings>
+        <CollectionProperty Name="Hungarian">
+          <Value>as</Value>
+          <Value>do</Value>
+          <Value>id</Value>
+          <Value>if</Value>
+          <Value>in</Value>
+          <Value>is</Value>
+          <Value>my</Value>
+          <Value>no</Value>
+          <Value>on</Value>
+          <Value>to</Value>
+          <Value>ui</Value>
+        </CollectionProperty>
+      </AnalyzerSettings>
+    </Analyzer>
+  </Analyzers>
+</StyleCopSettings>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/AsyncProducerConfig.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/AsyncProducerConfig.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/AsyncProducerConfig.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/AsyncProducerConfig.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Configuration used by the asynchronous producer: extends SyncProducerConfig with QueueTime, QueueSize, BatchSize, SerializerClass and callback/event handler settings.
+    /// </summary>
+    public class AsyncProducerConfig : SyncProducerConfig, IAsyncProducerConfigShared
+    {
+        public const int DefaultQueueTime = 5000; // default for QueueTime; unit is not stated here (presumably milliseconds - TODO confirm)
+
+        public const int DefaultQueueSize = 10000; // default for QueueSize
+
+        public const int DefaultBatchSize = 200; // default for BatchSize
+
+        public static readonly string DefaultSerializerClass = typeof(DefaultEncoder).FullName; // full type name of DefaultEncoder, the default for SerializerClass
+
+        public AsyncProducerConfig() // applies the four defaults above; handler names and handler properties are left unset
+        {
+            this.QueueTime = DefaultQueueTime;
+            this.QueueSize = DefaultQueueSize;
+            this.BatchSize = DefaultBatchSize;
+            this.SerializerClass = DefaultSerializerClass;  
+        }
+
+        public AsyncProducerConfig(KafkaClientConfiguration kafkaClientConfiguration) // builds the config from the client configuration's KafkaServer entry
+            : this() // start from the default values
+        {
+            Guard.Assert<ArgumentNullException>(() => kafkaClientConfiguration != null); // NOTE(review): Guard.Assert presumably throws the given exception type when the condition is false - confirm in Utils/Guard.cs
+            Guard.Assert<ArgumentNullException>(() => kafkaClientConfiguration.KafkaServer != null);
+            Guard.Assert<ArgumentNullException>(() => kafkaClientConfiguration.KafkaServer.Address != null);
+            Guard.Assert<ArgumentOutOfRangeException>(() => kafkaClientConfiguration.KafkaServer.Port > 0); // port must be positive
+
+            this.Host = kafkaClientConfiguration.KafkaServer.Address; // Host and Port are not declared in this class; presumably inherited from SyncProducerConfig
+            this.Port = kafkaClientConfiguration.KafkaServer.Port;
+        }
+
+        public int QueueTime { get; set; } // initialized to DefaultQueueTime
+
+        public int QueueSize { get; set; } // initialized to DefaultQueueSize
+
+        public int BatchSize { get; set; } // initialized to DefaultBatchSize
+
+        public string SerializerClass { get; set; } // initialized to DefaultSerializerClass (a type name)
+
+        public string CallbackHandler { get; set; } // not set by either constructor; presumably a handler type name - TODO confirm
+
+        public string EventHandler { get; set; } // not set by either constructor; presumably a handler type name - TODO confirm
+
+        public IDictionary<string, string> CallbackHandlerProps { get; set; } // not set by either constructor, so null unless the caller assigns it
+
+        public IDictionary<string, string> EventHandlerProps { get; set; } // not set by either constructor, so null unless the caller assigns it
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+    using System.Globalization;
+
+    public class BrokerPartitionInfo : ConfigurationElement // configuration element describing one broker entry: id, address and port
+    {
+        [ConfigurationProperty("id")] // mapped to the "id" attribute of the configuration element
+        public int Id
+        {
+            get
+            {
+                return (int)this["id"];
+            }
+
+            set
+            {
+                this["id"] = value;
+            }
+        }
+
+        [ConfigurationProperty("address")] // mapped to the "address" attribute of the configuration element
+        public string Address
+        {
+            get
+            {
+                return (string)this["address"];
+            }
+
+            set
+            {
+                this["address"] = value;
+            }
+        }
+
+        [ConfigurationProperty("port")] // mapped to the "port" attribute of the configuration element
+        public int Port
+        {
+            get
+            {
+                return (int)this["port"];
+            }
+
+            set
+            {
+                this["port"] = value;
+            }
+        }
+
+        public string GetBrokerPartitionInfoAsString() // returns "Id:Address:Port", formatted with the invariant culture
+        {
+            return string.Format(CultureInfo.InvariantCulture, "{0}:{1}:{2}", Id, Address, Port);
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfoCollection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfoCollection.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfoCollection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/BrokerPartitionInfoCollection.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+
+    /// <summary>
+    /// Configuration collection of <see cref="BrokerPartitionInfo"/> elements keyed by their Id.
+    /// </summary>
+    public class BrokerPartitionInfoCollection : ConfigurationElementCollection
+    {
+        /// <summary>
+        /// Gets or sets the element at the given position in the collection.
+        /// </summary>
+        /// <param name="index">Position of the element.</param>
+        public BrokerPartitionInfo this[int index]
+        {
+            get
+            {
+                ConfigurationElement element = this.BaseGet(index);
+                return element as BrokerPartitionInfo;
+            }
+
+            set
+            {
+                // Replace instead of insert when something already sits at this position.
+                bool occupied = this.BaseGet(index) != null;
+                if (occupied)
+                {
+                    this.BaseRemoveAt(index);
+                }
+
+                this.BaseAdd(index, value);
+            }
+        }
+
+        /// <summary>
+        /// Creates the empty element instance the configuration system fills in.
+        /// </summary>
+        /// <returns>A new <see cref="BrokerPartitionInfo"/>.</returns>
+        protected override ConfigurationElement CreateNewElement()
+        {
+            return new BrokerPartitionInfo();
+        }
+
+        /// <summary>
+        /// Returns the key that identifies an element inside this collection.
+        /// </summary>
+        /// <param name="element">Element expected to be a <see cref="BrokerPartitionInfo"/>.</param>
+        /// <returns>The element's Id (boxed).</returns>
+        protected override object GetElementKey(ConfigurationElement element)
+        {
+            var info = (BrokerPartitionInfo)element;
+            return info.Id;
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/Consumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/Consumer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/Consumer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/Consumer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+
+    /// <summary>
+    /// Configuration element carrying the consumer-related attributes of the client configuration.
+    /// </summary>
+    public class Consumer : ConfigurationElement
+    {
+        /// <summary>
+        /// Gets or sets the value stored under the "numberOfTries" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("numberOfTries")]
+        public short NumberOfTries
+        {
+            get { return (short)this["numberOfTries"]; }
+            set { this["numberOfTries"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "groupId" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("groupId")]
+        public string GroupId
+        {
+            get { return (string)this["groupId"]; }
+            set { this["groupId"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "timeout" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("timeout")]
+        public int Timeout
+        {
+            get { return (int)this["timeout"]; }
+            set { this["timeout"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "autoOffsetReset" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("autoOffsetReset")]
+        public string AutoOffsetReset
+        {
+            get { return (string)this["autoOffsetReset"]; }
+            set { this["autoOffsetReset"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "autoCommit" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("autoCommit")]
+        public bool AutoCommit
+        {
+            get { return (bool)this["autoCommit"]; }
+            set { this["autoCommit"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "autoCommitIntervalMs" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("autoCommitIntervalMs")]
+        public int AutoCommitIntervalMs
+        {
+            get { return (int)this["autoCommitIntervalMs"]; }
+            set { this["autoCommitIntervalMs"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "fetchSize" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("fetchSize")]
+        public int FetchSize
+        {
+            get { return (int)this["fetchSize"]; }
+            set { this["fetchSize"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "backOffIncrementMs" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("backOffIncrementMs")]
+        public int BackOffIncrementMs
+        {
+            get { return (int)this["backOffIncrementMs"]; }
+            set { this["backOffIncrementMs"] = value; }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ConsumerConfig.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ConsumerConfig.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ConsumerConfig.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ConsumerConfig.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    /// <summary>
+    /// Configuration used by the consumer
+    /// </summary>
+    public class ConsumerConfig : ZKConfig
+    {
+        /// <summary>
+        /// Number of tries assigned by the parameterless constructor.
+        /// </summary>
+        public const short DefaultNumberOfTries = 2;
+
+        /// <summary>Gets or sets the number of tries.</summary>
+        public short NumberOfTries { get; set; }
+
+        /// <summary>Gets or sets the Kafka server host.</summary>
+        public string Host { get; set; }
+
+        /// <summary>Gets or sets the Kafka server port.</summary>
+        public int Port { get; set; }
+
+        /// <summary>Gets or sets the consumer group id.</summary>
+        public string GroupId { get; set; }
+
+        /// <summary>Gets or sets the consumer timeout.</summary>
+        public int Timeout { get; set; }
+
+        /// <summary>Gets or sets the auto offset reset setting.</summary>
+        public string AutoOffsetReset { get; set; }
+
+        /// <summary>Gets or sets a value indicating whether auto commit is on.</summary>
+        public bool AutoCommit { get; set; }
+
+        /// <summary>Gets or sets the auto commit interval (named as milliseconds).</summary>
+        public int AutoCommitIntervalMs { get; set; }
+
+        /// <summary>Gets or sets the fetch size.</summary>
+        public int FetchSize { get; set; }
+
+        /// <summary>Gets or sets the back-off increment (named as milliseconds).</summary>
+        public int BackOffIncrementMs { get; set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ConsumerConfig"/> class
+        /// with <see cref="DefaultNumberOfTries"/> as the number of tries.
+        /// </summary>
+        public ConsumerConfig()
+        {
+            this.NumberOfTries = DefaultNumberOfTries;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ConsumerConfig"/> class
+        /// from the Kafka client configuration section.
+        /// </summary>
+        /// <param name="kafkaClientConfiguration">
+        /// The configuration section to copy server, consumer and (when enabled) ZooKeeper settings from.
+        /// </param>
+        /// <exception cref="System.ArgumentNullException">
+        /// Thrown when <paramref name="kafkaClientConfiguration"/> is null.
+        /// </exception>
+        public ConsumerConfig(KafkaClientConfiguration kafkaClientConfiguration) : this()
+        {
+            // Fail with a meaningful exception instead of a NullReferenceException on first use,
+            // matching the null checks done by the producer configuration constructors.
+            if (kafkaClientConfiguration == null)
+            {
+                throw new System.ArgumentNullException("kafkaClientConfiguration");
+            }
+
+            this.Host = kafkaClientConfiguration.KafkaServer.Address;
+            this.Port = kafkaClientConfiguration.KafkaServer.Port;
+
+            // NOTE(review): this overwrites DefaultNumberOfTries with whatever the section reports,
+            // even if "numberOfTries" is absent from the file - confirm that is intended.
+            this.NumberOfTries = kafkaClientConfiguration.Consumer.NumberOfTries;
+            this.GroupId = kafkaClientConfiguration.Consumer.GroupId;
+            this.Timeout = kafkaClientConfiguration.Consumer.Timeout;
+            this.AutoOffsetReset = kafkaClientConfiguration.Consumer.AutoOffsetReset;
+            this.AutoCommit = kafkaClientConfiguration.Consumer.AutoCommit;
+            this.AutoCommitIntervalMs = kafkaClientConfiguration.Consumer.AutoCommitIntervalMs;
+            this.FetchSize = kafkaClientConfiguration.Consumer.FetchSize;
+            this.BackOffIncrementMs = kafkaClientConfiguration.Consumer.BackOffIncrementMs;
+
+            // ZooKeeper settings are copied only when ZooKeeper is enabled; otherwise the
+            // values set by the base constructor stay in place.
+            if (kafkaClientConfiguration.IsZooKeeperEnabled)
+            {
+                this.ZkConnect = kafkaClientConfiguration.ZooKeeperServers.AddressList;
+                this.ZkSessionTimeoutMs = kafkaClientConfiguration.ZooKeeperServers.SessionTimeout;
+                this.ZkConnectionTimeoutMs = kafkaClientConfiguration.ZooKeeperServers.ConnectionTimeout;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Settings that an asynchronous producer configuration exposes.
+    /// </summary>
+    internal interface IAsyncProducerConfigShared
+    {
+        /// <summary>Gets or sets the queue time.</summary>
+        int QueueTime { get; set; }
+
+        /// <summary>Gets or sets the queue size.</summary>
+        int QueueSize { get; set; }
+
+        /// <summary>Gets or sets the batch size.</summary>
+        int BatchSize { get; set; }
+
+        /// <summary>Gets or sets the serializer class (held as a string).</summary>
+        string SerializerClass { get; set; }
+
+        /// <summary>Gets or sets the callback handler (held as a string).</summary>
+        string CallbackHandler { get; set; }
+
+        /// <summary>Gets or sets the key/value properties for the callback handler.</summary>
+        IDictionary<string, string> CallbackHandlerProps { get; set; }
+
+        /// <summary>Gets or sets the event handler (held as a string).</summary>
+        string EventHandler { get; set; }
+
+        /// <summary>Gets or sets the key/value properties for the event handler.</summary>
+        IDictionary<string, string> EventHandlerProps { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    /// <summary>
+    /// Settings that a synchronous producer configuration exposes.
+    /// </summary>
+    internal interface ISyncProducerConfigShared
+    {
+        /// <summary>Gets or sets the connect timeout.</summary>
+        int ConnectTimeout { get; set; }
+
+        /// <summary>Gets or sets the socket timeout.</summary>
+        int SocketTimeout { get; set; }
+
+        /// <summary>Gets or sets the reconnect interval.</summary>
+        int ReconnectInterval { get; set; }
+
+        /// <summary>Gets or sets the buffer size.</summary>
+        int BufferSize { get; set; }
+
+        /// <summary>Gets or sets the maximum message size.</summary>
+        int MaxMessageSize { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaClientConfiguration.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaClientConfiguration.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaClientConfiguration.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaClientConfiguration.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+    using System.Text;
+
+    /// <summary>
+    /// Implementation of the custom configuration section for the kafka client
+    /// </summary>
+    public class KafkaClientConfiguration : ConfigurationSection
+    {
+        // Loaded once when the type is initialized; null when the section is absent
+        // or is not a KafkaClientConfiguration.
+        private static KafkaClientConfiguration config = ConfigurationManager.GetSection("kafkaClientConfiguration") as KafkaClientConfiguration;
+        private bool enabled = true;
+
+        /// <summary>
+        /// Returns the configuration section loaded from the "kafkaClientConfiguration" section,
+        /// recomputing the ZooKeeper flag from the configured address list on every call.
+        /// </summary>
+        /// <returns>The shared <see cref="KafkaClientConfiguration"/> instance.</returns>
+        /// <exception cref="ConfigurationErrorsException">
+        /// Thrown when the section is missing or is not a <see cref="KafkaClientConfiguration"/>.
+        /// </exception>
+        public static KafkaClientConfiguration GetConfiguration()
+        {
+            // Without this check a missing section surfaces as a bare NullReferenceException.
+            if (config == null)
+            {
+                throw new ConfigurationErrorsException(
+                    "The 'kafkaClientConfiguration' configuration section is missing or is not of type KafkaClientConfiguration.");
+            }
+
+            // NOTE(review): this also re-enables ZooKeeper after SupressZooKeeper() was called
+            // whenever an address list is configured - confirm that is intended.
+            config.enabled = !string.IsNullOrEmpty(config.ZooKeeperServers.AddressList);
+            return config;
+        }
+
+        /// <summary>
+        /// Gets or sets the "kafkaServer" configuration element.
+        /// </summary>
+        [ConfigurationProperty("kafkaServer")]
+        public KafkaServer KafkaServer
+        {
+            get { return (KafkaServer)this["kafkaServer"]; }
+            set { this["kafkaServer"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the "consumer" configuration element.
+        /// </summary>
+        [ConfigurationProperty("consumer")]
+        public Consumer Consumer
+        {
+            get { return (Consumer)this["consumer"]; }
+            set { this["consumer"] = value; }
+        }
+
+        /// <summary>
+        /// Gets the "brokerPartitionInfos" collection, or a new empty collection when none is stored.
+        /// </summary>
+        [ConfigurationProperty("brokerPartitionInfos")]
+        public BrokerPartitionInfoCollection BrokerPartitionInfos
+        {
+            get
+            {
+                return (BrokerPartitionInfoCollection)this["brokerPartitionInfos"] ??
+                       new BrokerPartitionInfoCollection();
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the "zooKeeperServers" configuration element.
+        /// </summary>
+        [ConfigurationProperty("zooKeeperServers")]
+        public ZooKeeperServers ZooKeeperServers
+        {
+            get { return (ZooKeeperServers)this["zooKeeperServers"]; }
+            set { this["zooKeeperServers"] = value; }
+        }
+
+        /// <summary>
+        /// Joins the string form of every broker partition info with commas.
+        /// </summary>
+        /// <returns>
+        /// A comma-separated list of "id:address:port" entries; empty when the collection is empty.
+        /// </returns>
+        public string GetBrokerPartitionInfosAsString()
+        {
+            // Read the property once so every iteration works on the same collection instance.
+            BrokerPartitionInfoCollection infos = this.BrokerPartitionInfos;
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < infos.Count; i++)
+            {
+                sb.Append(infos[i].GetBrokerPartitionInfoAsString());
+                if ((i + 1) < infos.Count)
+                {
+                    sb.Append(",");
+                }
+            }
+
+            return sb.ToString();
+        }
+
+        /// <summary>
+        /// Marks ZooKeeper as disabled on this instance until the flag is recomputed.
+        /// </summary>
+        internal void SupressZooKeeper()
+        {
+            this.enabled = false;
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether ZooKeeper settings should be used.
+        /// </summary>
+        public bool IsZooKeeperEnabled
+        {
+            get { return this.enabled; }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaServer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaServer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaServer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/KafkaServer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+
+    /// <summary>
+    /// Configuration element exposing the "address" and "port" attributes of the Kafka server entry.
+    /// </summary>
+    public class KafkaServer : ConfigurationElement
+    {
+        /// <summary>
+        /// Gets or sets the value stored under the "address" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("address")]
+        public string Address
+        {
+            get { return (string)this["address"]; }
+            set { this["address"] = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "port" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("port")]
+        public int Port
+        {
+            get { return (int)this["port"]; }
+            set { this["port"] = value; }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ProducerConfig.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ProducerConfig.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ProducerConfig.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ProducerConfig.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Producers;
+    using Kafka.Client.Producers.Partitioning;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// High-level API configuration for the producer
+    /// </summary>
+    public class ProducerConfig : ZKConfig, ISyncProducerConfigShared, IAsyncProducerConfigShared
+    {
+        /// <summary>Producer type assigned by the parameterless constructor.</summary>
+        public const ProducerTypes DefaultProducerType = ProducerTypes.Sync;
+
+        /// <summary>Full name of the open generic DefaultPartitioner type.</summary>
+        public static readonly string DefaultPartitioner = typeof(DefaultPartitioner<>).FullName;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerConfig"/> class using the
+        /// defaults declared on the sync and async producer configurations.
+        /// </summary>
+        public ProducerConfig()
+        {
+            this.ProducerType = DefaultProducerType;
+
+            // Defaults shared with the synchronous producer.
+            this.BufferSize = SyncProducerConfig.DefaultBufferSize;
+            this.ConnectTimeout = SyncProducerConfig.DefaultConnectTimeout;
+            this.SocketTimeout = SyncProducerConfig.DefaultSocketTimeout;
+            this.ReconnectInterval = SyncProducerConfig.DefaultReconnectInterval;
+            this.MaxMessageSize = SyncProducerConfig.DefaultMaxMessageSize;
+
+            // Defaults shared with the asynchronous producer.
+            this.QueueTime = AsyncProducerConfig.DefaultQueueTime;
+            this.QueueSize = AsyncProducerConfig.DefaultQueueSize;
+            this.BatchSize = AsyncProducerConfig.DefaultBatchSize;
+            this.SerializerClass = AsyncProducerConfig.DefaultSerializerClass;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerConfig"/> class from the
+        /// Kafka client configuration section, on top of the defaults.
+        /// </summary>
+        /// <param name="kafkaClientConfiguration">
+        /// The configuration section supplying ZooKeeper settings (when enabled) and broker partition infos.
+        /// </param>
+        public ProducerConfig(KafkaClientConfiguration kafkaClientConfiguration)
+            : this()
+        {
+            Guard.Assert<ArgumentNullException>(() => kafkaClientConfiguration != null);
+
+            bool useZooKeeper = kafkaClientConfiguration.IsZooKeeperEnabled;
+            if (useZooKeeper)
+            {
+                var zooKeeper = kafkaClientConfiguration.ZooKeeperServers;
+                this.ZkConnect = zooKeeper.AddressList;
+                this.ZkSessionTimeoutMs = zooKeeper.SessionTimeout;
+                this.ZkConnectionTimeoutMs = zooKeeper.ConnectionTimeout;
+            }
+
+            this.BrokerPartitionInfo = kafkaClientConfiguration.GetBrokerPartitionInfosAsString();
+        }
+
+        /// <summary>Gets or sets the broker partition infos in their string form.</summary>
+        public string BrokerPartitionInfo { get; set; }
+
+        /// <summary>Gets or sets the partitioner class (held as a string).</summary>
+        public string PartitionerClass { get; set; }
+
+        /// <summary>Gets or sets the producer type.</summary>
+        public ProducerTypes ProducerType { get; set; }
+
+        /// <summary>Gets or sets the buffer size.</summary>
+        public int BufferSize { get; set; }
+
+        /// <summary>Gets or sets the connect timeout.</summary>
+        public int ConnectTimeout { get; set; }
+
+        /// <summary>Gets or sets the socket timeout.</summary>
+        public int SocketTimeout { get; set; }
+
+        /// <summary>Gets or sets the reconnect interval.</summary>
+        public int ReconnectInterval { get; set; }
+
+        /// <summary>Gets or sets the maximum message size.</summary>
+        public int MaxMessageSize { get; set; }
+
+        /// <summary>Gets or sets the queue time.</summary>
+        public int QueueTime { get; set; }
+
+        /// <summary>Gets or sets the queue size.</summary>
+        public int QueueSize { get; set; }
+
+        /// <summary>Gets or sets the batch size.</summary>
+        public int BatchSize { get; set; }
+
+        /// <summary>Gets or sets the serializer class (held as a string).</summary>
+        public string SerializerClass { get; set; }
+
+        /// <summary>Gets or sets the callback handler (held as a string).</summary>
+        public string CallbackHandler { get; set; }
+
+        /// <summary>Gets or sets the event handler (held as a string).</summary>
+        public string EventHandler { get; set; }
+
+        /// <summary>Gets or sets the key/value properties for the callback handler.</summary>
+        public IDictionary<string, string> CallbackHandlerProps { get; set; }
+
+        /// <summary>Gets or sets the key/value properties for the event handler.</summary>
+        public IDictionary<string, string> EventHandlerProps { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/SyncProducerConfig.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/SyncProducerConfig.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/SyncProducerConfig.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/SyncProducerConfig.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Configuration for the synchronous producer.
+    /// </summary>
+    public class SyncProducerConfig : ISyncProducerConfigShared
+    {
+        /// <summary>Buffer size assigned by the parameterless constructor.</summary>
+        public const int DefaultBufferSize = 102400;
+
+        /// <summary>Connect timeout assigned by the parameterless constructor.</summary>
+        public const int DefaultConnectTimeout = 5000;
+
+        /// <summary>Socket timeout assigned by the parameterless constructor.</summary>
+        public const int DefaultSocketTimeout = 30000;
+
+        /// <summary>Reconnect interval assigned by the parameterless constructor.</summary>
+        public const int DefaultReconnectInterval = 30000;
+
+        /// <summary>Maximum message size assigned by the parameterless constructor.</summary>
+        public const int DefaultMaxMessageSize = 1000000;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducerConfig"/> class with the default values.
+        /// </summary>
+        public SyncProducerConfig()
+        {
+            this.BufferSize = DefaultBufferSize;
+            this.MaxMessageSize = DefaultMaxMessageSize;
+            this.ConnectTimeout = DefaultConnectTimeout;
+            this.SocketTimeout = DefaultSocketTimeout;
+            this.ReconnectInterval = DefaultReconnectInterval;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="SyncProducerConfig"/> class with the default
+        /// values plus the host and port of the configured Kafka server.
+        /// </summary>
+        /// <param name="kafkaClientConfiguration">The configuration section to read the server from.</param>
+        public SyncProducerConfig(KafkaClientConfiguration kafkaClientConfiguration) : this()
+        {
+            Guard.Assert<ArgumentNullException>(() => kafkaClientConfiguration != null);
+
+            var server = kafkaClientConfiguration.KafkaServer;
+            this.Host = server.Address;
+            this.Port = server.Port;
+        }
+
+        /// <summary>Gets or sets the buffer size.</summary>
+        public int BufferSize { get; set; }
+
+        /// <summary>Gets or sets the connect timeout.</summary>
+        public int ConnectTimeout { get; set; }
+
+        /// <summary>Gets or sets the socket timeout.</summary>
+        public int SocketTimeout { get; set; }
+
+        /// <summary>Gets or sets the reconnect interval.</summary>
+        public int ReconnectInterval { get; set; }
+
+        /// <summary>Gets or sets the maximum message size.</summary>
+        public int MaxMessageSize { get; set; }
+
+        /// <summary>Gets or sets the Kafka server host.</summary>
+        public string Host { get; set; }
+
+        /// <summary>Gets or sets the Kafka server port.</summary>
+        public int Port { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZKConfig.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZKConfig.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZKConfig.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZKConfig.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    /// <summary>
+    /// Holds the ZooKeeper connection settings shared by the client configurations.
+    /// </summary>
+    public class ZKConfig
+    {
+        // Values handed to the full constructor by the parameterless one.
+        private const int InitialSessionTimeoutMs = 6000;
+        private const int InitialConnectionTimeoutMs = 6000;
+        private const int InitialSyncTimeMs = 2000;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZKConfig"/> class with no connection
+        /// string, 6000 for both timeouts and 2000 for the sync time.
+        /// </summary>
+        public ZKConfig()
+            : this(null, InitialSessionTimeoutMs, InitialConnectionTimeoutMs, InitialSyncTimeMs)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZKConfig"/> class.
+        /// </summary>
+        /// <param name="zkconnect">Value for <see cref="ZkConnect"/>.</param>
+        /// <param name="zksessionTimeoutMs">Value for <see cref="ZkSessionTimeoutMs"/>.</param>
+        /// <param name="zkconnectionTimeoutMs">Value for <see cref="ZkConnectionTimeoutMs"/>.</param>
+        /// <param name="zksyncTimeMs">Value for <see cref="ZkSyncTimeMs"/>.</param>
+        public ZKConfig(string zkconnect, int zksessionTimeoutMs, int zkconnectionTimeoutMs, int zksyncTimeMs)
+        {
+            this.ZkConnect = zkconnect;
+            this.ZkSessionTimeoutMs = zksessionTimeoutMs;
+            this.ZkConnectionTimeoutMs = zkconnectionTimeoutMs;
+            this.ZkSyncTimeMs = zksyncTimeMs;
+        }
+
+        /// <summary>Gets or sets the ZooKeeper connection string.</summary>
+        public string ZkConnect { get; set; }
+
+        /// <summary>Gets or sets the ZooKeeper session timeout (named as milliseconds).</summary>
+        public int ZkSessionTimeoutMs { get; set; }
+
+        /// <summary>Gets or sets the ZooKeeper connection timeout (named as milliseconds).</summary>
+        public int ZkConnectionTimeoutMs { get; set; }
+
+        /// <summary>Gets or sets the ZooKeeper sync time (named as milliseconds).</summary>
+        public int ZkSyncTimeMs { get; set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZooKeeperServers.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZooKeeperServers.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZooKeeperServers.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ZooKeeperServers.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cfg
+{
+    using System.Configuration;
+
+    /// <summary>
+    /// Configuration element exposing the ZooKeeper address list and timeouts.
+    /// </summary>
+    public class ZooKeeperServers : ConfigurationElement
+    {
+        /// <summary>
+        /// Gets or sets the value stored under the "addressList" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("addressList")]
+        public string AddressList
+        {
+            get
+            {
+                return (string)this["addressList"];
+            }
+
+            set
+            {
+                this["addressList"] = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "sessionTimeout" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("sessionTimeout")]
+        public int SessionTimeout
+        {
+            get
+            {
+                return (int)this["sessionTimeout"];
+            }
+
+            set
+            {
+                this["sessionTimeout"] = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the value stored under the "connectionTimeout" configuration attribute.
+        /// </summary>
+        [ConfigurationProperty("connectionTimeout")]
+        public int ConnectionTimeout
+        {
+            get
+            {
+                return (int)this["connectionTimeout"];
+            }
+
+            set
+            {
+                this["connectionTimeout"] = value;
+            }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Broker.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Broker.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Broker.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Broker.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cluster
+{
+    /// <summary>
+    /// Immutable description of a Kafka broker: id, creator id, host and port.
+    /// </summary>
+    internal class Broker
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Broker"/> class.
+        /// </summary>
+        /// <param name="id">The broker id.</param>
+        /// <param name="creatorId">The broker creator id.</param>
+        /// <param name="host">The broker host.</param>
+        /// <param name="port">The broker port.</param>
+        public Broker(int id, string creatorId, string host, int port)
+        {
+            this.Id = id;
+            this.CreatorId = creatorId;
+
+            // Network endpoint of the broker.
+            this.Host = host;
+            this.Port = port;
+        }
+
+        /// <summary>
+        /// Gets the id given to the constructor.
+        /// </summary>
+        public int Id { get; private set; }
+
+        /// <summary>
+        /// Gets the creator id given to the constructor.
+        /// </summary>
+        public string CreatorId { get; private set; }
+
+        /// <summary>
+        /// Gets the host given to the constructor.
+        /// </summary>
+        public string Host { get; private set; }
+
+        /// <summary>
+        /// Gets the port given to the constructor.
+        /// </summary>
+        public int Port { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Cluster.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Cluster.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Cluster.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Cluster.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+using System;
+using System.Globalization;
+
+namespace Kafka.Client.Cluster
+{
+    using System.Collections.Generic;
+using Kafka.Client.ZooKeeperIntegration;
+
+    /// <summary>
+    /// The set of active brokers in the cluster, indexed by broker ID
+    /// </summary>
+    internal class Cluster
+    {
+        private readonly Dictionary<int, Broker> brokers = new Dictionary<int, Broker>();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Cluster"/> class that holds no brokers.
+        /// </summary>
+        public Cluster()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Cluster"/> class from the broker nodes
+        /// listed under <see cref="ZooKeeperClient.DefaultBrokerIdsPath"/>.
+        /// </summary>
+        /// <param name="zkClient">IZooKeeperClient object used to list and read the broker nodes</param>
+        /// <exception cref="ArgumentNullException">Thrown when zkClient is null.</exception>
+        /// <exception cref="ArgumentException">Thrown when a node name or its data cannot be parsed.</exception>
+        public Cluster(IZooKeeperClient zkClient)
+        {
+            if (zkClient == null)
+            {
+                throw new ArgumentNullException("zkClient");
+            }
+
+            var nodes = zkClient.GetChildrenParentMayNotExist(ZooKeeperClient.DefaultBrokerIdsPath);
+            if (nodes == null)
+            {
+                // NOTE(review): presumably null is returned when the parent path is missing - confirm; treated as "no brokers"
+                return;
+            }
+
+            foreach (var node in nodes)
+            {
+                var brokerZkString = zkClient.ReadData<string>(ZooKeeperClient.DefaultBrokerIdsPath + "/" + node);
+                Broker broker = CreateBroker(node, brokerZkString);
+
+                // the indexer adds or overwrites, so a repeated id keeps the broker read last
+                this.brokers[broker.Id] = broker;
+            }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Cluster"/> class.
+        /// </summary>
+        /// <param name="brokers">
+        /// The set of active brokers; two brokers with the same id cause an <see cref="ArgumentException"/>.
+        /// </param>
+        public Cluster(IEnumerable<Broker> brokers)
+        {
+            foreach (var broker in brokers)
+            {
+                // Add (not the indexer) is used here, so a duplicate id fails instead of being replaced
+                this.brokers.Add(broker.Id, broker);
+            }
+        }
+
+        /// <summary>
+        /// Gets broker with given ID
+        /// </summary>
+        /// <param name="id">The broker ID.</param>
+        /// <returns>
+        /// The broker with given ID, or null when no broker with that ID is held
+        /// </returns>
+        public Broker GetBroker(int id)
+        {
+            Broker broker;
+            return this.brokers.TryGetValue(id, out broker) ? broker : null;
+        }
+
+        /// <summary>
+        /// Creates a new Broker object out of a BrokerInfoString
+        /// </summary>
+        /// <param name="node">node string; parsed as the broker id with the invariant culture</param>
+        /// <param name="brokerInfoString">the BrokerInfoString; split on ':' into creator id, host and port</param>
+        /// <returns>Broker object</returns>
+        /// <exception cref="ArgumentException">Thrown when either argument cannot be parsed.</exception>
+        private static Broker CreateBroker(string node, string brokerInfoString)
+        {
+            int id;
+            if (!int.TryParse(node, NumberStyles.Integer, CultureInfo.InvariantCulture, out id))
+            {
+                throw new ArgumentException(String.Format(CultureInfo.CurrentCulture, "{0} is not a valid integer", node));
+            }
+
+            // a null string yields no pieces, so it is reported the same way as a malformed one
+            var brokerInfo = brokerInfoString == null ? new string[0] : brokerInfoString.Split(':');
+            if (brokerInfo.Length <= 2)
+            {
+                throw new ArgumentException(String.Format(CultureInfo.CurrentCulture, "{0} is not a valid BrokerInfoString", brokerInfoString));
+            }
+
+            int port;
+            if (!int.TryParse(brokerInfo[2], NumberStyles.Integer, CultureInfo.InvariantCulture, out port))
+            {
+                throw new ArgumentException(String.Format(CultureInfo.CurrentCulture, "{0} is not a valid integer", brokerInfo[2]));
+            }
+
+            return new Broker(id, brokerInfo[0], brokerInfo[1], port);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Partition.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Partition.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Partition.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cluster/Partition.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Cluster
+{
+    using System;
+    using System.Globalization;
+
+    /// <summary>
+    /// Represents broker partition
+    /// </summary>
+    internal class Partition : IComparable<Partition>
+    {
+        /// <summary>
+        /// Factory method that instantiates <see cref="Partition"/> object based on configuration given as string
+        /// </summary>
+        /// <param name="partition">
+        /// The partition info in the form "brokerId-partId"; both parts are parsed with the invariant culture.
+        /// </param>
+        /// <returns>
+        /// Instantiated <see cref="Partition"/> object
+        /// </returns>
+        /// <exception cref="ArgumentNullException">Thrown when the partition info is null.</exception>
+        /// <exception cref="ArgumentException">Thrown when the info does not split into exactly two pieces on '-'.</exception>
+        public static Partition ParseFrom(string partition)
+        {
+            if (partition == null)
+            {
+                throw new ArgumentNullException("partition");
+            }
+
+            var pieces = partition.Split('-');
+            if (pieces.Length != 2)
+            {
+                throw new ArgumentException("Expected name in the form x-y");
+            }
+
+            return new Partition(int.Parse(pieces[0], CultureInfo.InvariantCulture), int.Parse(pieces[1], CultureInfo.InvariantCulture));
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Partition"/> class.
+        /// </summary>
+        /// <param name="brokerId">The broker ID.</param>
+        /// <param name="partId">The partition ID.</param>
+        public Partition(int brokerId, int partId)
+        {
+            this.BrokerId = brokerId;
+            this.PartId = partId;
+        }
+
+        /// <summary>
+        /// Gets the broker ID.
+        /// </summary>
+        public int BrokerId { get; private set; }
+
+        /// <summary>
+        /// Gets the partition ID.
+        /// </summary>
+        public int PartId { get; private set; }
+
+        /// <summary>
+        /// Gets the partition name: broker ID and partition ID joined by '-', the form <see cref="ParseFrom"/> reads
+        /// </summary>
+        public string Name
+        {
+            get { return this.BrokerId.ToString(CultureInfo.InvariantCulture) + "-" + this.PartId.ToString(CultureInfo.InvariantCulture); }
+        }
+
+        /// <summary>
+        /// Compares current object with another object of type <see cref="Partition"/>,
+        /// ordering by broker ID first and by partition ID second
+        /// </summary>
+        /// <param name="other">
+        /// The other object; a null reference sorts before every instance.
+        /// </param>
+        /// <returns>
+        /// 0 if equals, positive number if greater and negative otherwise
+        /// </returns>
+        public int CompareTo(Partition other)
+        {
+            if (other == null)
+            {
+                return 1;
+            }
+
+            // int.CompareTo instead of subtraction: the difference of two ints can overflow and flip the sign
+            int byBroker = this.BrokerId.CompareTo(other.BrokerId);
+            return byBroker != 0 ? byBroker : this.PartId.CompareTo(other.PartId);
+        }
+
+        /// <summary>
+        /// Gets string representation of current object
+        /// </summary>
+        /// <returns>
+        /// String in the form "(brokerId,partId)"
+        /// </returns>
+        public override string ToString()
+        {
+            return "(" + this.BrokerId.ToString(CultureInfo.InvariantCulture) + "," + this.PartId.ToString(CultureInfo.InvariantCulture) + ")";
+        }
+
+        /// <summary>
+        /// Determines whether a given object is equal to the current object
+        /// </summary>
+        /// <param name="obj">
+        /// The other object.
+        /// </param>
+        /// <returns>
+        /// True when obj is a <see cref="Partition"/> with the same broker ID and partition ID
+        /// </returns>
+        public override bool Equals(object obj)
+        {
+            var other = obj as Partition;
+            return other != null && this.BrokerId == other.BrokerId && this.PartId == other.PartId;
+        }
+
+        /// <summary>
+        /// Gets hash code of current object
+        /// </summary>
+        /// <returns>
+        /// Hash code computed from broker ID and partition ID
+        /// </returns>
+        public override int GetHashCode()
+        {
+            unchecked
+            {
+                // wrap-around is acceptable for a hash; without this an overflow would throw in a checked build
+                return (31 * (17 + this.BrokerId)) + this.PartId;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Consumers
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Utils;
+    using log4net;
+
+    /// <summary>
+    /// The low-level API of consumer of Kafka messages
+    /// </summary>
+    /// <remarks>
+    /// Talks to a single broker and has a close correspondence
+    /// to the network requests sent to the server.
+    /// Every call opens its own connection and disposes it afterwards.
+    /// </remarks>
+    public class Consumer : IConsumer
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly ConsumerConfig config;
+
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        public string Host { get; private set; }
+
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        public int Port { get; private set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Consumer"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The consumer configuration; supplies the host, the port and the number of tries.
+        /// </param>
+        public Consumer(ConsumerConfig config)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+
+            this.config = config;
+            this.Host = config.Host;
+            this.Port = config.Port;
+        }
+
+        /// <summary>
+        /// Fetch a set of messages from a topic.
+        /// </summary>
+        /// <param name="request">
+        /// Specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+        /// </param>
+        /// <returns>
+        /// A set of fetched messages, or null when the response is empty or no try was made.
+        /// </returns>
+        /// <remarks>
+        /// Offset is passed in on every request, allowing the user to maintain this metadata
+        /// however they choose.
+        /// </remarks>
+        public BufferedMessageSet Fetch(FetchRequest request)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+
+            BufferedMessageSet result = null;
+            this.ExecuteWithRetry("Fetch", conn => result = Fetch(conn, request));
+            return result;
+        }
+
+        /// <summary>
+        /// Combine multiple fetch requests in one call.
+        /// </summary>
+        /// <param name="request">
+        /// The list of fetch requests.
+        /// </param>
+        /// <returns>
+        /// A list of sets of fetched messages; empty when the response is empty or no try was made.
+        /// </returns>
+        /// <remarks>
+        /// Offset is passed in on every request, allowing the user to maintain this metadata
+        /// however they choose.
+        /// </remarks>
+        public IList<BufferedMessageSet> MultiFetch(MultiFetchRequest request)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+
+            var result = new List<BufferedMessageSet>();
+            this.ExecuteWithRetry("MultiFetch", conn => MultiFetch(conn, request, result));
+            return result;
+        }
+
+        /// <summary>
+        /// Gets a list of valid offsets (up to maxSize) before the given time.
+        /// </summary>
+        /// <param name="request">The offset request.</param>
+        /// <returns>
+        /// The list of offsets, in the order they appear in the response.
+        /// </returns>
+        public IList<long> GetOffsetsBefore(OffsetRequest request)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+
+            var offsets = new List<long>();
+            this.ExecuteWithRetry("GetOffsetsBefore", conn => GetOffsetsBefore(conn, request, offsets));
+            return offsets;
+        }
+
+        /// <summary>
+        /// Runs a request against a newly opened connection, repeating it until it succeeds
+        /// or the configured number of tries is used up.
+        /// </summary>
+        /// <param name="operationName">
+        /// The name of the calling operation, used as the prefix of the retry log message.
+        /// </param>
+        /// <param name="operation">
+        /// The request to run; it receives the connection opened for this call.
+        /// </param>
+        /// <remarks>
+        /// All tries share the one connection opened here. When the configured number of tries
+        /// is lower than one the operation is never run. A failure of the last try is rethrown.
+        /// </remarks>
+        private void ExecuteWithRetry(string operationName, Action<KafkaConnection> operation)
+        {
+            using (var conn = new KafkaConnection(this.Host, this.Port))
+            {
+                // int rather than short so the counter cannot wrap around for a large try limit
+                int tryCounter = 1;
+                bool success = false;
+                while (!success && tryCounter <= this.config.NumberOfTries)
+                {
+                    try
+                    {
+                        operation(conn);
+                        success = true;
+                    }
+                    catch (Exception ex)
+                    {
+                        // if maximum number of tries reached
+                        if (tryCounter == this.config.NumberOfTries)
+                        {
+                            throw;
+                        }
+
+                        tryCounter++;
+                        Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} reconnect due to {1}", operationName, ex);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a fetch request and parses the response into a message set.
+        /// </summary>
+        /// <param name="conn">The connection used to send the request and read the response.</param>
+        /// <param name="request">The fetch request.</param>
+        /// <returns>The parsed message set, or null when the response length is not positive.</returns>
+        private static BufferedMessageSet Fetch(KafkaConnection conn, FetchRequest request)
+        {
+            conn.Write(request);
+            byte[] body = ReadResponseBody(conn);
+            return body == null ? null : BufferedMessageSet.ParseFrom(body);
+        }
+
+        /// <summary>
+        /// Sends a multi-fetch request and parses one message set per consumer request.
+        /// </summary>
+        /// <param name="conn">The connection used to send the request and read the response.</param>
+        /// <param name="request">The multi-fetch request.</param>
+        /// <param name="result">The list to fill; it is cleared first so a retry starts empty.</param>
+        private static void MultiFetch(KafkaConnection conn, MultiFetchRequest request, IList<BufferedMessageSet> result)
+        {
+            result.Clear();
+            conn.Write(request);
+            byte[] body = ReadResponseBody(conn);
+            if (body == null)
+            {
+                return;
+            }
+
+            // walk the buffer with a cursor instead of re-copying the unread remainder after every part
+            int cursor = 0;
+            for (int i = 0; i < request.ConsumerRequests.Count; i++)
+            {
+                // each part: 4-byte length, 2-byte error code, then (length - 2) bytes handed to the parser
+                int partLength = BitConverter.ToInt32(BitWorks.ReverseBytes(CopyRange(body, cursor, 4)), 0);
+                int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(CopyRange(body, cursor + 4, 2)), 0);
+                if (errorCode != KafkaException.NoError)
+                {
+                    throw new KafkaException(errorCode);
+                }
+
+                result.Add(BufferedMessageSet.ParseFrom(CopyRange(body, cursor + 6, partLength - 2)));
+
+                // clamp the step so a corrupt length cannot move the cursor outside the buffer
+                cursor += Math.Min(Math.Max(0, partLength + 4), body.Length - cursor);
+            }
+        }
+
+        /// <summary>
+        /// Sends an offset request and collects the offsets listed in the response.
+        /// </summary>
+        /// <param name="conn">The connection used to send the request and read the response.</param>
+        /// <param name="request">The offset request.</param>
+        /// <param name="offsets">The list to fill; it is cleared first so a retry starts empty.</param>
+        private static void GetOffsetsBefore(KafkaConnection conn, OffsetRequest request, IList<long> offsets)
+        {
+            offsets.Clear(); // to make sure the list is clean after some previous attempts to get data
+            conn.Write(request);
+            byte[] body = ReadResponseBody(conn);
+            if (body == null)
+            {
+                return;
+            }
+
+            // first four bytes are the number of offsets, each offset takes the next eight bytes
+            int numOfOffsets = BitConverter.ToInt32(BitWorks.ReverseBytes(CopyRange(body, 0, 4)), 0);
+            for (int ix = 0; ix < numOfOffsets; ix++)
+            {
+                offsets.Add(BitConverter.ToInt64(BitWorks.ReverseBytes(CopyRange(body, (ix * 8) + 4, 8)), 0));
+            }
+        }
+
+        /// <summary>
+        /// Reads one response: a 4-byte length, then that many bytes starting with a 2-byte error code.
+        /// </summary>
+        /// <param name="conn">The connection to read from.</param>
+        /// <returns>The bytes following the error code, or null when the length is not positive.</returns>
+        /// <exception cref="KafkaException">Thrown when the error code differs from NoError.</exception>
+        private static byte[] ReadResponseBody(KafkaConnection conn)
+        {
+            int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(conn.Read(4)), 0);
+            if (dataLength <= 0)
+            {
+                return null;
+            }
+
+            byte[] data = conn.Read(dataLength);
+            int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(CopyRange(data, 0, 2)), 0);
+            if (errorCode != KafkaException.NoError)
+            {
+                throw new KafkaException(errorCode);
+            }
+
+            // skip the error code
+            return CopyRange(data, 2, data.Length - 2);
+        }
+
+        /// <summary>
+        /// Copies a byte range into a new array; a range reaching outside the source
+        /// is trimmed to the bytes that exist, so the result may be shorter or empty.
+        /// </summary>
+        /// <param name="source">The array to copy from.</param>
+        /// <param name="start">The index of the first byte to copy; negative values count as zero.</param>
+        /// <param name="count">The maximum number of bytes to copy; negative values copy nothing.</param>
+        /// <returns>A new array with the copied bytes.</returns>
+        private static byte[] CopyRange(byte[] source, int start, int count)
+        {
+            int from = Math.Min(Math.Max(start, 0), source.Length);
+            int length = Math.Max(0, Math.Min(count, source.Length - from));
+            var copy = new byte[length];
+            Buffer.BlockCopy(source, from, copy, 0, length);
+            return copy;
+        }
+    }
+}