Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/02 11:46:21 UTC

[1/2] storm git commit: STORM-2971: Replace storm-kafka with storm-kafka-client in Flux examples, fix Flux bug where setter argument types are not checked or coerced

Repository: storm
Updated Branches:
  refs/heads/master e8e1a4e8f -> 21832ad40


STORM-2971: Replace storm-kafka with storm-kafka-client in Flux examples, fix Flux bug where setter argument types are not checked or coerced


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/288d97cb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/288d97cb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/288d97cb

Branch: refs/heads/master
Commit: 288d97cb5497f27b69b2fb126c2b0097cd02605a
Parents: a38f0c5
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Feb 25 16:10:35 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Mar 1 08:16:21 2018 +0100

----------------------------------------------------------------------
 flux/README.md                                  | 137 ++++++++-----------
 flux/flux-core/pom.xml                          |   2 +-
 .../java/org/apache/storm/flux/FluxBuilder.java |  27 ++--
 .../flux/test/OnlyValueRecordTranslator.java    |  37 +++++
 .../storm/flux/test/TridentTopologySource.java  |   1 -
 .../src/test/resources/configs/kafka_test.yaml  |  61 ++++-----
 flux/flux-examples/README.md                    |   2 +-
 flux/flux-examples/pom.xml                      |   6 +-
 .../examples/OnlyValueRecordTranslator.java     |  37 +++++
 .../src/main/resources/kafka_spout.yaml         |  71 ++++------
 10 files changed, 200 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
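
The commit message describes a Flux bug where setter argument types were neither checked nor coerced. The following is a minimal, standalone sketch of that idea, not the actual `FluxBuilder` code: a setter is picked only if its single parameter can accept the configured value, and numeric values are coerced to the parameter type before invocation. The `Target` class and the `canInvoke`, `coerce` and `apply` helpers are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Method;

public class SetterCoercionSketch {

    /** Hypothetical bean with an overloaded setter, used only for this sketch. */
    public static class Target {
        public String seen = "";
        public void setLimit(String s) { seen = "String:" + s; }
        public void setLimit(long l) { seen = "long:" + l; }
    }

    /** Builds "setFoo" from "foo". */
    static String toSetterName(String property) {
        return "set" + property.substring(0, 1).toUpperCase() + property.substring(1);
    }

    /** True if the value fits the parameter type directly or after numeric coercion (assumed rule set). */
    static boolean canInvoke(Class<?> paramType, Object value) {
        if (value == null) {
            return !paramType.isPrimitive();
        }
        if (paramType.isInstance(value)) {
            return true;
        }
        if (value instanceof Number) {
            return paramType == int.class || paramType == long.class
                || paramType == float.class || paramType == double.class;
        }
        return false;
    }

    /** Converts a Number to the primitive parameter type; other values pass through unchanged. */
    static Object coerce(Class<?> paramType, Object value) {
        if (value instanceof Number) {
            Number n = (Number) value;
            if (paramType == int.class) return n.intValue();
            if (paramType == long.class) return n.longValue();
            if (paramType == float.class) return n.floatValue();
            if (paramType == double.class) return n.doubleValue();
        }
        return value;
    }

    /** Returns the first one-argument setter that can accept the value, or null if none matches. */
    static Method findSetter(Class<?> clazz, String property, Object value) {
        String name = toSetterName(property);
        for (Method m : clazz.getMethods()) {
            Class<?>[] types = m.getParameterTypes();
            if (name.equals(m.getName()) && types.length == 1 && canInvoke(types[0], value)) {
                return m;
            }
        }
        return null;
    }

    /** Applies the value to a fresh Target through its "limit" property and reports which setter ran. */
    static String apply(Object value) {
        Target t = new Target();
        Method setter = findSetter(Target.class, "limit", value);
        if (setter == null) {
            return "no setter";
        }
        try {
            setter.invoke(t, coerce(setter.getParameterTypes()[0], value));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return t.seen;
    }

    public static void main(String[] args) {
        System.out.println(apply(5000));  // long:5000
        System.out.println(apply("abc")); // String:abc
    }
}
```

Without the compatibility check, a lookup by name alone could return `setLimit(String)` for an integer value, depending on the order in which reflection lists the methods, and the invocation would then fail.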


http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/README.md
----------------------------------------------------------------------
diff --git a/flux/README.md b/flux/README.md
index 5aa76ae..58ed25d 100644
--- a/flux/README.md
+++ b/flux/README.md
@@ -57,7 +57,7 @@ the layout and configuration of your topologies.
    in your topology code
  * Support for existing topology code (see below)
  * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
- * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)
  * Convenient support for multi-lang components
  * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
    `${variable.name}` substitution)
@@ -354,19 +354,20 @@ storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_co
 With the following `dev.properties` file:
 
 ```properties
-kafka.zookeeper.hosts: localhost:2181
+kafka.bootstrap.hosts: localhost:9092
 ```
 
 You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
 
 ```yaml
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - "${kafka.zookeeper.hosts}"
+      - "${kafka.bootstrap.hosts}"
+      - ["myKafkaTopic"]
 ```
 
-In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+In this case, Flux would replace `${kafka.bootstrap.hosts}` with `localhost:9092` before parsing the YAML contents.
 
 ### Environment Variable Substitution/Filtering
 Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` if defined,
@@ -378,16 +379,16 @@ ${ENV-ZK_HOSTS}
 
 ## Components
 Components are essentially named object instances that are made available as configuration options for spouts and
-bolts. If you are familiar with the Spring framework, components are roughly analagous to Spring beans.
+bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
 
 Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
-the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key
-`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor.
+the following will make an instance of the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` class available as a reference under the key
+`"recordTranslator"` . This assumes the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` has a default constructor.
 
 ```yaml
 components:
-  - id: "stringScheme"
-    className: "org.apache.storm.kafka.StringScheme"
+  - id: "recordTranslator"
+    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
 ```
 
 ### Contructor Arguments, References, Properties and Configuration Methods
@@ -395,32 +396,36 @@ components:
 ####Constructor Arguments
 Arguments to a class constructor can be configured by adding a `contructorArgs` element to a components.
 `constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
-object by calling the constructor that takes a single string as an argument:
+object by calling the constructor that takes a string and an array of strings as arguments:
 
 ```yaml
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - "localhost:2181"
-      - true
+      - "${kafka.bootstrap.hosts}"
+      - ["myKafkaTopic"]
 ```
 
 ####References
 Each component instance is identified by a unique id that allows it to be used/reused by other components. To
 reference an existing component, you specify the id of the component with the `ref` tag.
 
-In the following example, a component with the id `"stringScheme"` is created, and later referenced, as a an argument
+In the following example, a component with the id `"recordTranslator"` is created, and later referenced, as an argument
 to another component's constructor:
 
 ```yaml
 components:
-  - id: "stringScheme"
-    className: "org.apache.storm.kafka.StringScheme"
+  - id: "recordTranslator"
+    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
 
-  - id: "stringMultiScheme"
-    className: "org.apache.storm.spout.SchemeAsMultiScheme"
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - ref: "stringScheme" # component with id "stringScheme" must be declared above.
+      - "localhost:9092"
+      - ["myKafkaTopic"]
+    properties:
+      - name: "recordTranslator"
+        ref: "recordTranslator" # component with id "recordTranslator" must be declared above
 ```
 
 You can also reference existing components in list via specifying the id of the components with the `reflist` tag.
@@ -448,27 +453,20 @@ In addition to calling constructors with different arguments, Flux also allows y
 JavaBean-like setter methods and fields declared as `public`:
 
 ```yaml
-  - id: "spoutConfig"
-    className: "org.apache.storm.kafka.SpoutConfig"
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      # brokerHosts
-      - ref: "zkHosts"
-      # topic
-      - "myKafkaTopic"
-      # zkRoot
-      - "/kafkaSpout"
-      # id
-      - "myId"
+      - "localhost:9092"
+      - ["myKafkaTopic"]
     properties:
-      - name: "ignoreZkOffsets"
-        value: true
-      - name: "scheme"
-        ref: "stringMultiScheme"
+      - name: "pollTimeoutMs"
+        value: 5000
 ```
 
-In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
-the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
-look for a public instance variable with the name `ignoreZkOffsets` and attempt to set its value.
+In the example above, the `properties` declaration will cause Flux to look for a public method in the `KafkaSpoutConfig$Builder` class with
+the signature `setPollTimeoutMs(long pollTimeoutMs)` and attempt to invoke it. If no compatible setter method is found, Flux will then
+look for a public instance variable with the name `pollTimeoutMs` and attempt to set its value.
+Note that Flux will attempt to coerce actual parameter types to fit the setter parameter types, so, for example, calling `setMyFloat(float f)` with `value: 10` is possible.
 
 References may also be used as property values.
 
@@ -623,46 +621,31 @@ Kafka spout example:
 
 ```yaml
 components:
-  - id: "stringScheme"
-    className: "org.apache.storm.kafka.StringScheme"
-
-  - id: "stringMultiScheme"
-    className: "org.apache.storm.spout.SchemeAsMultiScheme"
-    constructorArgs:
-      - ref: "stringScheme"
-
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
+  - id: "onlyValueRecordTranslator"
+    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
+    
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - "localhost:2181"
-
-# Alternative kafka config
-#  - id: "kafkaConfig"
-#    className: "org.apache.storm.kafka.KafkaConfig"
-#    constructorArgs:
-#      # brokerHosts
-#      - ref: "zkHosts"
-#      # topic
-#      - "myKafkaTopic"
-#      # clientId (optional)
-#      - "myKafkaClientId"
-
+      - "localhost:9092"
+      - ["myKafkaTopic"]
+    properties:
+      - name: "firstPollOffsetStrategy"
+        value: EARLIEST
+      - name: "recordTranslator"
+        ref: "onlyValueRecordTranslator"
+    configMethods:
+      - name: "setProp"
+        args:
+          - {
+              "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+            }
+                
   - id: "spoutConfig"
-    className: "org.apache.storm.kafka.SpoutConfig"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
     constructorArgs:
-      # brokerHosts
-      - ref: "zkHosts"
-      # topic
-      - "myKafkaTopic"
-      # zkRoot
-      - "/kafkaSpout"
-      # id
-      - "myId"
-    properties:
-      - name: "ignoreZkOffsets"
-        value: true
-      - name: "scheme"
-        ref: "stringMultiScheme"
+      - ref: "spoutConfigBuilder"
 
 config:
   topology.workers: 1
@@ -670,7 +653,7 @@ config:
 # spout definitions
 spouts:
   - id: "kafka-spout"
-    className: "org.apache.storm.kafka.KafkaSpout"
+    className: "org.apache.storm.kafka.spout.KafkaSpout"
     constructorArgs:
       - ref: "spoutConfig"
 
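
The README hunks above state that Flux replaces `${kafka.bootstrap.hosts}` with the value from the properties file before parsing the YAML contents. A minimal sketch of such a pre-parse substitution step is shown here. It is an illustration under assumptions, not Flux's implementation: the `substitute` and `parse` helpers are hypothetical, and unknown placeholders (including `${ENV-...}` ones) are simply left untouched.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PropertySubstitutionSketch {

    // Matches ${some.key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Loads "key: value" or "key=value" lines into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces every ${key} that has a value in props; unknown keys are left as they are. */
    static String substitute(String yaml, Properties props) {
        Matcher m = PLACEHOLDER.matcher(yaml);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            String replacement = value != null ? value : m.group(0);
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties props = parse("kafka.bootstrap.hosts: localhost:9092\n");
        String yaml = "constructorArgs:\n  - \"${kafka.bootstrap.hosts}\"\n  - [\"myKafkaTopic\"]\n";
        // The substituted text is what a YAML parser would then receive.
        System.out.print(substitute(yaml, props));
    }
}
```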

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml
index acdf805..d29e619 100644
--- a/flux/flux-core/pom.xml
+++ b/flux/flux-core/pom.xml
@@ -38,7 +38,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
+            <artifactId>storm-kafka-client</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 20bde18..e511add 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -277,9 +278,10 @@ public class FluxBuilder {
                 Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
                 Method setter = findSetter(clazz, prop.getName(), value);
                 if (setter != null) {
-                    LOG.debug("found setter, attempting to invoke");
+                    Object[] methodArgs = getArgsWithListCoercion(Collections.singletonList(value), setter.getParameterTypes());
+                    LOG.debug("found setter, attempting to invoke with {}", methodArgs);
                     // invoke setter
-                    setter.invoke(instance, new Object[]{value});
+                    setter.invoke(instance, methodArgs);
                 } else {
                     // look for a public instance variable
                     LOG.debug("no setter found. Looking for a public instance variable...");
@@ -299,15 +301,20 @@ public class FluxBuilder {
 
     private static Method findSetter(Class clazz, String property, Object arg) {
         String setterName = toSetterName(property);
-        Method retval = null;
         Method[] methods = clazz.getMethods();
+        LOG.debug("Target setter: {}, arg: {}", setterName, arg);
         for (Method method : methods) {
             if (setterName.equals(method.getName())) {
-                LOG.debug("Found setter method: " + method.getName());
-                retval = method;
+                Class<?>[] parameterTypes = method.getParameterTypes();
+                LOG.debug("Found setter method: {}, parameter types: {}", method.getName(), parameterTypes);
+                boolean invokable = canInvokeWithArgs(Collections.singletonList(arg), method.getParameterTypes());
+                LOG.debug("** invokable --> {}", invokable);
+                if (invokable) {
+                    return method;
+                }
             }
         }
-        return retval;
+        return null;
     }
 
     private static String toSetterName(String name) {
@@ -350,7 +357,7 @@ public class FluxBuilder {
             Constructor con = findCompatibleConstructor(constructorArgs, clazz);
             if (con != null) {
                 LOG.debug("Found something seemingly compatible, attempting invocation...");
-                obj = con.newInstance(getArgsWithListCoercian(constructorArgs, con.getParameterTypes()));
+                obj = con.newInstance(getArgsWithListCoercion(constructorArgs, con.getParameterTypes()));
             } else {
                 String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
                         clazz.getName(),
@@ -368,7 +375,7 @@ public class FluxBuilder {
             }
             method = findCompatibleMethod(methodArgs, clazz, def.getFactory());
             if (method != null) {
-                obj = method.invoke(null, getArgsWithListCoercian(methodArgs, method.getParameterTypes()));
+                obj = method.invoke(null, getArgsWithListCoercion(methodArgs, method.getParameterTypes()));
             } else {
                 String msg = String.format("Couldn't find a suitable static method '%s' for class '%s' with arguments '%s'.",
                         def.getFactory(),
@@ -530,7 +537,7 @@ public class FluxBuilder {
             String methodName = methodDef.getName();
             Method method = findCompatibleMethod(args, clazz, methodName);
             if (method != null) {
-                Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
+                Object[] methodArgs = getArgsWithListCoercion(args, method.getParameterTypes());
                 method.invoke(instance, methodArgs);
             } else {
                 String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
@@ -580,7 +587,7 @@ public class FluxBuilder {
      * list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs
      * to be coerced from a List to an Array, do so.
      */
-    private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+    private static Object[] getArgsWithListCoercion(List<Object> args, Class[] parameterTypes) {
         if (parameterTypes.length != args.size()) {
             throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
         }
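
The Javadoc fragment above says the renamed `getArgsWithListCoercion` converts an argument list to an `Object[]` and coerces a `List` to an array where the target parameter requires one. This is what lets a YAML list such as `["myKafkaTopic"]` be passed to an array-typed parameter. The sketch below shows one way that coercion could work; `coerceArgs` is a hypothetical helper written for illustration and is not the method body from `FluxBuilder`.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

public class ListCoercionSketch {

    /**
     * Converts args to an Object[] suitable for reflective invocation.
     * A List aimed at an array parameter becomes an array of the parameter's component type.
     */
    static Object[] coerceArgs(List<Object> args, Class<?>[] parameterTypes) {
        if (parameterTypes.length != args.size()) {
            throw new IllegalArgumentException("Parameter count does not equal argument count.");
        }
        Object[] result = new Object[args.size()];
        for (int i = 0; i < args.size(); i++) {
            Object arg = args.get(i);
            Class<?> type = parameterTypes[i];
            if (type.isArray() && arg instanceof List) {
                List<?> list = (List<?>) arg;
                Object array = Array.newInstance(type.getComponentType(), list.size());
                for (int j = 0; j < list.size(); j++) {
                    // Throws IllegalArgumentException if an element does not fit the component type.
                    Array.set(array, j, list.get(j));
                }
                result[i] = array;
            } else {
                result[i] = arg;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Object[] out = coerceArgs(
            Arrays.<Object>asList("localhost:9092", Arrays.asList("myKafkaTopic")),
            new Class<?>[] {String.class, String[].class});
        System.out.println(out[1] instanceof String[]); // true
    }
}
```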

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
new file mode 100644
index 0000000..1d48c22
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.test;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        return new Values(record.value());
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return new Fields("value");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
index 36b272b..b39d771 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
@@ -21,7 +21,6 @@ import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.kafka.StringScheme;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.TridentCollector;

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/resources/configs/kafka_test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/kafka_test.yaml b/flux/flux-core/src/test/resources/configs/kafka_test.yaml
index 1fb59ca..76d2ee8 100644
--- a/flux/flux-core/src/test/resources/configs/kafka_test.yaml
+++ b/flux/flux-core/src/test/resources/configs/kafka_test.yaml
@@ -25,46 +25,31 @@ name: "kafka-topology"
 #
 # for the time being, components must be declared in the order they are referenced
 components:
-  - id: "stringScheme"
-    className: "org.apache.storm.kafka.StringScheme"
-
-  - id: "stringMultiScheme"
-    className: "org.apache.storm.spout.SchemeAsMultiScheme"
-    constructorArgs:
-      - ref: "stringScheme"
-
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
+  - id: "onlyValueRecordTranslator"
+    className: "org.apache.storm.flux.test.OnlyValueRecordTranslator"
+    
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - "localhost:2181"
-
-# Alternative kafka config
-#  - id: "kafkaConfig"
-#    className: "org.apache.storm.kafka.KafkaConfig"
-#    constructorArgs:
-#      # brokerHosts
-#      - ref: "zkHosts"
-#      # topic
-#      - "myKafkaTopic"
-#      # clientId (optional)
-#      - "myKafkaClientId"
-
+      - "localhost:9092"
+      - ["myKafkaTopic"]
+    properties:
+      - name: "firstPollOffsetStrategy"
+        value: EARLIEST
+      - name: "recordTranslator"
+        ref: "onlyValueRecordTranslator"
+    configMethods:
+      - name: "setProp"
+        args:
+          - {
+              "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+            }
+                
   - id: "spoutConfig"
-    className: "org.apache.storm.kafka.SpoutConfig"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
     constructorArgs:
-      # brokerHosts
-      - ref: "zkHosts"
-      # topic
-      - "myKafkaTopic"
-      # zkRoot
-      - "/kafkaSpout"
-      # id
-      - "myId"
-    properties:
-      - name: "ignoreZkOffsets"
-        value: true
-      - name: "scheme"
-        ref: "stringMultiScheme"
+      - ref: "spoutConfigBuilder"
 
 # topology configuration
 # this will be passed to the submitter as a map of config options
@@ -76,7 +61,7 @@ config:
 # spout definitions
 spouts:
   - id: "kafka-spout"
-    className: "org.apache.storm.kafka.KafkaSpout"
+    className: "org.apache.storm.kafka.spout.KafkaSpout"
     constructorArgs:
       - ref: "spoutConfig"
 

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/README.md
----------------------------------------------------------------------
diff --git a/flux/flux-examples/README.md b/flux/flux-examples/README.md
index 3d610b4..ff2b2ac 100644
--- a/flux/flux-examples/README.md
+++ b/flux/flux-examples/README.md
@@ -40,7 +40,7 @@ written in java.
 
 ### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml)
 
-This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`,
+This example illustrates how to configure Storm's `storm-kafka-client` spout using Flux YAML DSL `components`, `references`,
 and `constructor arguments` constructs.
 
 ### [simple_hdfs.yaml](src/main/resources/simple_hdfs.yaml)

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-examples/pom.xml b/flux/flux-examples/pom.xml
index a9d9c1e..dca5a83 100644
--- a/flux/flux-examples/pom.xml
+++ b/flux/flux-examples/pom.xml
@@ -94,13 +94,9 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
+            <artifactId>storm-kafka-client</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${storm.kafka.artifact.id}</artifactId>
-        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
----------------------------------------------------------------------
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
new file mode 100644
index 0000000..f35b6eb
--- /dev/null
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.examples;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        return new Values(record.value());
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return new Fields("value");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/resources/kafka_spout.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-examples/src/main/resources/kafka_spout.yaml b/flux/flux-examples/src/main/resources/kafka_spout.yaml
index 7533ce4..37f14f1 100644
--- a/flux/flux-examples/src/main/resources/kafka_spout.yaml
+++ b/flux/flux-examples/src/main/resources/kafka_spout.yaml
@@ -13,9 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-# Test ability to wire together shell spouts/bolts
 ---
 
 # topology definition
@@ -28,51 +25,31 @@ name: "kafka-topology"
 #
 # for the time being, components must be declared in the order they are referenced
 components:
-  - id: "stringScheme"
-    className: "org.apache.storm.kafka.StringScheme"
-
-  - id: "stringMultiScheme"
-    className: "org.apache.storm.spout.SchemeAsMultiScheme"
+  - id: "onlyValueRecordTranslator"
+    className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
+    
+  - id: "spoutConfigBuilder"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
     constructorArgs:
-      - ref: "stringScheme"
-
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
-    constructorArgs:
-      - "localhost:2181"
-
-# Alternative kafka config
-#  - id: "kafkaConfig"
-#    className: "org.apache.storm.kafka.KafkaConfig"
-#    constructorArgs:
-#      # brokerHosts
-#      - ref: "zkHosts"
-#      # topic
-#      - "myKafkaTopic"
-#      # clientId (optional)
-#      - "myKafkaClientId"
-
+      - "localhost:9092"
+      - ["myKafkaTopic"]
+    properties:
+      - name: "firstPollOffsetStrategy"
+        value: EARLIEST
+      - name: "recordTranslator"
+        ref: "onlyValueRecordTranslator"
+    configMethods:
+      - name: "setProp"
+        args:
+          - {
+              "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+            }
+                
   - id: "spoutConfig"
-    className: "org.apache.storm.kafka.SpoutConfig"
+    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
     constructorArgs:
-      # brokerHosts
-      - ref: "zkHosts"
-      # topic
-      - "myKafkaTopic"
-      # zkRoot
-      - "/kafkaSpout"
-      # id
-      - "myId"
-    properties:
-      - name: "ignoreZkOffsets"
-        value: true
-      - name: "scheme"
-        ref: "stringMultiScheme"
-
-
-
-# NOTE: We may want to consider some level of spring integration. For example, allowing component references
-# to a spring `ApplicationContext`.
+      - ref: "spoutConfigBuilder"
 
 # topology configuration
 # this will be passed to the submitter as a map of config options
@@ -84,7 +61,7 @@ config:
 # spout definitions
 spouts:
   - id: "kafka-spout"
-    className: "org.apache.storm.kafka.KafkaSpout"
+    className: "org.apache.storm.kafka.spout.KafkaSpout"
     constructorArgs:
       - ref: "spoutConfig"
 
@@ -98,7 +75,6 @@ bolts:
       # output fields
       - ["word"]
     parallelism: 1
-    # ...
 
   - id: "log"
     className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
@@ -108,7 +84,6 @@ bolts:
   - id: "count"
     className: "org.apache.storm.testing.TestWordCounter"
     parallelism: 1
-    # ...
 
 #stream definitions
 # stream definitions define connections between spouts and bolts.


[2/2] storm git commit: Merge branch 'STORM-2971' of https://github.com/srdo/storm into asfgit-master

Posted by sr...@apache.org.
Merge branch 'STORM-2971' of https://github.com/srdo/storm into asfgit-master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21832ad4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21832ad4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21832ad4

Branch: refs/heads/master
Commit: 21832ad4098e9795d94403d25ceb5649099c5f9a
Parents: e8e1a4e 288d97c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 2 12:45:37 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Mar 2 12:45:37 2018 +0100

----------------------------------------------------------------------
 flux/README.md                                  | 137 ++++++++-----------
 flux/flux-core/pom.xml                          |   2 +-
 .../java/org/apache/storm/flux/FluxBuilder.java |  27 ++--
 .../flux/test/OnlyValueRecordTranslator.java    |  37 +++++
 .../storm/flux/test/TridentTopologySource.java  |   1 -
 .../src/test/resources/configs/kafka_test.yaml  |  61 ++++-----
 flux/flux-examples/README.md                    |   2 +-
 flux/flux-examples/pom.xml                      |   6 +-
 .../examples/OnlyValueRecordTranslator.java     |  37 +++++
 .../src/main/resources/kafka_spout.yaml         |  71 ++++------
 10 files changed, 200 insertions(+), 181 deletions(-)
----------------------------------------------------------------------