You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/11 23:17:30 UTC

svn commit: r1567419 - in /incubator/streams/trunk/streams-contrib/streams-persist-kafka: ./ src/main/java/org/apache/streams/kafka/ src/main/jsonschema/org/apache/streams/kafka/ src/main/resources/

Author: sblackmon
Date: Tue Feb 11 22:17:29 2014
New Revision: 1567419

URL: http://svn.apache.org/r1567419
Log:
added kafka reader capability - not yet tested!



Modified:
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/pom.xml?rev=1567419&r1=1567418&r2=1567419&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/pom.xml Tue Feb 11 22:17:29 2014
@@ -34,7 +34,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
+            <artifactId>kafka_2.10</artifactId>
             <version>0.8.0</version>
             <exclusions>
                 <exclusion>
@@ -45,6 +45,14 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency> <!-- Have to add to be able to .get() from storm tuple -->

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java?rev=1567419&r1=1567418&r2=1567419&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java Tue Feb 11 22:17:29 2014
@@ -14,12 +14,16 @@ public class KafkaConfigurator {
 
     public static KafkaConfiguration detectConfiguration(Config kafka) {
         String brokerlist = StreamsConfigurator.config.getString("kafka.metadata.broker.list");
+        String zkconnect = StreamsConfigurator.config.getString("kafka.zkconnect");
         String topic = StreamsConfigurator.config.getString("kafka.topic");
+        String groupId = StreamsConfigurator.config.getString("kafka.groupid");
 
         KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
 
         kafkaConfiguration.setBrokerlist(brokerlist);
-        kafkaConfiguration.setBrokerlist(brokerlist);
+        kafkaConfiguration.setZkconnect(zkconnect);
+        kafkaConfiguration.setTopic(topic);
+        kafkaConfiguration.setGroupId(groupId);
 
         return kafkaConfiguration;
     }

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json?rev=1567419&r1=1567418&r2=1567419&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json Tue Feb 11 22:17:29 2014
@@ -7,15 +7,19 @@
     "properties": {
         "brokerlist": {
             "type": "string",
-            "description": "A comma-delimited list of broker nodes"
+            "description": "A comma-delimited list of broker nodes, used by producer"
         },
         "zkconnect": {
             "type": "string",
-            "description": "A comma-delimited list of zookeeper host:ports"
+            "description": "A comma-delimited list of zookeeper host:ports, used by consumer"
         },
         "topic": {
             "type": "string",
             "description": "A topic to read/write from"
+        },
+        "groupId": {
+            "type": "string",
+            "description": "A required field for partitioning distributed consumers"
         }
     }
 }
\ No newline at end of file

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties?rev=1567419&r1=1567418&r2=1567419&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties Tue Feb 11 22:17:29 2014
@@ -34,4 +34,10 @@ kafka.serializer.class=kafka.serializer.
 #queue.enqueue.timeout.ms=
 
 # the number of messages batched at the producer
-#batch.num.messages=
\ No newline at end of file
+#batch.num.messages=
+
+kafka.groupid=kafka
+
+kafka.zk.connect=localhost:2181
+
+