You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [7/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,680 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Threading;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using ZooKeeperNet;
+
+    internal partial class ZooKeeperClient
+    {
+        /// <summary>
+        /// Represents the method that will handle a ZooKeeper event  
+        /// </summary>
+        /// <param name="args">
+        /// The args.
+        /// </param>
+        /// <typeparam name="T">
+        /// Type of event data
+        /// </typeparam>
+        public delegate void ZooKeeperEventHandler<T>(T args)
+            where T : ZooKeeperEventArgs;
+
+        /// <summary>
+        /// Occurs when ZooKeeper connection state changes
+        /// </summary>
+        public event ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> StateChanged
+        {
+            add
+            {
+                this.EnsuresNotDisposed();
+                lock (this.eventLock)
+                {
+                    this.stateChangedHandlers -= value;
+                    this.stateChangedHandlers += value;
+                }
+            }
+
+            remove
+            {
+                this.EnsuresNotDisposed();
+                lock (this.eventLock)
+                {
+                    this.stateChangedHandlers -= value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Occurs when ZooKeeper session re-creates
+        /// </summary>
+        public event ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> SessionCreated
+        {
+            add
+            {
+                this.EnsuresNotDisposed();
+                lock (this.eventLock)
+                {
+                    this.sessionCreatedHandlers -= value;
+                    this.sessionCreatedHandlers += value;
+                }
+            }
+
+            remove
+            {
+                this.EnsuresNotDisposed();
+                lock (this.eventLock)
+                {
+                    this.sessionCreatedHandlers -= value;
+                }
+            }
+        }
+
+        private readonly ConcurrentQueue<ZooKeeperEventArgs> eventsQueue = new ConcurrentQueue<ZooKeeperEventArgs>();
+        private readonly object eventLock = new object();
+        private ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> stateChangedHandlers;
+        private ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> sessionCreatedHandlers;
+        private Thread eventWorker;
+        private Thread zooKeeperEventWorker;
+        private readonly ConcurrentDictionary<string, ChildChangedEventItem> childChangedHandlers = new ConcurrentDictionary<string, ChildChangedEventItem>();
+        private readonly ConcurrentDictionary<string, DataChangedEventItem> dataChangedHandlers = new ConcurrentDictionary<string, DataChangedEventItem>();
+        private DateTime? idleTime;
+
+        /// <summary>
+        /// Gets time (in miliseconds) of event thread iddleness
+        /// </summary>
+        /// <remarks>
+        /// Used for testing purpose
+        /// </remarks>
+        public int IdleTime
+        {
+            get
+            {
+                return this.idleTime.HasValue ? Convert.ToInt32((DateTime.Now - this.idleTime.Value).TotalMilliseconds) : 0;
+            }
+        }
+
+        /// <summary>
+        /// Processes ZooKeeper event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        /// <remarks>
+        /// Requires installed watcher
+        /// </remarks>
+        public void Process(WatchedEvent e)
+        {
+            this.EnsuresNotDisposed();
+            Logger.Debug("Received event: " + e);
+            this.zooKeeperEventWorker = Thread.CurrentThread;
+            if (this.shutdownTriggered)
+            {
+                Logger.Debug("ignoring event '{" + e.Type + " | " + e.Path + "}' since shutdown triggered");
+                return;
+            }
+
+            bool stateChanged = e.Path == null;
+            bool znodeChanged = e.Path != null;
+            bool dataChanged =
+                e.Type == EventType.NodeDataChanged
+                || e.Type == EventType.NodeDeleted
+                || e.Type == EventType.NodeCreated
+                || e.Type == EventType.NodeChildrenChanged;
+
+            lock (this.somethingChanged)
+            {
+                try
+                {
+                    if (stateChanged)
+                    {
+                        this.ProcessStateChange(e);
+                    }
+
+                    if (dataChanged)
+                    {
+                        this.ProcessDataOrChildChange(e);
+                    }
+                }
+                finally
+                {
+                    if (stateChanged)
+                    {
+                        lock (this.stateChangedLock)
+                        {
+                            Monitor.PulseAll(this.stateChangedLock);
+                        }
+
+                        if (e.State == KeeperState.Expired)
+                        {
+                            lock (this.znodeChangedLock)
+                            {
+                                Monitor.PulseAll(this.znodeChangedLock);
+                            }
+
+                            foreach (string path in this.childChangedHandlers.Keys)
+                            {
+                                this.Enqueue(new ZooKeeperChildChangedEventArgs(path));
+                            }
+
+                            foreach (string path in this.dataChangedHandlers.Keys)
+                            {
+                                this.Enqueue(new ZooKeeperDataChangedEventArgs(path));
+                            }
+                        }
+                    }
+
+                    if (znodeChanged)
+                    {
+                        lock (this.znodeChangedLock)
+                        {
+                            Monitor.PulseAll(this.znodeChangedLock);
+                        }
+                    }
+                }
+
+                Monitor.PulseAll(this.somethingChanged);
+            }
+        }
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper state changes events
+        /// </summary>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Subscribe(IZooKeeperStateListener listener)
+        {
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.StateChanged += listener.HandleStateChanged;
+            this.SessionCreated += listener.HandleSessionCreated;
+            Logger.Debug("Subscribed state changes handler " + listener.GetType().Name);
+        }
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper state changes events
+        /// </summary>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Unsubscribe(IZooKeeperStateListener listener)
+        {
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.StateChanged -= listener.HandleStateChanged;
+            this.SessionCreated -= listener.HandleSessionCreated;
+            Logger.Debug("Unsubscribed state changes handler " + listener.GetType().Name);
+        }
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper child changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Subscribe(string path, IZooKeeperChildListener listener)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.childChangedHandlers.AddOrUpdate(
+                path,
+                new ChildChangedEventItem(Logger, listener.HandleChildChange),
+                (key, oldValue) => { oldValue.ChildChanged += listener.HandleChildChange; return oldValue; });
+            this.WatchForChilds(path);
+            Logger.Debug("Subscribed child changes handler " + listener.GetType().Name + " for path: " + path);
+        }
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper child changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Unsubscribe(string path, IZooKeeperChildListener listener)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.childChangedHandlers.AddOrUpdate(
+                path,
+                new ChildChangedEventItem(Logger),
+                (key, oldValue) => { oldValue.ChildChanged -= listener.HandleChildChange; return oldValue; });
+            Logger.Debug("Unsubscribed child changes handler " + listener.GetType().Name + " for path: " + path);
+        }
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper data changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Subscribe(string path, IZooKeeperDataListener listener)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.dataChangedHandlers.AddOrUpdate(
+                path,
+                new DataChangedEventItem(Logger, listener.HandleDataChange, listener.HandleDataDelete),
+                (key, oldValue) =>
+                {
+                    oldValue.DataChanged += listener.HandleDataChange;
+                    oldValue.DataDeleted += listener.HandleDataDelete;
+                    return oldValue;
+                });
+            this.WatchForData(path);
+            Logger.Debug("Subscribed data changes handler " + listener.GetType().Name + " for path: " + path);
+        }
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper data changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        public void Unsubscribe(string path, IZooKeeperDataListener listener)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+            Guard.Assert<ArgumentNullException>(() => listener != null);
+
+            this.EnsuresNotDisposed();
+            this.dataChangedHandlers.AddOrUpdate(
+                path,
+                new DataChangedEventItem(Logger),
+                (key, oldValue) =>
+                {
+                    oldValue.DataChanged -= listener.HandleDataChange;
+                    oldValue.DataDeleted -= listener.HandleDataDelete;
+                    return oldValue;
+                });
+            Logger.Debug("Unsubscribed data changes handler " + listener.GetType().Name + " for path: " + path);
+        }
+
+        /// <summary>
+        /// Un-subscribes all listeners
+        /// </summary>
+        public void UnsubscribeAll()
+        {
+            this.EnsuresNotDisposed();
+            lock (this.eventLock)
+            {
+                this.stateChangedHandlers = null;
+                this.sessionCreatedHandlers = null;
+                this.childChangedHandlers.Clear();
+                this.dataChangedHandlers.Clear();
+            }
+
+            Logger.Debug("Unsubscribed all handlers");
+        }
+
+        /// <summary>
+        /// Installs a child watch for the given path. 
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <returns>
+        /// the current children of the path or null if the znode with the given path doesn't exist
+        /// </returns>
+        public IList<string> WatchForChilds(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            if (this.zooKeeperEventWorker != null && Thread.CurrentThread == this.zooKeeperEventWorker)
+            {
+                throw new InvalidOperationException("Must not be done in the zookeeper event thread.");
+            }
+
+            return this.RetryUntilConnected(
+                () =>
+                {
+                    this.Exists(path);
+                    try
+                    {
+                        return this.GetChildren(path);
+                    }
+                    catch (KeeperException.NoNodeException)
+                    {
+                        return null;
+                    }
+                });
+        }
+
+        /// <summary>
+        /// Installs a data watch for the given path. 
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        public void WatchForData(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            this.RetryUntilConnected(
+                () => this.Exists(path, true));
+        }
+
+        /// <summary>
+        /// Checks whether any data or child listeners are registered
+        /// </summary>
+        /// <param name="path">
+        /// The path.
+        /// </param>
+        /// <returns>
+        /// Value indicates whether any data or child listeners are registered
+        /// </returns>
+        private bool HasListeners(string path)
+        {
+            ChildChangedEventItem childChanged;
+            this.childChangedHandlers.TryGetValue(path, out childChanged);
+            if (childChanged != null && childChanged.Count > 0)
+            {
+                return true;
+            }
+
+            DataChangedEventItem dataChanged;
+            this.dataChangedHandlers.TryGetValue(path, out dataChanged);
+            if (dataChanged != null && dataChanged.TotalCount > 0)
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+        /// <summary>
+        /// Event thread starting method
+        /// </summary>
+        private void RunEventWorker()
+        {
+            Logger.Debug("Starting ZooKeeper watcher event thread");
+            try
+            {
+                this.PoolEventsQueue();
+            }
+            catch (ThreadInterruptedException)
+            {
+                Logger.Debug("Terminate ZooKeeper watcher event thread");
+            }
+        }
+
+        /// <summary>
+        /// Pools ZooKeeper events form events queue
+        /// </summary>
+        /// <remarks>
+        /// Thread sleeps if queue is empty
+        /// </remarks>
+        private void PoolEventsQueue()
+        {
+            while (true)
+            {
+                while (!this.eventsQueue.IsEmpty)
+                {
+                    this.Dequeue();
+                }
+
+                lock (this.somethingChanged)
+                {
+                    Logger.Debug("Awaiting events ...");
+                    this.idleTime = DateTime.Now;
+                    Monitor.Wait(this.somethingChanged);
+                    this.idleTime = null;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Enqueues new event from ZooKeeper in events queue
+        /// </summary>
+        /// <param name="e">
+        /// The event from ZooKeeper.
+        /// </param>
+        private void Enqueue(ZooKeeperEventArgs e)
+        {
+            Logger.Debug("New event queued: " + e);
+            this.eventsQueue.Enqueue(e);
+        }
+
+        /// <summary>
+        /// Dequeues event from events queue and invokes subscribed handlers
+        /// </summary>
+        private void Dequeue()
+        {
+            try
+            {
+                ZooKeeperEventArgs e;
+                var success = this.eventsQueue.TryDequeue(out e);
+                if (success)
+                {
+                    if (e != null)
+                    {
+                        Logger.Debug("Event dequeued: " + e);
+                        switch (e.Type)
+                        {
+                            case ZooKeeperEventTypes.StateChanged:
+                                this.OnStateChanged((ZooKeeperStateChangedEventArgs)e);
+                                break;
+                            case ZooKeeperEventTypes.SessionCreated:
+                                this.OnSessionCreated((ZooKeeperSessionCreatedEventArgs)e);
+                                break;
+                            case ZooKeeperEventTypes.ChildChanged:
+                                this.OnChildChanged((ZooKeeperChildChangedEventArgs)e);
+                                break;
+                            case ZooKeeperEventTypes.DataChanged:
+                                this.OnDataChanged((ZooKeeperDataChangedEventArgs)e);
+                                break;
+                            default:
+                                throw new InvalidOperationException("Not supported event type");
+                        }
+                    }
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Error handling event ", exc);
+            }
+        }
+
+        /// <summary>
+        /// Processess ZooKeeper state changes events
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void ProcessStateChange(WatchedEvent e)
+        {
+            Logger.Info("zookeeper state changed (" + e.State + ")");
+            lock (this.stateChangedLock)
+            {
+                this.currentState = e.State;
+            }
+
+            if (this.shutdownTriggered)
+            {
+                return;
+            }
+
+            this.Enqueue(new ZooKeeperStateChangedEventArgs(e.State));
+            if (e.State == KeeperState.Expired)
+            {
+                this.Reconnect(this.connection.Servers, this.connection.SessionTimeout);
+                this.Enqueue(ZooKeeperSessionCreatedEventArgs.Empty);
+            }
+        }
+
+        /// <summary>
+        /// Processess ZooKeeper childs or data changes events
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void ProcessDataOrChildChange(WatchedEvent e)
+        {
+            if (this.shutdownTriggered)
+            {
+                return;
+            }
+
+            if (e.Type == EventType.NodeChildrenChanged
+                || e.Type == EventType.NodeCreated
+                || e.Type == EventType.NodeDeleted)
+            {
+                this.Enqueue(new ZooKeeperChildChangedEventArgs(e.Path));
+            }
+
+            if (e.Type == EventType.NodeDataChanged
+                || e.Type == EventType.NodeCreated
+                || e.Type == EventType.NodeDeleted)
+            {
+                this.Enqueue(new ZooKeeperDataChangedEventArgs(e.Path));
+            }
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeeper state changes event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void OnStateChanged(ZooKeeperStateChangedEventArgs e)
+        {
+            try
+            {
+                var handlers = this.stateChangedHandlers;
+                if (handlers == null)
+                {
+                    return;
+                }
+
+                foreach (var handler in handlers.GetInvocationList())
+                {
+                    Logger.Debug(e + " sent to " + handler.Target);
+                }
+
+                handlers(e);
+            }
+            catch (Exception exc)
+            {
+                Logger.Error("Failed to handle state changed event.", exc);
+            }
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeeper session re-creates event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void OnSessionCreated(ZooKeeperSessionCreatedEventArgs e)
+        {
+            var handlers = this.sessionCreatedHandlers;
+            if (handlers == null)
+            {
+                return;
+            }
+
+            foreach (var handler in handlers.GetInvocationList())
+            {
+                Logger.Debug(e + " sent to " + handler.Target);
+            }
+
+            handlers(e);
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeeper child changes event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void OnChildChanged(ZooKeeperChildChangedEventArgs e)
+        {
+            ChildChangedEventItem handlers;
+            this.childChangedHandlers.TryGetValue(e.Path, out handlers);
+            if (handlers == null || handlers.Count == 0)
+            {
+                return;
+            }
+
+            this.Exists(e.Path);
+            try
+            {
+                IList<string> children = this.GetChildren(e.Path);
+                e.Children = children;
+            }
+            catch (KeeperException.NoNodeException)
+            {
+            }
+
+            handlers.OnChildChanged(e);
+        }
+
+        /// <summary>
+        /// Invokes subscribed handlers for ZooKeeeper data changes event
+        /// </summary>
+        /// <param name="e">
+        /// The event data.
+        /// </param>
+        private void OnDataChanged(ZooKeeperDataChangedEventArgs e)
+        {
+            DataChangedEventItem handlers;
+            this.dataChangedHandlers.TryGetValue(e.Path, out handlers);
+            if (handlers == null || handlers.TotalCount == 0)
+            {
+                return;
+            }
+
+            try
+            {
+                this.Exists(e.Path, true);
+                var data = this.ReadData<string>(e.Path, null, true);
+                e.Data = data;
+                handlers.OnDataChanged(e);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                handlers.OnDataDeleted(e);
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,891 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Utils;
+    using log4net;
+    using Org.Apache.Zookeeper.Data;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper 
+    /// </summary>
+    internal partial class ZooKeeperClient : IZooKeeperClient
+    {
+        private const int DefaultConnectionTimeout = int.MaxValue;
+        public const string DefaultConsumersPath = "/consumers";
+        public const string DefaultBrokerIdsPath = "/brokers/ids";
+        public const string DefaultBrokerTopicsPath = "/brokers/topics";
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private IZooKeeperConnection connection;
+        private bool shutdownTriggered;
+        private KeeperState currentState;
+        private readonly IZooKeeperSerializer serializer;
+        private readonly object stateChangedLock = new object();
+        private readonly object znodeChangedLock = new object();
+        private readonly object somethingChanged = new object();
+        private readonly object shuttingDownLock = new object();
+        private volatile bool disposed;
+        private readonly int connectionTimeout;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
+        /// </summary>
+        /// <param name="connection">
+        /// The connection to ZooKeeper.
+        /// </param>
+        /// <param name="serializer">
+        /// The given serializer.
+        /// </param>
+        /// <param name="connectionTimeout">
+        /// The connection timeout (in miliseconds). Default is infinitive.
+        /// </param>
+        /// <remarks>
+        /// Default serializer is string UTF-8 serializer
+        /// </remarks>
+        public ZooKeeperClient(
+            IZooKeeperConnection connection, 
+            IZooKeeperSerializer serializer, 
+            int connectionTimeout = DefaultConnectionTimeout)
+        {
+            this.serializer = serializer;
+            this.connection = connection;
+            this.connectionTimeout = connectionTimeout;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
+        /// </summary>
+        /// <param name="servers">
+        /// The list of ZooKeeper servers.
+        /// </param>
+        /// <param name="sessionTimeout">
+        /// The session timeout (in miliseconds).
+        /// </param>
+        /// <param name="serializer">
+        /// The given serializer.
+        /// </param>
+        /// <remarks>
+        /// Default serializer is string UTF-8 serializer.
+        /// It is recommended to use quite large sessions timeouts for ZooKeeper.
+        /// </remarks>
+        public ZooKeeperClient(string servers, int sessionTimeout, IZooKeeperSerializer serializer)
+            : this(new ZooKeeperConnection(servers, sessionTimeout), serializer)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
+        /// </summary>
+        /// <param name="servers">
+        /// The list of ZooKeeper servers.
+        /// </param>
+        /// <param name="sessionTimeout">
+        /// The session timeout (in miliseconds).
+        /// </param>
+        /// <param name="serializer">
+        /// The given serializer.
+        /// </param>
+        /// <param name="connectionTimeout">
+        /// The connection timeout (in miliseconds).
+        /// </param>
+        /// <remarks>
+        /// Default serializer is string UTF-8 serializer.
+        /// It is recommended to use quite large sessions timeouts for ZooKeeper.
+        /// </remarks>
+        public ZooKeeperClient(
+            string servers, 
+            int sessionTimeout, 
+            IZooKeeperSerializer serializer,
+            int connectionTimeout)
+            : this(new ZooKeeperConnection(servers, sessionTimeout), serializer, connectionTimeout)
+        {
+        }
+
+        /// <summary>
+        /// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper
+        /// </summary>
+        /// <remarks>
+        /// Also, starts background thread for event handling
+        /// </remarks>
+        public void Connect()
+        {
+            this.EnsuresNotDisposed();
+            bool started = false;
+            try
+            {
+                this.shutdownTriggered = false;
+                this.eventWorker = new Thread(this.RunEventWorker) { IsBackground = true };
+                this.eventWorker.Name = "ZooKeeperkWatcher-EventThread-" + this.eventWorker.ManagedThreadId + "-" + this.connection.Servers;
+                this.eventWorker.Start();
+                this.connection.Connect(this);
+                Logger.Debug("Awaiting connection to Zookeeper server");
+                if (!this.WaitUntilConnected(this.connectionTimeout))
+                {
+                    throw new ZooKeeperException(
+                        "Unable to connect to zookeeper server within timeout: " + this.connection.SessionTimeout);
+                }
+
+                started = true;
+                Logger.Debug("Connection to Zookeeper server established");
+            }
+            catch (ThreadInterruptedException)
+            {
+                throw new InvalidOperationException(
+                    "Not connected with zookeeper server yet. Current state is " + this.connection.ClientState);
+            }
+            finally
+            {
+                if (!started)
+                {
+                    this.Disconnect();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Closes current connection to ZooKeeper
+        /// </summary>
+        /// <remarks>
+        /// Also, stops background thread
+        /// </remarks>
+        public void Disconnect()
+        {
+            Logger.Debug("Closing ZooKeeperClient...");
+            this.shutdownTriggered = true;
+            this.eventWorker.Interrupt();
+            this.eventWorker.Join(2000);
+            this.connection.Dispose();
+            this.connection = null;
+        }
+
+        /// <summary>
+        /// Re-connect to ZooKeeper server when session expired
+        /// </summary>
+        /// <param name="servers">
+        /// The servers.
+        /// </param>
+        /// <param name="connectionTimeout">
+        /// The connection timeout.
+        /// </param>
+        public void Reconnect(string servers, int connectionTimeout)
+        {
+            this.EnsuresNotDisposed();
+            Logger.Debug("Reconnecting");
+            this.connection.Dispose();
+            this.connection = new ZooKeeperConnection(servers, connectionTimeout);
+            this.connection.Connect(this);
+            Logger.Debug("Reconnected");
+        }
+
+        /// <summary>
+        /// Waits untill ZooKeeper connection is established
+        /// </summary>
+        /// <param name="connectionTimeout">
+        /// The connection timeout.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        public bool WaitUntilConnected(int connectionTimeout)
+        {
+            Guard.Assert<ArgumentOutOfRangeException>(() => connectionTimeout > 0);
+            this.EnsuresNotDisposed();
+            if (this.eventWorker != null && this.eventWorker == Thread.CurrentThread)
+            {
+                throw new InvalidOperationException("Must not be done in the ZooKeeper event thread.");
+            }
+
+            Logger.Debug("Waiting for keeper state: " + KeeperState.SyncConnected);
+            bool stillWaiting = true;
+            lock (this.stateChangedLock)
+            {
+                while (this.currentState != KeeperState.SyncConnected)
+                {
+                    if (!stillWaiting)
+                    {
+                        return false;
+                    }
+
+                    stillWaiting = Monitor.Wait(this.stateChangedLock, connectionTimeout);
+                }
+
+                Logger.Debug("State is " + this.currentState);
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Retries given delegate until connections is established
+        /// </summary>
+        /// <param name="callback">
+        /// The delegate to invoke.
+        /// </param>
+        /// <typeparam name="T">
+        /// Type of data returned by delegate 
+        /// </typeparam>
+        /// <returns>
+        /// data returned by delegate
+        /// </returns>
+        public T RetryUntilConnected<T>(Func<T> callback)
+        {
+            Guard.Assert<ArgumentNullException>(() => callback != null);
+            this.EnsuresNotDisposed();
+            if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
+            {
+                throw new InvalidOperationException("Must not be done in the zookeeper event thread");
+            }
+
+            while (true)
+            {
+                try
+                {
+                    return callback();
+                }
+                catch (KeeperException.ConnectionLossException)
+                {
+                    Thread.Yield();
+                    this.WaitUntilConnected(this.connection.SessionTimeout);
+                }
+                catch (KeeperException.SessionExpiredException)
+                {
+                    Thread.Yield();
+                    this.WaitUntilConnected(this.connection.SessionTimeout);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Checks whether znode for a given path exists
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        /// <remarks>
+        /// Will reinstall watcher in ZooKeeper if any listener for given path exists 
+        /// </remarks>
+        public bool Exists(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            bool hasListeners = this.HasListeners(path);
+            return this.Exists(path, hasListeners);
+        }
+
+        /// <summary>
+        /// Checks whether znode for a given path exists.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        public bool Exists(string path, bool watch)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.RetryUntilConnected(
+                () => this.connection.Exists(path, watch));
+        }
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        /// <remarks>
+        /// Will reinstall watcher in ZooKeeper if any listener for given path exists 
+        /// </remarks>
+        public IList<string> GetChildren(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            bool hasListeners = this.HasListeners(path);
+            return this.GetChildren(path, hasListeners);
+        }
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        public IList<string> GetChildren(string path, bool watch)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.RetryUntilConnected(
+                () => this.connection.GetChildren(path, watch));
+        }
+
+        /// <summary>
+        /// Counts number of children for a given path.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Number of children 
+        /// </returns>
+        /// <remarks>
+        /// Will reinstall watcher in ZooKeeper if any listener for given path exists.
+        /// Returns 0 if path does not exist
+        /// </remarks>
+        public int CountChildren(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            try
+            {
+                return this.GetChildren(path).Count;
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                return 0;
+            }
+        }
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        /// <remarks>
+        /// Uses given serializer to deserialize data
+        /// Use null for stats
+        /// </remarks>
+        public T ReadData<T>(string path, Stat stats, bool watch)
+            where T : class 
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            byte[] bytes = this.RetryUntilConnected(
+                () => this.connection.ReadData(path, stats, watch));
+            return this.serializer.Deserialize(bytes) as T;
+        }
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        /// <remarks>
+        /// Uses given serializer to deserialize data.
+        /// Will reinstall watcher in ZooKeeper if any listener for given path exists.
+        /// Use null for stats
+        /// </remarks>
+        public T ReadData<T>(string path, Stat stats) where T : class
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            bool hasListeners = this.HasListeners(path);
+            return this.ReadData<T>(path, null, hasListeners);
+        }
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        public void WriteData(string path, object data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            this.WriteData(path, data, -1);
+        }
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="expectedVersion">
+        /// Expected version of data
+        /// </param>
+        /// <remarks>
+        /// Use -1 for expected version
+        /// </remarks>
+        public void WriteData(string path, object data, int expectedVersion)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            byte[] bytes = this.serializer.Serialize(data);
+            this.RetryUntilConnected(
+                () =>
+                    {
+                        this.connection.WriteData(path, bytes, expectedVersion);
+                        return null as object;
+                    });
+        }
+
+        /// <summary>
+        /// Deletes znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        public bool Delete(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.RetryUntilConnected(
+                () =>
+                    {
+                        try
+                        {
+                            this.connection.Delete(path);
+                            return true;
+                        }
+                        catch (KeeperException.NoNodeException)
+                        {
+                            return false;
+                        }
+                    });
+        }
+
+        /// <summary>
+        /// Deletes znode and his children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        public bool DeleteRecursive(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            IList<string> children;
+            try
+            {
+                children = this.GetChildren(path, false);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                return true;
+            }
+
+            foreach (var child in children)
+            {
+                if (!this.DeleteRecursive(path + "/" + child))
+                {
+                    return false;
+                }
+            }
+
+            return this.Delete(path);
+        }
+
+        /// <summary>
+        /// Creates persistent znode and all intermediate znodes (if do not exist) for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        public void MakeSurePersistentPathExists(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            if (!this.Exists(path))
+            {
+                this.CreatePersistent(path, true);
+            }
+        }
+
+        /// <summary>
+        /// Fetches children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The path.
+        /// </param>
+        /// <returns>
+        /// Children or null, if znode does not exist
+        /// </returns>
+        public IList<string> GetChildrenParentMayNotExist(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            try
+            {
+                return this.GetChildren(path);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Data or null, if znode does not exist
+        /// </returns>
+        public T ReadData<T>(string path)
+            where T : class
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.ReadData<T>(path, false);
+        }
+
+        /// <summary>
+        /// Closes connection to ZooKeeper
+        /// </summary>
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+
+            try
+            {
+                this.Disconnect();
+            }
+            catch (ThreadInterruptedException)
+            {
+            }
+            catch (Exception exc)
+            {
+                Logger.Debug("Ignoring unexpected errors on closing ZooKeeperClient", exc);
+            }
+
+            Logger.Debug("Closing ZooKeeperClient... done");
+        }
+
+        /// <summary>
+        /// Creates a persistent znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="createParents">
+        /// Indicates, whether should create missing intermediate znodes
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// </remarks>
+        public void CreatePersistent(string path, bool createParents)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            try
+            {
+                this.Create(path, null, CreateMode.Persistent);
+            }
+            catch (KeeperException.NodeExistsException)
+            {
+                if (!createParents)
+                {
+                    throw;
+                }
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                if (!createParents)
+                {
+                    throw;
+                }
+
+                string parentDir = path.Substring(0, path.LastIndexOf('/'));
+                this.CreatePersistent(parentDir, true);
+                this.CreatePersistent(path, true);
+            }
+        }
+
+        /// <summary>
+        /// Creates a persistent znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// Doesn't re-create missing intermediate znodes
+        /// </remarks>
+        public void CreatePersistent(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.CreatePersistent(path, false);
+        }
+
+        /// <summary>
+        /// Creates a persistent znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// Doesn't re-create missing intermediate znodes
+        /// </remarks>
+        public void CreatePersistent(string path, object data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.Create(path, data, CreateMode.Persistent);
+        }
+
+        /// <summary>
+        /// Creates a sequential, persistent znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// Doesn't re-create missing intermediate znodes
+        /// </remarks>
+        /// <returns>
+        /// The created znode's path
+        /// </returns>
+        public string CreatePersistentSequential(string path, object data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            return this.Create(path, data, CreateMode.PersistentSequential);
+        }
+
+        /// <summary>
+        /// Helper method to create znode
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="mode">
+        /// The create mode.
+        /// </param>
+        /// <returns>
+        /// The created znode's path
+        /// </returns>
+        private string Create(string path, object data, CreateMode mode)
+        {
+            if (path == null)
+            {
+                throw new ArgumentNullException("Path must not be null");
+            }
+
+            byte[] bytes = data == null ? null : this.serializer.Serialize(data);
+            return this.RetryUntilConnected(() => 
+                this.connection.Create(path, bytes, mode));
+        }
+
+        /// <summary>
+        /// Creates a ephemeral znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        public void CreateEphemeral(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.Create(path, null, CreateMode.Ephemeral);
+        }
+
+        /// <summary>
+        /// Creates a ephemeral znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        public void CreateEphemeral(string path, object data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.Create(path, data, CreateMode.Ephemeral);
+        }
+
+        /// <summary>
+        /// Creates a ephemeral, sequential znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        /// <returns>
+        /// Created znode's path
+        /// </returns>
+        public string CreateEphemeralSequential(string path, object data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            return this.Create(path, data, CreateMode.EphemeralSequential);
+        }
+
+        /// <summary>
+        /// Fetches data for given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="returnNullIfPathNotExists">
+        /// Indicates, whether should return null or throw exception when 
+        /// znode doesn't exist
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        public T ReadData<T>(string path, bool returnNullIfPathNotExists)
+            where T : class 
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            try
+            {
+                return this.ReadData<T>(path, null);
+            }
+            catch (KeeperException.NoNodeException)
+            {
+                if (!returnNullIfPathNotExists)
+                {
+                    throw;
+                }
+
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Ensures that object wasn't disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Reflection;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Utils;
+    using log4net;
+    using Org.Apache.Zookeeper.Data;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Abstracts connection with ZooKeeper server
+    /// </summary>
+    internal class ZooKeeperConnection : IZooKeeperConnection
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        public const int DefaultSessionTimeout = 30000;
+
+        private readonly object syncLock = new object();
+
+        private readonly object shuttingDownLock = new object();
+
+        private volatile bool disposed;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperConnection"/> class.
+        /// </summary>
+        /// <param name="servers">
+        /// The list of ZooKeeper servers.
+        /// </param>
+        public ZooKeeperConnection(string servers)
+            : this(servers, DefaultSessionTimeout)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperConnection"/> class.
+        /// </summary>
+        /// <param name="servers">
+        /// The list of ZooKeeper servers.
+        /// </param>
+        /// <param name="sessionTimeout">
+        /// The session timeout.
+        /// </param>
+        public ZooKeeperConnection(string servers, int sessionTimeout)
+        {
+            this.Servers = servers;
+            this.SessionTimeout = sessionTimeout;
+        }
+
+        /// <summary>
+        /// Gets the list of ZooKeeper servers.
+        /// </summary>
+        public string Servers { get; private set; }
+
+        /// <summary>
+        /// Gets the ZooKeeper session timeout
+        /// </summary>
+        public int SessionTimeout { get; private set; }
+
+        /// <summary>
+        /// Gets ZooKeeper client.
+        /// </summary>
+        public ZooKeeper Client { get; private set; }
+
+        /// <summary>
+        /// Gets the ZooKeeper client state
+        /// </summary>
+        public ZooKeeper.States ClientState
+        {
+            get
+            {
+                return this.Client == null ? null : this.Client.State;
+            }
+        }
+
+        /// <summary>
+        /// Connects to ZooKeeper server
+        /// </summary>
+        /// <param name="watcher">
+        /// The watcher to be installed in ZooKeeper.
+        /// </param>
+        public void Connect(IWatcher watcher)
+        {
+            this.EnsuresNotDisposed();
+            lock (this.syncLock)
+            {
+                if (this.Client != null)
+                {
+                    throw new InvalidOperationException("ZooKeeper client has already been started");
+                }
+
+                try
+                {
+                    Logger.Debug("Starting ZK client");
+                    this.Client = new ZooKeeper(this.Servers, new TimeSpan(0, 0, 0, 0, this.SessionTimeout), watcher);
+                }
+                catch (IOException exc)
+                {
+                    throw new ZooKeeperException("Unable to connect to " + this.Servers, exc);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Deletes znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        public void Delete(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            this.Client.Delete(path, -1);
+        }
+
+        /// <summary>
+        /// Checks whether znode for a given path exists.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        public bool Exists(string path, bool watch)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.Client.Exists(path, true) != null;
+        }
+
+        /// <summary>
+        /// Creates znode using given create mode for given path and writes given data to it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="mode">
+        /// The create mode.
+        /// </param>
+        /// <returns>
+        /// The created znode's path
+        /// </returns>
+        public string Create(string path, byte[] data, CreateMode mode)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.Client.Create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+        }
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        public IList<string> GetChildren(string path, bool watch)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.Client.GetChildren(path, watch);
+        }
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Data
+        /// </returns>
+        public byte[] ReadData(string path, Stat stats, bool watch)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            return this.Client.GetData(path, watch, stats);
+        }
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        public void WriteData(string path, byte[] data)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            this.WriteData(path, data, -1);
+        }
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="version">
+        /// Expected version of data
+        /// </param>
+        public void WriteData(string path, byte[] data, int version)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            this.Client.SetData(path, data, version);
+        }
+
+        /// <summary>
+        /// Gets time when connetion was created
+        /// </summary>
+        /// <param name="path">
+        /// The path.
+        /// </param>
+        /// <returns>
+        /// Connection creation time
+        /// </returns>
+        public long GetCreateTime(string path)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+            this.EnsuresNotDisposed();
+            Stat stats = this.Client.Exists(path, false);
+            return stats != null ? stats.Ctime : -1;
+        }
+
+        /// <summary>
+        /// Closes underlying ZooKeeper client
+        /// </summary>
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            lock (this.shuttingDownLock)
+            {
+                if (this.disposed)
+                {
+                    return;
+                }
+
+                this.disposed = true;
+            }
+            
+            try
+            {
+                if (this.Client != null)
+                {
+                    Logger.Debug("Closing ZooKeeper client connected to " + this.Servers);
+                    this.Client.Dispose();
+                    this.Client = null;
+                    Logger.Debug("ZooKeeper client connection closed");
+                }
+            }
+            catch (Exception exc)
+            {
+                Logger.Warn("Ignoring unexpected errors on closing", exc);
+            }
+        }
+
+        /// <summary>
+        /// Ensures object wasn't disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Linq;
+    using System.Text;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Zookeeper is able to store data in form of byte arrays. This interfacte is a bridge between those byte-array format
+    /// and higher level objects.
+    /// </summary>
+    internal class ZooKeeperStringSerializer : IZooKeeperSerializer
+    {
+        public static readonly ZooKeeperStringSerializer Serializer = new ZooKeeperStringSerializer();
+
+        /// <summary>
+        /// Prevents a default instance of the <see cref="ZooKeeperStringSerializer"/> class from being created.
+        /// </summary>
+        private ZooKeeperStringSerializer()
+        {
+        }
+
+        /// <summary>
+        /// Serializes data using UTF-8 encoding
+        /// </summary>
+        /// <param name="obj">
+        /// The data to serialize
+        /// </param>
+        /// <returns>
+        /// Serialized data
+        /// </returns>
+        public byte[] Serialize(object obj)
+        {
+            Guard.Assert<ArgumentNullException>(() => obj != null);
+            return Encoding.UTF8.GetBytes(obj.ToString());
+        }
+
+        /// <summary>
+        /// Deserializes data using UTF-8 encoding
+        /// </summary>
+        /// <param name="bytes">
+        /// The serialized data
+        /// </param>
+        /// <returns>
+        /// The deserialized data
+        /// </returns>
+        public object Deserialize(byte[] bytes)
+        {
+            Guard.Assert<ArgumentNullException>(() => bytes != null);
+            Guard.Assert<ArgumentException>(() => bytes.Count() > 0);
+
+            return bytes == null ? null : Encoding.UTF8.GetString(bytes);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop Wed Sep 21 19:17:19 2011
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="utf-8"?>
+<FxCopProject Version="10.0" Name="Kafka">
+ <ProjectOptions>
+  <SharedProject>True</SharedProject>
+  <Stylesheet Apply="False">$(FxCopDir)\Xml\FxCopReport.xsl</Stylesheet>
+  <SaveMessages>
+   <Project Status="None" NewOnly="False" />
+   <Report Status="Active" NewOnly="False" />
+  </SaveMessages>
+  <ProjectFile Compress="True" DefaultTargetCheck="True" DefaultRuleCheck="True" SaveByRuleGroup="" Deterministic="True" />
+  <EnableMultithreadedLoad>True</EnableMultithreadedLoad>
+  <EnableMultithreadedAnalysis>True</EnableMultithreadedAnalysis>
+  <SourceLookup>True</SourceLookup>
+  <AnalysisExceptionsThreshold>10</AnalysisExceptionsThreshold>
+  <RuleExceptionsThreshold>1</RuleExceptionsThreshold>
+  <Spelling Locale="en-US" />
+  <OverrideRuleVisibilities>False</OverrideRuleVisibilities>
+  <CustomDictionaries SearchFxCopDir="True" SearchUserProfile="True" SearchProjectDir="True" />
+  <SearchGlobalAssemblyCache>True</SearchGlobalAssemblyCache>
+  <DeadlockDetectionTimeout>120</DeadlockDetectionTimeout>
+  <IgnoreGeneratedCode>True</IgnoreGeneratedCode>
+ </ProjectOptions>
+ <Targets>
+  <Target Name="$(ProjectDir)/Kafka.Client/bin/Integration/Kafka.Client.dll" Analyze="True" AnalyzeAllChildren="True" />
+ </Targets>
+ <Rules>
+  <RuleFiles>
+   <RuleFile Name="$(FxCopDir)\Rules\DesignRules.dll" Enabled="True" AllRulesEnabled="False">
+    <Rule Name="AbstractTypesShouldNotHaveConstructors" Enabled="True" />
+    <Rule Name="AvoidEmptyInterfaces" Enabled="True" />
+    <Rule Name="AvoidExcessiveParametersOnGenericTypes" Enabled="True" />
+    <Rule Name="AvoidNamespacesWithFewTypes" Enabled="True" />
+    <Rule Name="AvoidOutParameters" Enabled="True" />
+    <Rule Name="CollectionsShouldImplementGenericInterface" Enabled="True" />
+    <Rule Name="ConsiderPassingBaseTypesAsParameters" Enabled="True" />
+    <Rule Name="DeclareEventHandlersCorrectly" Enabled="True" />
+    <Rule Name="DeclareTypesInNamespaces" Enabled="True" />
+    <Rule Name="DefaultParametersShouldNotBeUsed" Enabled="True" />
+    <Rule Name="DefineAccessorsForAttributeArguments" Enabled="True" />
+    <Rule Name="DoNotCatchGeneralExceptionTypes" Enabled="True" />
+    <Rule Name="DoNotDeclareProtectedMembersInSealedTypes" Enabled="True" />
+    <Rule Name="DoNotDeclareStaticMembersOnGenericTypes" Enabled="True" />
+    <Rule Name="DoNotDeclareVirtualMembersInSealedTypes" Enabled="True" />
+    <Rule Name="DoNotDeclareVisibleInstanceFields" Enabled="True" />
+    <Rule Name="DoNotExposeGenericLists" Enabled="True" />
+    <Rule Name="DoNotHideBaseClassMethods" Enabled="True" />
+    <Rule Name="DoNotNestGenericTypesInMemberSignatures" Enabled="True" />
+    <Rule Name="DoNotOverloadOperatorEqualsOnReferenceTypes" Enabled="True" />
+    <Rule Name="DoNotPassTypesByReference" Enabled="True" />
+    <Rule Name="DoNotRaiseExceptionsInUnexpectedLocations" Enabled="True" />
+    <Rule Name="EnumeratorsShouldBeStronglyTyped" Enabled="True" />
+    <Rule Name="EnumsShouldHaveZeroValue" Enabled="True" />
+    <Rule Name="EnumStorageShouldBeInt32" Enabled="True" />
+    <Rule Name="ExceptionsShouldBePublic" Enabled="True" />
+    <Rule Name="GenericMethodsShouldProvideTypeParameter" Enabled="True" />
+    <Rule Name="ICollectionImplementationsHaveStronglyTypedMembers" Enabled="True" />
+    <Rule Name="ImplementIDisposableCorrectly" Enabled="True" />
+    <Rule Name="ImplementStandardExceptionConstructors" Enabled="True" />
+    <Rule Name="IndexersShouldNotBeMultidimensional" Enabled="True" />
+    <Rule Name="InterfaceMethodsShouldBeCallableByChildTypes" Enabled="True" />
+    <Rule Name="ListsAreStronglyTyped" Enabled="True" />
+    <Rule Name="MarkAssembliesWithAssemblyVersion" Enabled="True" />
+    <Rule Name="MarkAssembliesWithClsCompliant" Enabled="True" />
+    <Rule Name="MarkAssembliesWithComVisible" Enabled="True" />
+    <Rule Name="MarkAttributesWithAttributeUsage" Enabled="True" />
+    <Rule Name="MarkEnumsWithFlags" Enabled="True" />
+    <Rule Name="MembersShouldNotExposeCertainConcreteTypes" Enabled="True" />
+    <Rule Name="MovePInvokesToNativeMethodsClass" Enabled="True" />
+    <Rule Name="NestedTypesShouldNotBeVisible" Enabled="True" />
+    <Rule Name="OverloadOperatorEqualsOnOverloadingAddAndSubtract" Enabled="True" />
+    <Rule Name="OverrideMethodsOnComparableTypes" Enabled="True" />
+    <Rule Name="PropertiesShouldNotBeWriteOnly" Enabled="True" />
+    <Rule Name="ProvideObsoleteAttributeMessage" Enabled="True" />
+    <Rule Name="ReplaceRepetitiveArgumentsWithParamsArray" Enabled="True" />
+    <Rule Name="StaticHolderTypesShouldBeSealed" Enabled="True" />
+    <Rule Name="StaticHolderTypesShouldNotHaveConstructors" Enabled="True" />
+    <Rule Name="StringUriOverloadsCallSystemUriOverloads" Enabled="True" />
+    <Rule Name="TypesShouldNotExtendCertainBaseTypes" Enabled="True" />
+    <Rule Name="TypesThatOwnDisposableFieldsShouldBeDisposable" Enabled="True" />
+    <Rule Name="TypesThatOwnNativeResourcesShouldBeDisposable" Enabled="True" />
+    <Rule Name="UriParametersShouldNotBeStrings" Enabled="True" />
+    <Rule Name="UriPropertiesShouldNotBeStrings" Enabled="True" />
+    <Rule Name="UriReturnValuesShouldNotBeStrings" Enabled="True" />
+    <Rule Name="UseEventsWhereAppropriate" Enabled="True" />
+    <Rule Name="UseGenericEventHandlerInstances" Enabled="True" />
+    <Rule Name="UseGenericsWhereAppropriate" Enabled="True" />
+    <Rule Name="UseIntegralOrStringArgumentForIndexers" Enabled="True" />
+    <Rule Name="UsePropertiesWhereAppropriate" Enabled="True" />
+   </RuleFile>
+   <RuleFile Name="$(FxCopDir)\Rules\GlobalizationRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\InteroperabilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\MobilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\NamingRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\PerformanceRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\PortabilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\SecurityRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\SecurityTransparencyRules.dll" Enabled="True" AllRulesEnabled="True" />
+   <RuleFile Name="$(FxCopDir)\Rules\UsageRules.dll" Enabled="True" AllRulesEnabled="True" />
+  </RuleFiles>
+  <Groups />
+  <Settings />
+ </Rules>
+ <FxCopReport Version="10.0" />
+</FxCopProject>

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln Wed Sep 21 19:17:19 2011
@@ -1,38 +1,45 @@
-
-Microsoft Visual Studio Solution File, Format Version 11.00
-# Visual Studio 2010
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client", "Kafka.Client\Kafka.Client.csproj", "{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.Tests", "Tests\Kafka.Client.Tests\Kafka.Client.Tests.csproj", "{9BA1A0BF-B207-4A11-8883-5F64B113C07D}"
-EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.IntegrationTests", "Tests\Kafka.Client.IntegrationTests\Kafka.Client.IntegrationTests.csproj", "{AF29C330-49BD-4648-B692-882E922C435B}"
-EndProject
-Global
-	GlobalSection(SolutionConfigurationPlatforms) = preSolution
-		Debug|Any CPU = Debug|Any CPU
-		Release|Any CPU = Release|Any CPU
-	EndGlobalSection
-	GlobalSection(ProjectConfigurationPlatforms) = postSolution
-		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU
-		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU
-		{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU
-	EndGlobalSection
-	GlobalSection(SolutionProperties) = preSolution
-		HideSolutionNode = FALSE
-	EndGlobalSection
-	GlobalSection(NestedProjects) = preSolution
-		{9BA1A0BF-B207-4A11-8883-5F64B113C07D} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
-		{AF29C330-49BD-4648-B692-882E922C435B} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
-	EndGlobalSection
-EndGlobal
+
+Microsoft Visual Studio Solution File, Format Version 11.00
+# Visual Studio 2010
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client", "Kafka.Client\Kafka.Client.csproj", "{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.Tests", "Tests\Kafka.Client.Tests\Kafka.Client.Tests.csproj", "{9BA1A0BF-B207-4A11-8883-5F64B113C07D}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.IntegrationTests", "Tests\Kafka.Client.IntegrationTests\Kafka.Client.IntegrationTests.csproj", "{AF29C330-49BD-4648-B692-882E922C435B}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Integration|Any CPU = Integration|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.Build.0 = Integration|Any CPU
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.Build.0 = Integration|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.Build.0 = Integration|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(NestedProjects) = preSolution
+		{9BA1A0BF-B207-4A11-8883-5F64B113C07D} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
+		{AF29C330-49BD-4648-B692-882E922C435B} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
+	EndGlobalSection
+EndGlobal

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+    <configSections>
+        <section
+            name="kafkaClientConfiguration"
+            type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+            allowLocation="true"
+            allowDefinition="Everywhere"
+      />
+        <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" />
+    </configSections>
+    <log4net configSource="Log4Net.config" />
+  <kafkaClientConfiguration>
+    <!--<kafkaServer address="192.168.3.251" port="9092"></kafkaServer>-->
+    <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
+    <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
+    <brokerPartitionInfos>
+      <add id="0" address="192.168.1.39" port="9092" />
+      <add id="1" address="192.168.1.39" port="9101" />
+      <add id="2" address="192.168.1.39" port="9102" />
+      <!--<add id="0" address="192.168.3.251" port="9092" />-->
+      <!--<add id="2" address="192.168.3.251" port="9092" />-->
+    </brokerPartitionInfos>
+    <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
+    <!--<zooKeeperServers addressList="192.168.3.251:2181" sessionTimeout="30000"></zooKeeperServers>-->
+  </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+    <configSections>
+        <section
+            name="kafkaClientConfiguration"
+            type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+            allowLocation="true"
+            allowDefinition="Everywhere"
+      />
+        <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" />
+    </configSections>
+    <log4net configSource="Log4Net.config" />
+  <kafkaClientConfiguration>
+    <!--<kafkaServer address="192.168.3.251" port="9092"></kafkaServer>-->
+    <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
+    <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
+    <brokerPartitionInfos>
+      <add id="0" address="192.168.1.39" port="9092" />
+      <add id="1" address="192.168.1.39" port="9101" />
+      <add id="2" address="192.168.1.39" port="9102" />
+      <!--<add id="0" address="192.168.3.251" port="9092" />-->
+      <!--<add id="2" address="192.168.3.251" port="9092" />-->
+    </brokerPartitionInfos>
+    <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
+    <!--<zooKeeperServers addressList="192.168.3.251:2181" sessionTimeout="30000"></zooKeeperServers>-->
+  </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file