You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [6/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    /// <summary>
+    /// Contains ZooKeeper session created event data
+    /// </summary>
+    internal class ZooKeeperSessionCreatedEventArgs : ZooKeeperEventArgs
+    {
+        public static new readonly ZooKeeperSessionCreatedEventArgs Empty = new ZooKeeperSessionCreatedEventArgs();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperSessionCreatedEventArgs"/> class.
+        /// </summary>
+        protected ZooKeeperSessionCreatedEventArgs()
+            : base("New session created")
+        {
+        }
+
+        /// <summary>
+        /// Gets the event type.
+        /// </summary>
+        public override ZooKeeperEventTypes Type
+        {
+            get
+            {
+                return ZooKeeperEventTypes.SessionCreated;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Contains ZooKeeper session state changed event data
+    /// </summary>
+    internal class ZooKeeperStateChangedEventArgs : ZooKeeperEventArgs
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ZooKeeperStateChangedEventArgs"/> class.
+        /// </summary>
+        /// <param name="state">
+        /// The current ZooKeeper state.
+        /// </param>
+        public ZooKeeperStateChangedEventArgs(KeeperState state)
+            : base("State changed to " + state)
+        {
+            this.State = state;
+        }
+
+        /// <summary>
+        /// Gets current ZooKeeper state
+        /// </summary>
+        public KeeperState State { get; private set; }
+
+        /// <summary>
+        /// Gets the event type.
+        /// </summary>
+        public override ZooKeeperEventTypes Type
+        {
+            get
+            {
+                return ZooKeeperEventTypes.StateChanged;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,468 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using Org.Apache.Zookeeper.Data;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Abstracts the interaction with zookeeper
+    /// </summary>
+    internal interface IZooKeeperClient : IWatcher, IDisposable
+    {
+        /// <summary>
+        /// Gets time (in miliseconds) of event thread idleness
+        /// </summary>
+        /// <remarks>
+        /// Used for testing purpose
+        /// </remarks>
+        int IdleTime { get; }
+
+        /// <summary>
+        /// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper
+        /// </summary>
+        void Connect();
+
+        /// <summary>
+        /// Closes current connection to ZooKeeper
+        /// </summary>
+        void Disconnect();
+
+        /// <summary>
+        /// Re-connect to ZooKeeper server when session expired
+        /// </summary>
+        /// <param name="servers">
+        /// The servers.
+        /// </param>
+        /// <param name="connectionTimeout">
+        /// The connection timeout.
+        /// </param>
+        void Reconnect(string servers, int connectionTimeout);
+
+        /// <summary>
+        /// Waits untill ZooKeeper connection is established
+        /// </summary>
+        /// <param name="connectionTimeout">
+        /// The connection timeout.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        bool WaitUntilConnected(int connectionTimeout);
+
+        /// <summary>
+        /// Retries given delegate until connections is established
+        /// </summary>
+        /// <param name="callback">
+        /// The delegate to invoke.
+        /// </param>
+        /// <typeparam name="T">
+        /// Type of data returned by delegate 
+        /// </typeparam>
+        /// <returns>
+        /// data returned by delegate
+        /// </returns>
+        T RetryUntilConnected<T>(Func<T> callback);
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper state changes events
+        /// </summary>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Subscribe(IZooKeeperStateListener listener);
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper state changes events
+        /// </summary>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Unsubscribe(IZooKeeperStateListener listener);
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper child changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Subscribe(string path, IZooKeeperChildListener listener);
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper child changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Unsubscribe(string path, IZooKeeperChildListener listener);
+
+        /// <summary>
+        /// Subscribes listeners on ZooKeeper data changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Subscribe(string path, IZooKeeperDataListener listener);
+
+        /// <summary>
+        /// Un-subscribes listeners on ZooKeeper data changes under given path
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <param name="listener">
+        /// The listener.
+        /// </param>
+        void Unsubscribe(string path, IZooKeeperDataListener listener);
+
+        /// <summary>
+        /// Un-subscribes all listeners
+        /// </summary>
+        void UnsubscribeAll();
+
+        /// <summary>
+        /// Installs a child watch for the given path. 
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        /// <returns>
+        /// the current children of the path or null if the znode with the given path doesn't exist
+        /// </returns>
+        IList<string> WatchForChilds(string path);
+
+        /// <summary>
+        /// Installs a data watch for the given path. 
+        /// </summary>
+        /// <param name="path">
+        /// The parent path.
+        /// </param>
+        void WatchForData(string path);
+
+        /// <summary>
+        /// Checks whether znode for a given path exists
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        bool Exists(string path);
+
+        /// <summary>
+        /// Checks whether znode for a given path exists.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        bool Exists(string path, bool watch);
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        IList<string> GetChildren(string path);
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        IList<string> GetChildren(string path, bool watch);
+
+        /// <summary>
+        /// Counts number of children for a given path.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Number of children 
+        /// </returns>
+        int CountChildren(string path);
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        T ReadData<T>(string path, Stat stats, bool watch)
+            where T : class;
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        T ReadData<T>(string path, Stat stats)
+            where T : class;
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Data or null, if znode does not exist
+        /// </returns>
+        T ReadData<T>(string path)
+            where T : class;
+
+        /// <summary>
+        /// Fetches data for given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="returnNullIfPathNotExists">
+        /// Indicates, whether should return null or throw exception when 
+        /// znode doesn't exist
+        /// </param>
+        /// <typeparam name="T">
+        /// Expected type of data
+        /// </typeparam>
+        /// <returns>
+        /// Data
+        /// </returns>
+        T ReadData<T>(string path, bool returnNullIfPathNotExists)
+            where T : class;
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        void WriteData(string path, object data);
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="expectedVersion">
+        /// Expected version of data
+        /// </param>
+        void WriteData(string path, object data, int expectedVersion);
+
+        /// <summary>
+        /// Deletes znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        bool Delete(string path);
+
+        /// <summary>
+        /// Deletes znode and his children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <returns>
+        /// Status
+        /// </returns>
+        bool DeleteRecursive(string path);
+
+        /// <summary>
+        /// Creates persistent znode and all intermediate znodes (if do not exist) for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        void MakeSurePersistentPathExists(string path);
+
+        /// <summary>
+        /// Fetches children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The path.
+        /// </param>
+        /// <returns>
+        /// Children or null, if znode does not exist
+        /// </returns>
+        IList<string> GetChildrenParentMayNotExist(string path);
+
+        /// <summary>
+        /// Creates a persistent znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="createParents">
+        /// Indicates whether should create all intermediate znodes
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// Doesn't re-create missing intermediate znodes
+        /// </remarks>
+        void CreatePersistent(string path, bool createParents);
+
+        /// <summary>
+        /// Creates a persistent znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// Doesn't re-create missing intermediate znodes
+        /// </remarks>
+        void CreatePersistent(string path);
+
+        /// <summary>
+        /// Creates a persistent znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't disappear after session close
+        /// </remarks>
+        void CreatePersistent(string path, object data);
+
+        /// <summary>
+        /// Creates a sequential, persistent znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Persistent znodes won't dissapear after session close
+        /// </remarks>
+        /// <returns>
+        /// The created znode's path
+        /// </returns>
+        string CreatePersistentSequential(string path, object data);
+
+        /// <summary>
+        /// Creates a ephemeral znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        void CreateEphemeral(string path);
+
+        /// <summary>
+        /// Creates a ephemeral znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        void CreateEphemeral(string path, object data);
+
+        /// <summary>
+        /// Creates a ephemeral, sequential znode for a given path and writes data into it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <remarks>
+        /// Ephemeral znodes will disappear after session close
+        /// </remarks>
+        /// <returns>
+        /// Created znode's path
+        /// </returns>
+        string CreateEphemeralSequential(string path, object data);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    using System;
+    using System.Collections.Generic;
+    using Org.Apache.Zookeeper.Data;
+    using ZooKeeperNet;
+
+    /// <summary>
+    /// Abstracts connection with ZooKeeper server
+    /// </summary>
+    internal interface IZooKeeperConnection : IDisposable
+    {
+        /// <summary>
+        /// Gets the ZooKeeper client state
+        /// </summary>
+        ZooKeeper.States ClientState { get; }
+
+        /// <summary>
+        /// Gets the list of ZooKeeper servers.
+        /// </summary>
+        string Servers { get; }
+
+        /// <summary>
+        /// Gets the ZooKeeper session timeout
+        /// </summary>
+        int SessionTimeout { get; }
+
+        /// <summary>
+        /// Gets ZooKeeper client.
+        /// </summary>
+        ZooKeeper Client { get; }
+
+        /// <summary>
+        /// Connects to ZooKeeper server
+        /// </summary>
+        /// <param name="watcher">
+        /// The watcher to be installed in ZooKeeper.
+        /// </param>
+        void Connect(IWatcher watcher);
+
+        /// <summary>
+        /// Creates znode using given create mode for given path and writes given data to it
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="mode">
+        /// The create mode.
+        /// </param>
+        /// <returns>
+        /// The created znode's path
+        /// </returns>
+        string Create(string path, byte[] data, CreateMode mode);
+
+        /// <summary>
+        /// Deletes znode for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        void Delete(string path);
+
+        /// <summary>
+        /// Checks whether znode for a given path exists.
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Result of check
+        /// </returns>
+        bool Exists(string path, bool watch);
+
+        /// <summary>
+        /// Gets all children for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Children
+        /// </returns>
+        IList<string> GetChildren(string path, bool watch);
+
+        /// <summary>
+        /// Fetches data from a given path in ZooKeeper
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="stats">
+        /// The statistics.
+        /// </param>
+        /// <param name="watch">
+        /// Indicates whether should reinstall watcher in ZooKeeper.
+        /// </param>
+        /// <returns>
+        /// Data
+        /// </returns>
+        byte[] ReadData(string path, Stat stats, bool watch);
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        void WriteData(string path, byte[] data);
+
+        /// <summary>
+        /// Writes data for a given path
+        /// </summary>
+        /// <param name="path">
+        /// The given path.
+        /// </param>
+        /// <param name="data">
+        /// The data to write.
+        /// </param>
+        /// <param name="version">
+        /// Expected version of data
+        /// </param>
+        void WriteData(string path, byte[] data, int version);
+
+        /// <summary>
+        /// Gets time when connetion was created
+        /// </summary>
+        /// <param name="path">
+        /// The path.
+        /// </param>
+        /// <returns>
+        /// Connection creation time
+        /// </returns>
+        long GetCreateTime(string path);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+    /// <summary>
+    /// Zookeeper is able to store data in form of byte arrays. This interfacte is a bridge between those byte-array format
+    /// and higher level objects.
+    /// </summary>
+    internal interface IZooKeeperSerializer
+    {
+        /// <summary>
+        /// Serializes data
+        /// </summary>
+        /// <param name="obj">
+        /// The data to serialize
+        /// </param>
+        /// <returns>
+        /// Serialized data
+        /// </returns>
+        byte[] Serialize(object obj);
+
+        /// <summary>
+        /// Deserializes data
+        /// </summary>
+        /// <param name="bytes">
+        /// The serialized data
+        /// </param>
+        /// <returns>
+        /// The deserialized data
+        /// </returns>
+        object Deserialize(byte[] bytes);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using log4net;
+
+    /// <summary>
+    /// Listens to new broker registrations under a particular topic, in zookeeper and
+    /// keeps the related data structures updated
+    /// </summary>
+    internal class BrokerTopicsListener : IZooKeeperChildListener
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly IDictionary<int, Broker> actualBrokerIdMap;
+        private readonly Action<int, string, int> callback;
+        private readonly IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap;
+        private IDictionary<int, Broker> oldBrokerIdMap;
+        private IDictionary<string, SortedSet<Partition>> oldBrokerTopicsPartitionsMap;
+        private readonly IZooKeeperClient zkclient;
+        private readonly object syncLock = new object();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="BrokerTopicsListener"/> class.
+        /// </summary>
+        /// <param name="zkclient">The wrapper on ZooKeeper client.</param>
+        /// <param name="actualBrokerTopicsPartitionsMap">The actual broker topics partitions map.</param>
+        /// <param name="actualBrokerIdMap">The actual broker id map.</param>
+        /// <param name="callback">The callback invoked after new broker is added.</param>
+        public BrokerTopicsListener(
+            IZooKeeperClient zkclient,
+            IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap, 
+            IDictionary<int, Broker> actualBrokerIdMap, 
+            Action<int, string, int> callback)
+        {
+            this.zkclient = zkclient;
+            this.actualBrokerTopicsPartitionsMap = actualBrokerTopicsPartitionsMap;
+            this.actualBrokerIdMap = actualBrokerIdMap;
+            this.callback = callback;
+            this.oldBrokerIdMap = new Dictionary<int, Broker>(this.actualBrokerIdMap);
+            this.oldBrokerTopicsPartitionsMap = new Dictionary<string, SortedSet<Partition>>(this.actualBrokerTopicsPartitionsMap);
+            Logger.Debug("Creating broker topics listener to watch the following paths - \n"
+                + "/broker/topics, /broker/topics/topic, /broker/ids");
+            Logger.Debug("Initialized this broker topics listener with initial mapping of broker id to "
+                + "partition id per topic with " + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                    x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+        }
+
+        /// <summary>
+        /// Called when the children of the given path changed
+        /// </summary>
+        /// <param name="e">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
+        /// as parent path and children (null if parent was deleted).
+        /// </param>
+        public void HandleChildChange(ZooKeeperChildChangedEventArgs e)
+        {
+            Guard.Assert<ArgumentNullException>(() => e != null);
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(e.Path));
+            Guard.Assert<ArgumentNullException>(() => e.Children != null);
+
+            lock (this.syncLock)
+            {
+                try
+                {
+                    string path = e.Path;
+                    IList<string> childs = e.Children;
+                    Logger.Debug("Watcher fired for path: " + path);
+                    switch (path)
+                    {
+                        case ZooKeeperClient.DefaultBrokerTopicsPath:
+                            List<string> oldTopics = this.oldBrokerTopicsPartitionsMap.Keys.ToList();
+                            List<string> newTopics = childs.Except(oldTopics).ToList();
+                            Logger.Debug("List of topics was changed at " + e.Path);
+                            Logger.Debug("Current topics -> " + e.Children.ToMultiString(","));
+                            Logger.Debug("Old list of topics -> " + oldTopics.ToMultiString(","));
+                            Logger.Debug("List of newly registered topics -> " + newTopics.ToMultiString(","));
+                            foreach (var newTopic in newTopics)
+                            {
+                                string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + newTopic;
+                                IList<string> brokerList = this.zkclient.GetChildrenParentMayNotExist(brokerTopicPath);
+                                this.ProcessNewBrokerInExistingTopic(newTopic, brokerList);
+                                this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + newTopic, this);
+                            }
+
+                            break;
+                        case ZooKeeperClient.DefaultBrokerIdsPath:
+                            Logger.Debug("List of brokers changed in the Kafka cluster " + e.Path);
+                            Logger.Debug("Currently registered list of brokers -> " + e.Children.ToMultiString(","));
+                            this.ProcessBrokerChange(path, childs);
+                            break;
+                        default:
+                            string[] parts = path.Split('/');
+                            string topic = parts.Last();
+                            if (parts.Length == 4 && parts[2] == "topics" && childs != null)
+                            {
+                                Logger.Debug("List of brokers changed at " + path);
+                                Logger.Debug(
+                                    "Currently registered list of brokers for topic " + topic + " -> " +
+                                    childs.ToMultiString(","));
+                                this.ProcessNewBrokerInExistingTopic(topic, childs);
+                            }
+
+                            break;
+                    }
+
+                    this.oldBrokerTopicsPartitionsMap = this.actualBrokerTopicsPartitionsMap;
+                    this.oldBrokerIdMap = this.actualBrokerIdMap;
+                }
+                catch (Exception exc)
+                {
+                    Logger.Debug("Error while handling " + e, exc);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Resets the state of listener.
+        /// </summary>
+        public void ResetState()
+        {
+            Logger.Debug("Before reseting broker topic partitions state -> " 
+                + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+            this.oldBrokerTopicsPartitionsMap = actualBrokerTopicsPartitionsMap;
+            Logger.Debug("After reseting broker topic partitions state -> "
+                + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+            Logger.Debug("Before reseting broker id map state -> "
+                + this.oldBrokerIdMap.ToMultiString(", "));
+            this.oldBrokerIdMap = this.actualBrokerIdMap;
+            Logger.Debug("After reseting broker id map state -> "
+                + this.oldBrokerIdMap.ToMultiString(", "));
+        }
+
+        /// <summary>
+        /// Generate the updated mapping of (brokerId, numPartitions) for the new list of brokers
+        /// registered under some topic.
+        /// </summary>
+        /// <param name="topic">The path of the topic under which the brokers have changed..</param>
+        /// <param name="childs">The list of changed brokers.</param>
+        private void ProcessNewBrokerInExistingTopic(string topic, IEnumerable<string> childs)
+        {
+            if (this.actualBrokerTopicsPartitionsMap.ContainsKey(topic))
+            {
+                Logger.Debug("Old list of brokers -> " + this.oldBrokerTopicsPartitionsMap[topic].ToMultiString(x => x.BrokerId.ToString(), ","));
+            }
+
+            var updatedBrokers = new SortedSet<int>(childs.Select(x => int.Parse(x, CultureInfo.InvariantCulture)));
+            string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+            var sortedBrokerPartitions = new SortedDictionary<int, int>();
+            foreach (var bid in updatedBrokers)
+            {
+                var num = this.zkclient.ReadData<string>(brokerTopicPath + "/" + bid);
+                sortedBrokerPartitions.Add(bid, int.Parse(num, CultureInfo.InvariantCulture));
+            }
+
+            var updatedBrokerParts = new SortedSet<Partition>();
+            foreach (var bp in sortedBrokerPartitions)
+            {
+                for (int i = 0; i < bp.Value; i++)
+                {
+                    var bidPid = new Partition(bp.Key, i);
+                    updatedBrokerParts.Add(bidPid);
+                }
+            }
+
+            Logger.Debug("Currently registered list of brokers for topic " + topic + " -> " + childs.ToMultiString(", "));
+            SortedSet<Partition> mergedBrokerParts = updatedBrokerParts;
+            if (this.actualBrokerTopicsPartitionsMap.ContainsKey(topic))
+            {
+                SortedSet<Partition> oldBrokerParts = this.actualBrokerTopicsPartitionsMap[topic];
+                Logger.Debug(
+                    "Unregistered list of brokers for topic " + topic + " -> " + oldBrokerParts.ToMultiString(", "));
+                foreach (var oldBrokerPart in oldBrokerParts)
+                {
+                    mergedBrokerParts.Add(oldBrokerPart);
+                }
+            }
+            else
+            {
+                this.actualBrokerTopicsPartitionsMap.Add(topic, null);
+            }
+
+            this.actualBrokerTopicsPartitionsMap[topic] = new SortedSet<Partition>(mergedBrokerParts.Where(x => this.actualBrokerIdMap.ContainsKey(x.BrokerId)));
+        }
+
+        /// <summary>
+        /// Processes change in the broker lists.
+        /// </summary>
+        /// <param name="path">The parent path of brokers list.</param>
+        /// <param name="childs">The current brokers.</param>
+        private void ProcessBrokerChange(string path, IEnumerable<string> childs)
+        {
+            if (path != ZooKeeperClient.DefaultBrokerIdsPath)
+            {
+                return;
+            }
+
+            List<int> updatedBrokers = childs.Select(x => int.Parse(x, CultureInfo.InvariantCulture)).ToList();
+            List<int> oldBrokers = this.oldBrokerIdMap.Select(x => x.Key).ToList();
+            List<int> newBrokers = updatedBrokers.Except(oldBrokers).ToList();
+            Logger.Debug("List of newly registered brokers -> " + newBrokers.ToMultiString(","));
+            foreach (int bid in newBrokers)
+            {
+                string brokerInfo = this.zkclient.ReadData<string>(ZooKeeperClient.DefaultBrokerIdsPath + "/" + bid);
+                string[] brokerHost = brokerInfo.Split(':');
+                var port = int.Parse(brokerHost[2], CultureInfo.InvariantCulture); 
+                this.actualBrokerIdMap.Add(bid, new Broker(bid, brokerHost[1], brokerHost[1], port));
+                if (this.callback != null)
+                {
+                    Logger.Debug("Invoking the callback for broker: " + bid);
+                    this.callback(bid, brokerHost[1], port);
+                }
+            }
+
+            List<int> deadBrokers = oldBrokers.Except(updatedBrokers).ToList();
+            Logger.Debug("Deleting broker ids for dead brokers -> " + deadBrokers.ToMultiString(","));
+            foreach (int bid in deadBrokers)
+            {
+                Logger.Debug("Deleting dead broker: " + bid);
+                this.actualBrokerIdMap.Remove(bid);
+                foreach (var topicMap in this.actualBrokerTopicsPartitionsMap)
+                {
+                    int affected = topicMap.Value.RemoveWhere(x => x.BrokerId == bid);
+                    if (affected > 0)
+                    {
+                        Logger.Debug("Removing dead broker " + bid + " for topic: " + topicMap.Key);
+                        Logger.Debug("Actual list of mapped brokers is -> " + topicMap.Value.ToMultiString(x => x.ToString(), ","));
+                    }
+                }
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using Kafka.Client.ZooKeeperIntegration.Events;
+
+    /// <summary>
+    /// Listener that can be registered for listening on ZooKeeper znode changes for a given path
+    /// </summary>
+    internal interface IZooKeeperChildListener
+    {
+        /// <summary>
+        /// Called when the children of the given path changed
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
+        /// as parent path and children (null if parent was deleted).
+        /// </param>
+        /// <remarks> 
+        /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+        /// </remarks>
+        void HandleChildChange(ZooKeeperChildChangedEventArgs args);
+
+        /// <summary>
+        /// Resets the state of listener.
+        /// </summary>
+        void ResetState();
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using Kafka.Client.ZooKeeperIntegration.Events;
+
+    /// <summary>
+    /// Listener that can be registered for listening on ZooKeeper znode data changes for a given path
+    /// </summary>
+    internal interface IZooKeeperDataListener
+    {
+        /// <summary>
+        /// Called when the data of the given path changed
+        /// </summary>
+        /// <param name="args">The <see cref="ZooKeeperDataChangedEventArgs"/> instance containing the event data
+        /// as path and data.
+        /// </param>
+        /// <remarks> 
+        /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+        /// </remarks>
+        void HandleDataChange(ZooKeeperDataChangedEventArgs args);
+
+        /// <summary>
+        /// Called when the data of the given path was deleted
+        /// </summary>
+        /// <param name="args">The <see cref="ZooKeeperDataChangedEventArgs"/> instance containing the event data
+        /// as path.
+        /// </param>
+        /// <remarks> 
+        /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+        /// </remarks>
+        void HandleDataDelete(ZooKeeperDataChangedEventArgs args);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using Kafka.Client.ZooKeeperIntegration.Events;
+
+    /// <summary>
+    /// Handles the session expiration event in ZooKeeper
+    /// </summary>
+    internal interface IZooKeeperStateListener
+    {
+        /// <summary>
+        /// Called when the ZooKeeper connection state has changed.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+        void HandleStateChanged(ZooKeeperStateChangedEventArgs args);
+
+        /// <summary>
+        /// Called after the ZooKeeper session has expired and a new session has been created.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+        /// <remarks>
+        /// You would have to re-create any ephemeral nodes here.
+        /// </remarks>
+        void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using log4net;
+    using ZooKeeperNet;
+
+    internal class ZKRebalancerListener : IZooKeeperChildListener
+    {
+        private IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
+
+        private IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();
+
+        private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry;
+
+        private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues;
+
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly string consumerIdString;
+
+        private readonly object syncLock;
+
+        private readonly ConsumerConfig config;
+
+        private readonly IZooKeeperClient zkClient;
+
+        private readonly ZKGroupDirs dirs;
+
+        private readonly Fetcher fetcher;
+
+        private readonly ZookeeperConsumerConnector zkConsumerConnector;
+
+        internal ZKRebalancerListener(
+            ConsumerConfig config,
+            string consumerIdString,
+            IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry,
+            IZooKeeperClient zkClient,
+            ZookeeperConsumerConnector zkConsumerConnector,
+            IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues,
+            Fetcher fetcher,
+            object syncLock)
+        {
+            this.syncLock = syncLock;
+            this.consumerIdString = consumerIdString;
+            this.config = config;
+            this.topicRegistry = topicRegistry;
+            this.zkClient = zkClient;
+            this.dirs = new ZKGroupDirs(config.GroupId);
+            this.zkConsumerConnector = zkConsumerConnector;
+            this.queues = queues;
+            this.fetcher = fetcher;
+        }
+
+        public void SyncedRebalance()
+        {
+            lock (this.syncLock)
+            {
+                for (int i = 0; i < ZookeeperConsumerConnector.MaxNRetries; i++)
+                {
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "begin rebalancing consumer {0} try #{1}", consumerIdString, i);
+                    bool done = false;
+                    try
+                    {
+                        done = this.Rebalance();
+                    }
+                    catch (Exception ex)
+                    {
+                        Logger.InfoFormat(CultureInfo.CurrentCulture, "exception during rebalance {0}", ex);
+                    }
+
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "end rebalancing consumer {0} try #{1}", consumerIdString, i);
+                    if (done)
+                    {
+                        return;
+                    }
+
+                    //// release all partitions, reset state and retry
+                    this.ReleasePartitionOwnership();
+                    this.ResetState();
+                    Thread.Sleep(config.ZkSyncTimeMs);
+                }
+            }
+
+            throw new ZKRebalancerException(string.Format(CultureInfo.CurrentCulture, "{0} can't rebalance after {1} retries", this.consumerIdString, ZookeeperConsumerConnector.MaxNRetries));
+        }
+
+        /// <summary>
+        /// Called when the children of the given path changed
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
+        /// as parent path and children (null if parent was deleted).
+        /// </param>
+        /// <remarks> 
+        /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+        /// </remarks>
+        public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
+        {
+            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(args.Path));
+            Guard.Assert<ArgumentNullException>(() => args.Children != null);
+
+            SyncedRebalance();
+        }
+
+        /// <summary>
+        /// Resets the state of listener.
+        /// </summary>
+        public void ResetState()
+        {
+            this.topicRegistry.Clear();
+            this.oldConsumersPerTopicMap.Clear();
+            this.oldPartitionsPerTopicMap.Clear();
+        }
+
+        private bool Rebalance()
+        {
+            var myTopicThresdIdsMap = this.GetTopicCount(this.consumerIdString).GetConsumerThreadIdsPerTopic();
+            var cluster = new Cluster(zkClient);
+            var consumersPerTopicMap = this.GetConsumersPerTopic(this.config.GroupId);
+            var partitionsPerTopicMap = ZkUtils.GetPartitionsForTopics(this.zkClient, myTopicThresdIdsMap.Keys);
+            var relevantTopicThreadIdsMap = GetRelevantTopicMap(
+                myTopicThresdIdsMap,
+                partitionsPerTopicMap,
+                this.oldPartitionsPerTopicMap,
+                consumersPerTopicMap,
+                this.oldConsumersPerTopicMap);
+            if (relevantTopicThreadIdsMap.Count <= 0)
+            {
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "Consumer {0} with {1} doesn't need to rebalance.", this.consumerIdString, consumersPerTopicMap);
+                return true;
+            }
+
+            Logger.Info("Committing all offsets");
+            this.zkConsumerConnector.CommitOffsets();
+
+            Logger.Info("Releasing parittion ownership");
+            this.ReleasePartitionOwnership();
+
+            var queuesToBeCleared = new List<BlockingCollection<FetchedDataChunk>>();
+            foreach (var item in relevantTopicThreadIdsMap)
+            {
+                this.topicRegistry.Remove(item.Key);
+                this.topicRegistry.Add(item.Key, new Dictionary<Partition, PartitionTopicInfo>());
+
+                var topicDirs = new ZKGroupTopicDirs(config.GroupId, item.Key);
+                var curConsumers = consumersPerTopicMap[item.Key];
+                var curPartitions = new List<string>(partitionsPerTopicMap[item.Key]);
+
+                var numberOfPartsPerConsumer = curPartitions.Count / curConsumers.Count;
+                var numberOfConsumersWithExtraPart = curPartitions.Count % curConsumers.Count;
+
+                Logger.InfoFormat(
+                    CultureInfo.CurrentCulture,
+                    "Consumer {0} rebalancing the following partitions: {1} for topic {2} with consumers: {3}",
+                    this.consumerIdString,
+                    string.Join(",", curPartitions),
+                    item.Key,
+                    string.Join(",", curConsumers));
+
+                foreach (string consumerThreadId in item.Value)
+                {
+                    var myConsumerPosition = curConsumers.IndexOf(consumerThreadId);
+                    if (myConsumerPosition < 0)
+                    {
+                        continue;
+                    }
+
+                    var startPart = (numberOfPartsPerConsumer * myConsumerPosition) +
+                                    Math.Min(myConsumerPosition, numberOfConsumersWithExtraPart);
+                    var numberOfParts = numberOfPartsPerConsumer + (myConsumerPosition + 1 > numberOfConsumersWithExtraPart ? 0 : 1);
+
+                    if (numberOfParts <= 0)
+                    {
+                        Logger.WarnFormat(CultureInfo.CurrentCulture, "No broker partitions consumed by consumer thread {0} for topic {1}", consumerThreadId, item.Key);
+                    }
+                    else
+                    {
+                        for (int i = startPart; i < startPart + numberOfParts; i++)
+                        {
+                            var partition = curPartitions[i];
+                            Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} attempting to claim partition {1}", consumerThreadId, partition);
+                            if (!this.ProcessPartition(topicDirs, partition, item.Key, consumerThreadId))
+                            {
+                                return false;
+                            }
+                        }
+
+                        queuesToBeCleared.Add(queues[new Tuple<string, string>(item.Key, consumerThreadId)]);
+                    }
+                }
+            }
+
+            this.UpdateFetcher(cluster, queuesToBeCleared);
+            this.oldPartitionsPerTopicMap = partitionsPerTopicMap;
+            this.oldConsumersPerTopicMap = consumersPerTopicMap;
+            return true;
+        }
+
+        private void UpdateFetcher(Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
+        {
+            var allPartitionInfos = new List<PartitionTopicInfo>();
+            foreach (var item in this.topicRegistry.Values)
+            {
+                foreach (var partitionTopicInfo in item.Values)
+                {
+                    allPartitionInfos.Add(partitionTopicInfo);
+                }
+            }
+
+            Logger.InfoFormat(
+                CultureInfo.CurrentCulture,
+                "Consumer {0} selected partitions: {1}",
+                this.consumerIdString,
+                string.Join(",", allPartitionInfos.OrderBy(x => x.Partition.Name).Select(y => y.Partition.Name)));
+            if (this.fetcher != null)
+            {
+                this.fetcher.InitConnections(allPartitionInfos, cluster, queuesToBeCleared);
+            }
+        }
+
+        private bool ProcessPartition(ZKGroupTopicDirs topicDirs, string partition, string topic, string consumerThreadId)
+        {
+            var partitionOwnerPath = topicDirs.ConsumerOwnerDir + "/" + partition;
+            try
+            {
+                ZkUtils.CreateEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId);
+            }
+            catch (KeeperException.NodeExistsException)
+            {
+                //// The node hasn't been deleted by the original owner. So wait a bit and retry.
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "waiting for the partition ownership to be deleted: {0}", partition);
+                return false;
+            }
+
+            AddPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId);
+            return true;
+        }
+
+        private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partitionString, string topic, string consumerThreadId)
+        {
+            var partition = Partition.ParseFrom(partitionString);
+            var partTopicInfoMap = this.topicRegistry[topic];
+            var znode = topicDirs.ConsumerOffsetDir + "/" + partition.Name;
+            var offsetString = this.zkClient.ReadData<string>(znode, true);
+            long offset = string.IsNullOrEmpty(offsetString) ? 0 : long.Parse(offsetString, CultureInfo.InvariantCulture);
+            var queue = this.queues[new Tuple<string, string>(topic, consumerThreadId)];
+            var partTopicInfo = new PartitionTopicInfo(
+                topic,
+                partition.BrokerId,
+                partition,
+                queue,
+                offset,
+                offset,
+                this.config.FetchSize);
+            partTopicInfoMap.Add(partition, partTopicInfo);
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "{0} selected new offset {1}", partTopicInfo, offset);
+            }
+        }
+
+        private void ReleasePartitionOwnership()
+        {
+            foreach (KeyValuePair<string, IDictionary<Partition, PartitionTopicInfo>> item in topicRegistry)
+            {
+                var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, item.Key);
+                foreach (var partition in item.Value.Keys)
+                {
+                    string znode = topicDirs.ConsumerOwnerDir + "/" + partition.Name;
+                    ZkUtils.DeletePath(zkClient, znode);
+                    if (Logger.IsDebugEnabled)
+                    {
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "Consumer {0} releasing {1}", this.consumerIdString, znode);
+                    }
+                }
+            }
+        }
+
+        private TopicCount GetTopicCount(string consumerId)
+        {
+            var topicCountJson = this.zkClient.ReadData<string>(this.dirs.ConsumerRegistryDir + "/" + consumerId);
+            return TopicCount.ConstructTopicCount(consumerId, topicCountJson);
+        }
+
+        private IDictionary<string, IList<string>> GetConsumersPerTopic(string group)
+        {
+            var consumers = this.zkClient.GetChildrenParentMayNotExist(this.dirs.ConsumerRegistryDir);
+            var consumersPerTopicMap = new Dictionary<string, IList<string>>();
+            foreach (var consumer in consumers)
+            {
+                TopicCount topicCount = GetTopicCount(consumer);
+                foreach (KeyValuePair<string, IList<string>> consumerThread in topicCount.GetConsumerThreadIdsPerTopic())
+                {
+                    foreach (string consumerThreadId in consumerThread.Value)
+                    {
+                        if (!consumersPerTopicMap.ContainsKey(consumerThread.Key))
+                        {
+                            consumersPerTopicMap.Add(consumerThread.Key, new List<string> { consumerThreadId });
+                        }
+                        else
+                        {
+                            consumersPerTopicMap[consumerThread.Key].Add(consumerThreadId);
+                        }
+                    }
+                }
+            }
+
+            foreach (KeyValuePair<string, IList<string>> item in consumersPerTopicMap)
+            {
+                item.Value.ToList().Sort();
+            }
+
+            return consumersPerTopicMap;
+        }
+
+        private static IDictionary<string, IList<string>> GetRelevantTopicMap(
+            IDictionary<string, IList<string>> myTopicThreadIdsMap,
+            IDictionary<string, IList<string>> newPartMap,
+            IDictionary<string, IList<string>> oldPartMap,
+            IDictionary<string, IList<string>> newConsumerMap,
+            IDictionary<string, IList<string>> oldConsumerMap)
+        {
+            var relevantTopicThreadIdsMap = new Dictionary<string, IList<string>>();
+            foreach (var myMap in myTopicThreadIdsMap)
+            {
+                var oldPartValue = oldPartMap.ContainsKey(myMap.Key) ? oldPartMap[myMap.Key] : null;
+                var newPartValue = newPartMap.ContainsKey(myMap.Key) ? newPartMap[myMap.Key] : null;
+                var oldConsumerValue = oldConsumerMap.ContainsKey(myMap.Key) ? oldConsumerMap[myMap.Key] : null;
+                var newConsumerValue = newConsumerMap.ContainsKey(myMap.Key) ? newConsumerMap[myMap.Key] : null;
+                if (oldPartValue != newPartValue || oldConsumerValue != newConsumerValue)
+                {
+                    relevantTopicThreadIdsMap.Add(myMap.Key, myMap.Value);
+                }
+            }
+
+            return relevantTopicThreadIdsMap;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+    using System;
+    using System.Globalization;
+    using System.Reflection;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using log4net;
+    using ZooKeeperNet;
+
+    internal class ZKSessionExpireListener : IZooKeeperStateListener
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private readonly string consumerIdString;
+
+        private readonly ZKRebalancerListener loadBalancerListener;
+
+        private readonly ZookeeperConsumerConnector zkConsumerConnector;
+
+        private readonly ZKGroupDirs dirs;
+
+        private readonly TopicCount topicCount;
+
+        public ZKSessionExpireListener(ZKGroupDirs dirs, string consumerIdString, TopicCount topicCount, ZKRebalancerListener loadBalancerListener, ZookeeperConsumerConnector zkConsumerConnector)
+        {
+            this.consumerIdString = consumerIdString;
+            this.loadBalancerListener = loadBalancerListener;
+            this.zkConsumerConnector = zkConsumerConnector;
+            this.dirs = dirs;
+            this.topicCount = topicCount;
+        }
+
+        /// <summary>
+        /// Called when the ZooKeeper connection state has changed.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+        /// <remarks>
+        /// Do nothing, since zkclient will do reconnect for us.
+        /// </remarks>
+        public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+        {
+            Guard.Assert<ArgumentNullException>(() => args != null);
+            Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
+        }
+
+        /// <summary>
+        /// Called after the ZooKeeper session has expired and a new session has been created.
+        /// </summary>
+        /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+        /// <remarks>
+        /// You would have to re-create any ephemeral nodes here.
+        /// Explicitly trigger load balancing for this consumer.
+        /// </remarks>
+        public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+        {
+            Guard.Assert<ArgumentNullException>(() => args != null);
+
+            Logger.InfoFormat(
+                CultureInfo.CurrentCulture,
+                "ZK expired; release old broker partition ownership; re-register consumer {0}",
+                this.consumerIdString);
+            this.loadBalancerListener.ResetState();
+            this.zkConsumerConnector.RegisterConsumerInZk(this.dirs, this.consumerIdString, this.topicCount);
+            this.loadBalancerListener.SyncedRebalance();
+        }
+    }
+}