Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/30 22:17:14 UTC
svn commit: r1195249 - in /incubator/kafka/trunk/clients/clojure: ./ leiningen/ resources/ src/kafka/ test/kafka/
Author: nehanarkhede
Date: Sun Oct 30 21:17:13 2011
New Revision: 1195249
URL: http://svn.apache.org/viewvc?rev=1195249&view=rev
Log:
KAFKA-177 Remove the clojure clients until correctly implemented and refactored; patched by nehanarkhede; reviewed by jefferydamick
Modified:
incubator/kafka/trunk/clients/clojure/.gitignore
incubator/kafka/trunk/clients/clojure/LICENSE
incubator/kafka/trunk/clients/clojure/README.md
incubator/kafka/trunk/clients/clojure/leiningen/run_example.clj
incubator/kafka/trunk/clients/clojure/project.clj
incubator/kafka/trunk/clients/clojure/resources/log4j.properties
incubator/kafka/trunk/clients/clojure/src/kafka/buffer.clj
incubator/kafka/trunk/clients/clojure/src/kafka/example.clj
incubator/kafka/trunk/clients/clojure/src/kafka/kafka.clj
incubator/kafka/trunk/clients/clojure/src/kafka/print.clj
incubator/kafka/trunk/clients/clojure/src/kafka/serializable.clj
incubator/kafka/trunk/clients/clojure/src/kafka/types.clj
incubator/kafka/trunk/clients/clojure/test/kafka/buffer_test.clj
incubator/kafka/trunk/clients/clojure/test/kafka/print_test.clj
incubator/kafka/trunk/clients/clojure/test/kafka/serializable_test.clj
Modified: incubator/kafka/trunk/clients/clojure/.gitignore
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/.gitignore?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/.gitignore (original)
+++ incubator/kafka/trunk/clients/clojure/.gitignore Sun Oct 30 21:17:13 2011
@@ -1,2 +0,0 @@
-lib
-classes
Modified: incubator/kafka/trunk/clients/clojure/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/LICENSE?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/LICENSE (original)
+++ incubator/kafka/trunk/clients/clojure/LICENSE Sun Oct 30 21:17:13 2011
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
-TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
-2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
-3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
-4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
-5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
-6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
-7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
-8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
-9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
-END OF TERMS AND CONDITIONS
-
-APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
-Copyright [yyyy] [name of copyright owner]
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
Modified: incubator/kafka/trunk/clients/clojure/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/README.md?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/README.md (original)
+++ incubator/kafka/trunk/clients/clojure/README.md Sun Oct 30 21:17:13 2011
@@ -1,50 +0,0 @@
-# kafka-clj
-kafka-clj provides a producer and consumer that supports a basic fetch API as well as a managed sequence interface. Multifetch is not supported yet.
-
-## Quick Start
-
-Download and start [Kafka](http://sna-projects.com/kafka/quickstart.php).
-
-Pull dependencies with [Leiningen](https://github.com/technomancy/leiningen):
-
- $ lein deps
-
-And run the example:
-
- $ lein run-example
-
-## Usage
-
-### Sending messages
-
- (with-open [p (producer "localhost" 9092)]
- (produce p "test" 0 "Message 1")
- (produce p "test" 0 ["Message 2" "Message 3"]))
-
-### Simple consumer
-
- (with-open [c (consumer "localhost" 9092)]
- (let [offs (offsets c "test" 0 -1 10)]
- (consume c "test" 0 (last offs) 1000000)))
-
-### Consumer sequence
-
- (with-open [c (consumer "localhost" 9092)]
- (doseq [m (consume-seq c "test" 0 {:blocking true})]
- (println m)))
-
-Following options are supported:
-
-* :blocking _boolean_ default false, sequence returns nil the first time fetch does not return new messages. If set to true, the sequence tries to fetch new messages :repeat-count times every :repeat-timeout milliseconds.
-* :repeat-count _int_ number of attempts to fetch new messages before terminating, default 10.
-* :repeat-timeout _int_ wait time in milliseconds between fetch attempts, default 1000.
-* :offset _long_ initialized to highest offset if not provided.
-* :max-size _int_ max result message size, default 1000000.
-
-### Serialization
-
-Load namespace _kafka.print_ for basic print_dup/read-string serialization or _kafka.serializeable_ for Java object serialization. For custom serialization implement Pack and Unpack protocols.
-
-
-Questions? Email adam.smyczek \_at\_ gmail.com.
-
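The :blocking, :repeat-count and :repeat-timeout options described in the removed README amount to a bounded polling loop. The following is a minimal sketch of that behaviour, not the removed implementation: poll-with-retries and its zero-argument fetch-fn are hypothetical names, and the defaults are taken from the README text.

```clojure
;; Hypothetical helper, not part of kafka-clj: calls fetch-fn until it
;; returns a non-nil value or the attempts are used up.
(defn poll-with-retries
  [fetch-fn {:keys [repeat-count repeat-timeout]
             :or   {repeat-count 10 repeat-timeout 1000}}]
  (loop [attempts-left repeat-count]
    (when (pos? attempts-left)
      (if-some [result (fetch-fn)]
        result
        (do
          ;; Nothing new yet: wait, then try again with one attempt fewer.
          (Thread/sleep (long repeat-timeout))
          (recur (dec attempts-left)))))))

;; Usage: the fetch function only yields a value on its third call.
(let [calls (atom 0)
      fetch (fn [] (when (= 3 (swap! calls inc)) :messages))]
  (poll-with-retries fetch {:repeat-count 5 :repeat-timeout 1}))
```

When every attempt comes back empty the function returns nil, which matches the README's description of the sequence terminating.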
Modified: incubator/kafka/trunk/clients/clojure/leiningen/run_example.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/leiningen/run_example.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/leiningen/run_example.clj (original)
+++ incubator/kafka/trunk/clients/clojure/leiningen/run_example.clj Sun Oct 30 21:17:13 2011
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns leiningen.run-example
- (:use [leiningen.compile :only (eval-in-project)]))
-
-(defn run-example
- [project & args]
- (eval-in-project project
- `(do
- (require 'kafka.example)
- (kafka.example/run))))
-
Modified: incubator/kafka/trunk/clients/clojure/project.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/project.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/project.clj (original)
+++ incubator/kafka/trunk/clients/clojure/project.clj Sun Oct 30 21:17:13 2011
@@ -1,27 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(defproject kafka-clj "0.1-SNAPSHOT"
- :description "Kafka client for Clojure."
- :url "http://sna-projects.com/kafka/"
- :dependencies [[org.clojure/clojure "1.2.0"]
- [org.clojure/clojure-contrib "1.2.0"]
- [log4j "1.2.15" :exclusions [javax.mail/mail
- javax.jms/jms
- com.sun.jdmk/jmxtools
- com.sun.jmx/jmxri]]]
- :disable-deps-clean false
- :warn-on-reflection true
- :source-path "src"
- :test-path "test")
Modified: incubator/kafka/trunk/clients/clojure/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/resources/log4j.properties?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/resources/log4j.properties (original)
+++ incubator/kafka/trunk/clients/clojure/resources/log4j.properties Sun Oct 30 21:17:13 2011
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, A1
-
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern= %-5p %c - %m%n
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/buffer.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/buffer.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/buffer.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/buffer.clj Sun Oct 30 21:17:13 2011
@@ -1,189 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Wrapper around ByteBuffer,
- provides a DSL to model byte messages."}
- kafka.buffer
- (:import (java.nio ByteBuffer)
- (java.nio.channels SocketChannel)))
-
-(def #^{:doc "Buffer stack bind in with-buffer."}
- *buf* [])
-
-(def #^{:doc "Number of attempts to read a complete buffer from channel."}
- *channel-read-count* 5)
-
-;
-; Main buffer functions
-;
-
-(defn buffer
- "Creates a new ByteBuffer of argument size."
- [^int size]
- (ByteBuffer/allocate size))
-
-(defn ^ByteBuffer top
- "Returns top buffer from *buf* stack."
- []
- (peek *buf*))
-
-(defn flip
- []
- (.flip (top)))
-
-(defn rewind
- []
- (.rewind (top)))
-
-(defn clear
- []
- (.clear (top)))
-
-(defn has-remaining
- []
- (.hasRemaining (top)))
-
-;
-; Write to buffer
-;
-
-(defprotocol Put
- "Put protocol defines a generic buffer put method."
- (put [this]))
-
-(extend-type Byte
- Put
- (put [this] (.put (top) this)))
-
-(extend-type Integer
- Put
- (put [this] (.putInt (top) this)))
-
-(extend-type Short
- Put
- (put [this] (.putShort (top) this)))
-
-(extend-type Long
- Put
- (put [this] (.putLong (top) this)))
-
-(extend-type String
- Put
- (put [this] (.put (top) (.getBytes this "UTF-8"))))
-
-(extend-type (class (byte-array 0))
- Put
- (put [this] (.put (top) ^bytes this)))
-
-(extend-type clojure.lang.IPersistentCollection
- Put
- (put [this] (doseq [e this] (put e))))
-
-(defmacro length-encoded
- [type & body]
- `(with-buffer (.slice (top))
- (put (~type 0))
- (let [^ByteBuffer this# (top)
- ^ByteBuffer parent# (peek (pop *buf*))
- type-size# (.position this#)]
- ~@body
- (let [size# (.position this#)]
- (.rewind this#)
- (put (~type (- size# type-size#)))
- (.position parent# (+ (.position parent#) size#))))))
-
-(defmacro with-put
- [size f & body]
- `(with-buffer (.slice (top))
- (put (byte-array ~size))
- ~@body
- (let [^ByteBuffer this# (top)
- ^ByteBuffer parent# (peek (pop *buf*))
- pos# (.position this#)
- ba# (byte-array (- pos# ~size))]
- (doto this# (.rewind) (.get (byte-array ~size)) (.get ba#))
- (.rewind this#)
- (put (~f ba#))
- (.position parent# (+ (.position parent#) pos#)))))
-
-;
-; Read from buffer
-;
-
-(defn get-byte
- []
- (.get (top)))
-
-(defn get-short
- []
- (.getShort (top)))
-
-(defn get-int
- []
- (.getInt (top)))
-
-(defn get-long
- []
- (.getLong (top)))
-
-(defn get-array
- "Reads byte array of argument length from buffer."
- [^int length]
- (let [ba (byte-array length)]
- (.get (top) ba)
- ba))
-
-(defn get-string
- "Reads string of argument length from buffer."
- [^int length]
- (let [ba (byte-array length)]
- (.get (top) ba)
- (String. ba "UTF-8")))
-
-;
-; Util functions and macros
-;
-
-(defmacro with-buffer
- "Evaluates body in the context of the buffer."
- [buffer & body]
- `(binding [*buf* (conj *buf* ~buffer)]
- ~@body))
-
-(defn read-from
- "Reads from channel to the underlying top buffer.
- Throws ConnectException if channel is closed."
- [^SocketChannel channel]
- (let [size (.read channel (top))]
- (if (< size 0)
- (throw (java.net.ConnectException. "Channel closed?"))
- size)))
-
-(defn read-completely-from
- "Read the complete top buffer from the channel."
- [^SocketChannel channel]
- (loop [t *channel-read-count* size 0]
- (let [s (read-from channel)]
- (cond
- (< t 0)
- (throw (Exception. "Unable to read complete buffer from channel."))
- (has-remaining)
- (recur (dec t) (+ size s))
- :else size))))
-
-(defn write-to
- "Writes underlying top buffer to channel."
- [^SocketChannel channel]
- (.write channel (top)))
-
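The length-encoded macro in the removed kafka.buffer namespace writes a placeholder length, evaluates its body, then goes back and overwrites the placeholder with the real size. A minimal sketch of the same idea follows, assuming a fixed 4-byte length field and an explicit buffer argument instead of the dynamic *buf* stack; put-length-prefixed is a hypothetical name and does not appear in the removed code.

```clojure
(import '(java.nio ByteBuffer))

;; Hypothetical helper, not part of kafka.buffer: writes a 4-byte length
;; prefix followed by the payload, filling in the prefix afterwards.
(defn put-length-prefixed
  [^ByteBuffer buf ^bytes payload]
  (let [start (.position buf)]
    (.putInt buf (int 0))                       ; placeholder for the length
    (.put buf payload)                          ; body
    (let [end (.position buf)]
      ;; Absolute put: overwrites the placeholder without moving the position.
      (.putInt buf (int start) (int (- end start 4)))
      buf)))

;; Usage: frame a three-byte payload, then read the length back.
(let [buf (ByteBuffer/allocate 16)]
  (put-length-prefixed buf (.getBytes "abc" "UTF-8"))
  (.flip buf)
  [(.getInt buf) (.remaining buf)])
```

Using the absolute putInt avoids the slice-and-rewind bookkeeping of the macro, at the cost of fixing the length field to one type.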
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/example.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/example.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/example.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/example.clj Sun Oct 30 21:17:13 2011
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Producer/Consumer example."}
- kafka.example
- (:use (clojure.contrib logging)
- (kafka types kafka print)))
-
-(defmacro thread
- "Executes body in a thread, logs exceptions."
- [ & body]
- `(future
- (try
- ~@body
- (catch Exception e#
- (error "Exception." e#)))))
-
-(defn start-consumer
- []
- (thread
- (with-open [c (consumer "localhost" 9092)]
- (doseq [m (consume-seq c "test" 0 {:blocking true})]
- (println "Consumed <-- " m)))
- (println "Finished consuming.")))
-
-(defn start-producer
- []
- (thread
- (with-open [p (producer "localhost" 9092)]
- (doseq [i (range 1 20)]
- (let [m (str "Message " i)]
- (produce p "test" 0 m)
- (println "Produced --> " m)
- (Thread/sleep 1000))))
- (println "Finished producing.")))
-
-(defn run
- []
- (start-consumer)
- (start-producer))
-
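The thread macro in the removed example wraps its body in a future and catches exceptions, since an exception thrown inside a future otherwise only surfaces when the future is dereferenced. A small sketch of that pattern follows, assuming an explicit error callback in place of the clojure.contrib logging call; in-future and on-error are hypothetical names.

```clojure
;; Hypothetical variant of the removed `thread` macro: on-error is an
;; assumed one-argument callback that receives any caught exception.
(defmacro in-future
  [on-error & body]
  `(future
     (try
       ~@body
       (catch Exception e#
         (~on-error e#)))))

;; Usage: the exception is handed to the callback instead of being lost.
(let [seen (atom nil)]
  @(in-future (fn [^Exception e] (reset! seen (.getMessage e)))
     (throw (Exception. "boom")))
  @seen)
```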
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/kafka.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/kafka.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/kafka.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/kafka.clj Sun Oct 30 21:17:13 2011
@@ -1,281 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Core kafka-clj module,
- provides producer and consumer factories."}
- kafka.kafka
- (:use (kafka types buffer)
- (clojure.contrib logging))
- (:import (kafka.types Message)
- (java.nio.channels SocketChannel)
- (java.net Socket InetSocketAddress)
- (java.util.zip CRC32)))
-
-;
-; Utils
-;
-
-(defn- crc32-int
- "CRC for byte array."
- [^bytes ba]
- (let [crc (doto (CRC32.) (.update ba))
- lv (.getValue crc)]
- (.intValue (bit-and lv 0xffffffff))))
-
-(defn- new-channel
- "Create and setup a new channel for a host name, port and options.
- Supported options:
- :receive-buffer-size - receive socket buffer size, default 65536.
- :send-buffer-size - send socket buffer size, default 65536.
- :socket-timeout - socket timeout."
- [^String host ^Integer port opts]
- (let [receive-buf-size (or (:receive-buffer-size opts) 65536)
- send-buf-size (or (:send-buffer-size opts) 65536)
- so-timeout (or (:socket-timeout opts) 60000)
- ch (SocketChannel/open)]
- (doto (.socket ch)
- (.setReceiveBufferSize receive-buf-size)
- (.setSendBufferSize send-buf-size)
- (.setSoTimeout so-timeout))
- (doto ch
- (.configureBlocking true)
- (.connect (InetSocketAddress. host port)))))
-
-(defn- close-channel
- "Close the channel."
- [^SocketChannel channel]
- (.close channel)
- (.close (.socket channel)))
-
-(defn- response-size
- "Read first four bytes from channel as an integer."
- [channel]
- (with-buffer (buffer 4)
- (read-completely-from channel)
- (flip)
- (get-int)))
-
-(defmacro with-error-code
- "Convenience response error code check."
- [request & body]
- `(let [error-code# (get-short)] ; error code
- (if (not= error-code# 0)
- (error (str "Request " ~request " returned error code: " error-code# "."))
- ~@body)))
-
-;
-; Producer
-;
-
-(defn- send-message
- "Send messages."
- [channel topic partition messages opts]
- (let [size (or (:send-buffer-size opts) 65536)]
- (with-buffer (buffer size)
- (length-encoded int ; request size
- (put (short 0)) ; request type
- (length-encoded short ; topic size
- (put topic)) ; topic
- (put (int partition)) ; partition
- (length-encoded int ; messages size
- (doseq [m messages]
- (let [^Message pm (pack m)]
- (length-encoded int ; message size
- (put (byte 0)) ; magic
- (with-put 4 crc32-int ; crc
- (put (.message pm)))))))) ; message
- (flip)
- (write-to channel))))
-
-(defn producer
- "Producer factory. See new-channel for list of supported options."
- [host port & [opts]]
- (let [channel (new-channel host port opts)]
- (reify Producer
- (produce [this topic partition messages]
- (let [msg (if (sequential? messages) messages [messages])]
- (send-message channel topic partition msg opts)))
- (close [this]
- (close-channel channel)))))
-
-;
-; Consumer
-;
-
-; Offset
-
-(defn- offset-fetch-request
- "Fetch offsets request."
- [channel topic partition time max-offsets]
- (let [size (+ 4 2 2 (count topic) 4 8 4)]
- (with-buffer (buffer size)
- (length-encoded int ; request size
- (put (short 4)) ; request type
- (length-encoded short ; topic size
- (put topic)) ; topic
- (put (int partition)) ; partition
- (put (long time)) ; time
- (put (int max-offsets))) ; max-offsets
- (flip)
- (write-to channel))))
-
-(defn- fetch-offsets
- "Fetch offsets as an integer sequence."
- [channel topic partition time max-offsets]
- (offset-fetch-request channel topic partition time max-offsets)
- (let [rsp-size (response-size channel)]
- (with-buffer (buffer rsp-size)
- (read-completely-from channel)
- (flip)
- (with-error-code "Fetch-Offsets"
- (loop [c (get-int) res []]
- (if (> c 0)
- (recur (dec c) (conj res (get-long)))
- (doall res)))))))
-
-; Messages
-
-(defn- message-fetch-request
- "Fetch messages request."
- [channel topic partition offset max-size]
- (let [size (+ 4 2 2 (count topic) 4 8 4)]
- (with-buffer (buffer size)
- (length-encoded int ; request size
- (put (short 1)) ; request type
- (length-encoded short ; topic size
- (put topic)) ; topic
- (put (int partition)) ; partition
- (put (long offset)) ; offset
- (put (int max-size))) ; max size
- (flip)
- (write-to channel))))
-
-(defn- read-response
- "Read response from buffer. Returns a pair [new offset, messages sequence]."
- [offset]
- (with-error-code "Fetch-Messages"
- (loop [off offset msg []]
- (if (has-remaining)
- (let [size (get-int) ; message size
- magic (get-byte) ; magic
- crc (get-int) ; crc
- message (get-array (- size 5))]
- (recur (+ off size 4) (conj msg (unpack (Message. message)))))
- [off (doall msg)]))))
-
-(defn- fetch-messages
- "Message fetch, returns a pair [new offset, messages sequence]."
- [channel topic partition offset max-size]
- (message-fetch-request channel topic partition offset max-size)
- (let [rsp-size (response-size channel)]
- (with-buffer (buffer rsp-size)
- (read-completely-from channel)
- (flip)
- (read-response offset))))
-
-; Consumer sequence
-
-(defn- seq-fetch
- "Non-blocking fetch function used by consumer sequence."
- [channel topic partition opts]
- (let [max-size (or (:max-size opts) 1000000)]
- (fn [offset]
- (fetch-messages channel topic partition offset max-size))))
-
-(defn- blocking-seq-fetch
- "Blocking fetch function used by consumer sequence."
- [channel topic partition opts]
- (let [repeat-count (or (:repeat-count opts) 10)
- repeat-timeout (or (:repeat-timeout opts) 1000)
- max-size (or (:max-size opts) 1000000)]
- (fn [offset]
- (loop [c repeat-count]
- (if (> c 0)
- (let [rs (fetch-messages channel topic partition offset max-size)]
- (if (or (nil? rs) (= offset (first rs)))
- (do
- (Thread/sleep repeat-timeout)
- (recur (dec c)))
- (doall rs)))
- (debug "Stopping blocking seq fetch."))))))
-
-(defn- fetch-queue
- [offset queue fetch-fn]
- (if (empty? @queue)
- (let [[new-offset msg] (fetch-fn @offset)]
- (when new-offset
- (debug (str "Fetched " (count msg) " messages:"))
- (debug (str "New offset " new-offset "."))
- (swap! queue #(reduce conj % (reverse msg)))
- (reset! offset new-offset)))))
-
-(defn- consumer-seq
- "Sequence constructor."
- [offset fetch-fn]
- (let [offset (atom offset)
- queue (atom (seq []))]
- (reify
- clojure.lang.IPersistentCollection
- (seq [this] this)
- (cons [this _] (throw (Exception. "cons not supported for consumer sequence.")))
- (empty [this] nil)
- (equiv [this o]
- (fatal "Implement equiv for consumer seq!")
- false)
- clojure.lang.ISeq
- (first [this]
- (fetch-queue offset queue fetch-fn)
- (first @queue))
- (next [this]
- (swap! queue rest)
- (fetch-queue offset queue fetch-fn)
- (if (not (empty? @queue)) this))
- (more [this]
- (swap! queue rest)
- (fetch-queue offset queue fetch-fn)
- (if (empty? @queue) (empty) this))
- Object
- (toString [this]
- (str "ConsumerQueue")))))
-
-; Consumer factory
-
-(defn consumer
- "Consumer factory. See new-channel for list of supported options."
- [host port & [opts]]
- (let [channel (new-channel host port opts)]
- (reify Consumer
- (consume [this topic partition offset max-size]
- (fetch-messages channel topic partition offset max-size))
-
- (offsets [this topic partition time max-offsets]
- (fetch-offsets channel topic partition time max-offsets))
-
- (consume-seq [this topic partition]
- (let [[offset] (fetch-offsets channel topic partition -1 1)]
- (debug (str "Initializing last offset to " offset "."))
- (consumer-seq (or offset 0) (seq-fetch channel topic partition opts))))
-
- (consume-seq [this topic partition opts]
- (let [[offset] (or (:offset opts)
- (fetch-offsets channel topic partition -1 1))
- fetch-fn (if (:blocking opts)
- (blocking-seq-fetch channel topic partition opts)
- (seq-fetch channel topic partition opts))]
- (debug (str "Initializing last offset to " offset "."))
- (consumer-seq (or offset 0) fetch-fn)))
-
- (close [this]
- (close-channel channel)))))
-
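
Note on the removed fetch request: message-fetch-request above writes a 4-byte size prefix, then a 2-byte request type (1), a 2-byte topic length, the topic, a 4-byte partition, an 8-byte offset and a 4-byte maximum size. The sketch below reproduces that layout on a plain java.nio.ByteBuffer. The helper name fetch-request-bytes is hypothetical, and the sketch assumes big-endian byte order (the ByteBuffer default) and a UTF-8 encoded topic; neither is confirmed by the removed kafka.buffer code.

```clojure
(import '(java.nio ByteBuffer))

(defn fetch-request-bytes
  "Hypothetical helper, not part of the removed client.
  Encodes a fetch request as: int size, short type, short topic length,
  topic bytes, int partition, long offset, int max-size."
  [^String topic partition offset max-size]
  (let [topic-bytes (.getBytes topic "UTF-8")
        ;; everything after the 4-byte size prefix
        body-size   (+ 2 2 (alength topic-bytes) 4 8 4)
        buf         (ByteBuffer/allocate (+ 4 body-size))]
    (doto buf
      (.putInt body-size)                        ; request size
      (.putShort (short 1))                      ; request type: fetch
      (.putShort (short (alength topic-bytes)))  ; topic size
      (.put topic-bytes)                         ; topic
      (.putInt (int partition))                  ; partition
      (.putLong (long offset))                   ; offset
      (.putInt (int max-size))                   ; max size
      (.flip))                                   ; switch to read mode
    buf))
```

For a four-character topic the body is 24 bytes, so the whole request occupies 28 bytes including the prefix.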
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/print.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/print.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/print.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/print.clj Sun Oct 30 21:17:13 2011
@@ -1,36 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Basic Clojure print-dup -> read-string message serialization."}
- kafka.print
- (:use kafka.types)
- (:import (kafka.types Message)))
-
-(extend-type Object
- Pack
- (pack [this]
- (let [^String st (with-out-str (print-dup this *out*))]
- (kafka.types.Message. (.getBytes st "UTF-8")))))
-
-(extend-type Message
- Unpack
- (unpack [this]
- (let [^bytes ba (.message this)
- msg (String. ba "UTF-8")]
- (if (not (empty? msg))
- (try
- (read-string msg)
- (catch Exception e
- (println "Invalid expression " msg)))))))
-
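
Note on the removed print serialization: kafka.print packs a value by printing it with print-dup and unpacks it with read-string. The sketch below shows the same round trip on raw byte arrays, without the Message wrapper. The names to-bytes and from-bytes are hypothetical. Keep in mind that read-string evaluates #= reader forms while *read-eval* is true (the default), so this approach is only suitable for trusted input.

```clojure
(defn to-bytes
  "Hypothetical helper: print-dup a value and return its UTF-8 bytes."
  [x]
  (let [^String s (with-out-str (print-dup x *out*))]
    (.getBytes s "UTF-8")))

(defn from-bytes
  "Hypothetical helper: read a value back from UTF-8 bytes.
  Returns nil for an empty payload."
  [^bytes ba]
  (let [s (String. ba "UTF-8")]
    (when (seq s)
      (read-string s))))
```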
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/serializable.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/serializable.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/serializable.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/serializable.clj Sun Oct 30 21:17:13 2011
@@ -1,36 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Serialization for all Java Serializable objects."}
- kafka.serializable
- (:use kafka.types)
- (:import (kafka.types Message)
- (java.io Serializable
- ObjectOutputStream ByteArrayOutputStream
- ObjectInputStream ByteArrayInputStream)))
-
-(extend-type Serializable
- Pack
- (pack [this]
- (let [bas (ByteArrayOutputStream.)]
- (with-open [oos (ObjectOutputStream. bas)]
- (.writeObject oos this))
- (kafka.types.Message. (.toByteArray bas)))))
-
-(extend-type Message
- Unpack
- (unpack [this]
- (with-open [ois (ObjectInputStream. (ByteArrayInputStream. (.message this)))]
- (.readObject ois))))
-
Modified: incubator/kafka/trunk/clients/clojure/src/kafka/types.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/src/kafka/types.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/src/kafka/types.clj (original)
+++ incubator/kafka/trunk/clients/clojure/src/kafka/types.clj Sun Oct 30 21:17:13 2011
@@ -1,42 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns #^{:doc "Base kafka-clj types."}
- kafka.types)
-
-(deftype #^{:doc "Message type, a wrapper around a byte array."}
- Message [^bytes message])
-
-(defprotocol Pack
- "Pack protocol converts an object to a Message."
- (pack [this] "Convert object to a Message."))
-
-(defprotocol Unpack
- "Unpack protocol, reads an object from a Message."
- (unpack [^Message this] "Read an object from the message."))
-
-(defprotocol Producer
- "Producer protocol."
- (produce [this topic partition messages] "Send message[s] for a topic to a partition.")
- (close [this] "Closes the producer, socket and channel."))
-
-(defprotocol Consumer
- "Consumer protocol."
- (consume [this topic partition offset max-size] "Fetch messages. Returns a pair [last-offset, message sequence]")
- (offsets [this topic partition time max-offsets] "Query offsets. Returns offsets seq.")
-
- (consume-seq [this topic partition]
- [this topic partition opts] "Creates a sequence over the consumer.")
- (close [this] "Close the consumer, socket and channel."))
-
Modified: incubator/kafka/trunk/clients/clojure/test/kafka/buffer_test.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/test/kafka/buffer_test.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/test/kafka/buffer_test.clj (original)
+++ incubator/kafka/trunk/clients/clojure/test/kafka/buffer_test.clj Sun Oct 30 21:17:13 2011
@@ -1,60 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns kafka.buffer-test
- (:use (kafka buffer)
- clojure.test))
-
-(deftest test-put-get
- (with-buffer (buffer 64)
- (put (byte 5))
- (put (short 10))
- (put (int 20))
- (put (long 40))
- (put "test")
- (put (byte-array 3 [(byte 1) (byte 2) (byte 3)]))
- (flip)
-
- (is (= (get-byte) (byte 5)))
- (is (= (get-short) (short 10)))
- (is (= (get-int) (int 20)))
- (is (= (get-long) (long 40)))
- (is (= (get-string 4) "test"))
- (let [ba (get-array 3)]
- (is (= (nth ba 0) (byte 1)))
- (is (= (nth ba 1) (byte 2)))
- (is (= (nth ba 2) (byte 3))))))
-
-(deftest test-with-put
- (with-buffer (buffer 64)
- (with-put 4 count
- (put "test 1"))
- (flip)
-
- (is (= (get-int) (int 6)))
- (is (= (get-string 6) "test 1"))))
-
-(deftest test-length-encoded
- (with-buffer (buffer 64)
- (length-encoded short
- (put "test 1"))
- (length-encoded int
- (put "test 2"))
- (flip)
-
- (is (= (get-short) (short 6)))
- (is (= (get-string 6) "test 1"))
- (is (= (get-int) (int 6)))
- (is (= (get-string 6) "test 2"))))
-
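
Note on length-encoded: test-length-encoded above shows that the prefix holds the number of bytes written by the enclosed body. One way to get that behaviour on a plain java.nio.ByteBuffer is to reserve the prefix, write the body, then back-patch the prefix with an absolute put. This is a sketch under that assumption, not the removed kafka.buffer macro; put-length-encoded-int is a hypothetical name.

```clojure
(import '(java.nio ByteBuffer))

(defn put-length-encoded-int
  "Hypothetical helper: writes a 4-byte length prefix followed by whatever
  body-fn writes into buf. The prefix counts only the body bytes."
  [^ByteBuffer buf body-fn]
  (let [start (.position buf)]
    (.putInt buf 0)                                  ; placeholder prefix
    (body-fn buf)                                    ; write the body
    ;; absolute put: does not move the current position
    (.putInt buf start (- (.position buf) start 4))
    buf))

;; Usage: prefix a 6-byte string with its length.
(def example-buf
  (doto (ByteBuffer/allocate 64)
    (put-length-encoded-int
      (fn [^ByteBuffer b] (.put b (.getBytes "test 2" "UTF-8"))))
    (.flip)))
```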
Modified: incubator/kafka/trunk/clients/clojure/test/kafka/print_test.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/test/kafka/print_test.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/test/kafka/print_test.clj (original)
+++ incubator/kafka/trunk/clients/clojure/test/kafka/print_test.clj Sun Oct 30 21:17:13 2011
@@ -1,26 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns kafka.print-test
- (:use (kafka types print)
- clojure.test))
-
-(deftest test-pack-unpack
- (is (= "test" (unpack (pack "test"))))
- (is (= 123 (unpack (pack 123))))
- (is (= true (unpack (pack true))))
- (is (= [1 2 3] (unpack (pack [1 2 3]))))
- (is (= {:a 1} (unpack (pack {:a 1}))))
- (is (= '(+ 1 2 3) (unpack (pack '(+ 1 2 3))))))
-
Modified: incubator/kafka/trunk/clients/clojure/test/kafka/serializable_test.clj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/clojure/test/kafka/serializable_test.clj?rev=1195249&r1=1195248&r2=1195249&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/clojure/test/kafka/serializable_test.clj (original)
+++ incubator/kafka/trunk/clients/clojure/test/kafka/serializable_test.clj Sun Oct 30 21:17:13 2011
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one or more
-;; contributor license agreements. See the NOTICE file distributed with
-;; this work for additional information regarding copyright ownership.
-;; The ASF licenses this file to You under the Apache License, Version 2.0
-;; (the "License"); you may not use this file except in compliance with
-;; the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns kafka.serializable-test
- (:use (kafka types serializable)
- clojure.test))
-
-(deftest test-pack-unpack
- (is (= "test" (unpack (pack "test"))))
- (is (= 123 (unpack (pack 123))))
- (is (= true (unpack (pack true))))
- (is (= [1 2 3] (unpack (pack [1 2 3]))))
- (is (= {:a 1} (unpack (pack {:a 1}))))
- (is (= '(+ 1 2 3) (unpack (pack '(+ 1 2 3)))))
- (let [now (java.util.Date.)]
- (is (= now (unpack (pack now))))))
-