You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/02 11:46:21 UTC
[1/2] storm git commit: STORM-2971: Replace storm-kafka with
storm-kafka-client in Flux examples,
fix Flux bug where setter argument types are not checked or coerced
Repository: storm
Updated Branches:
refs/heads/master e8e1a4e8f -> 21832ad40
STORM-2971: Replace storm-kafka with storm-kafka-client in Flux examples, fix Flux bug where setter argument types are not checked or coerced
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/288d97cb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/288d97cb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/288d97cb
Branch: refs/heads/master
Commit: 288d97cb5497f27b69b2fb126c2b0097cd02605a
Parents: a38f0c5
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Feb 25 16:10:35 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Mar 1 08:16:21 2018 +0100
----------------------------------------------------------------------
flux/README.md | 137 ++++++++-----------
flux/flux-core/pom.xml | 2 +-
.../java/org/apache/storm/flux/FluxBuilder.java | 27 ++--
.../flux/test/OnlyValueRecordTranslator.java | 37 +++++
.../storm/flux/test/TridentTopologySource.java | 1 -
.../src/test/resources/configs/kafka_test.yaml | 61 ++++-----
flux/flux-examples/README.md | 2 +-
flux/flux-examples/pom.xml | 6 +-
.../examples/OnlyValueRecordTranslator.java | 37 +++++
.../src/main/resources/kafka_spout.yaml | 71 ++++------
10 files changed, 200 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/README.md
----------------------------------------------------------------------
diff --git a/flux/README.md b/flux/README.md
index 5aa76ae..58ed25d 100644
--- a/flux/README.md
+++ b/flux/README.md
@@ -57,7 +57,7 @@ the layout and configuration of your topologies.
in your topology code
* Support for existing topology code (see below)
* Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
- * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)
* Convenient support for multi-lang components
* External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
`${variable.name}` substitution)
@@ -354,19 +354,20 @@ storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_co
With the following `dev.properties` file:
```properties
-kafka.zookeeper.hosts: localhost:2181
+kafka.bootstrap.hosts: localhost:9092
```
You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
```yaml
- - id: "zkHosts"
- className: "org.apache.storm.kafka.ZkHosts"
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - "${kafka.zookeeper.hosts}"
+ - "${kafka.bootstrap.hosts}"
+ - ["myKafkaTopic"]
```
-In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+In this case, Flux would replace `${kafka.bootstrap.hosts}` with `localhost:9092` before parsing the YAML contents.
### Environment Variable Substitution/Filtering
Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` if defined,
@@ -378,16 +379,16 @@ ${ENV-ZK_HOSTS}
## Components
Components are essentially named object instances that are made available as configuration options for spouts and
-bolts. If you are familiar with the Spring framework, components are roughly analagous to Spring beans.
+bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
-the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key
-`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor.
+the following will make an instance of the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` class available as a reference under the key
+`"recordTranslator"` . This assumes the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` has a default constructor.
```yaml
components:
- - id: "stringScheme"
- className: "org.apache.storm.kafka.StringScheme"
+ - id: "recordTranslator"
+ className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
```
### Contructor Arguments, References, Properties and Configuration Methods
@@ -395,32 +396,36 @@ components:
####Constructor Arguments
Arguments to a class constructor can be configured by adding a `contructorArgs` element to a components.
`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
-object by calling the constructor that takes a single string as an argument:
+object by calling the constructor that takes a string and an array of strings as arguments:
```yaml
- - id: "zkHosts"
- className: "org.apache.storm.kafka.ZkHosts"
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - "localhost:2181"
- - true
+ - "${kafka.bootstrap.hosts}"
+ - ["myKafkaTopic"]
```
####References
Each component instance is identified by a unique id that allows it to be used/reused by other components. To
reference an existing component, you specify the id of the component with the `ref` tag.
-In the following example, a component with the id `"stringScheme"` is created, and later referenced, as a an argument
+In the following example, a component with the id `"recordTranslator"` is created, and later referenced, as a an argument
to another component's constructor:
```yaml
components:
- - id: "stringScheme"
- className: "org.apache.storm.kafka.StringScheme"
+ - id: "recordTranslator"
+ className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
- - id: "stringMultiScheme"
- className: "org.apache.storm.spout.SchemeAsMultiScheme"
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - ref: "stringScheme" # component with id "stringScheme" must be declared above.
+ - "localhost:9092"
+ - ["myKafkaTopic"]
+ properties:
+ - name: "recordTranslator"
+ ref: "onlyValueRecordTranslator" #Component with id "recordTranslator" must be declared above
```
You can also reference existing components in list via specifying the id of the components with the `reflist` tag.
@@ -448,27 +453,20 @@ In addition to calling constructors with different arguments, Flux also allows y
JavaBean-like setter methods and fields declared as `public`:
```yaml
- - id: "spoutConfig"
- className: "org.apache.storm.kafka.SpoutConfig"
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- # brokerHosts
- - ref: "zkHosts"
- # topic
- - "myKafkaTopic"
- # zkRoot
- - "/kafkaSpout"
- # id
- - "myId"
+ - "localhost:9092"
+ - ["myKafkaTopic"]
properties:
- - name: "ignoreZkOffsets"
- value: true
- - name: "scheme"
- ref: "stringMultiScheme"
+ - name: "pollTimeoutMs"
+ value: 5000
```
-In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
-the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
-look for a public instance variable with the name `ignoreZkOffsets` and attempt to set its value.
+In the example above, the `properties` declaration will cause Flux to look for a public method in the `KafkaSpoutConfig$Builder` with
+the signature `setPollTimeoutMs(int i)` and attempt to invoke it. If a setter method is not found, Flux will then
+look for a public instance variable with the name `pollTimeoutMs` and attempt to set its value.
+Note that Flux will attempt to coerce actual parameter types to fit the setter parameter types, so e.g. calling `setMyFloat(float f)` with `value: 10` is possible.
References may also be used as property values.
@@ -623,46 +621,31 @@ Kafka spout example:
```yaml
components:
- - id: "stringScheme"
- className: "org.apache.storm.kafka.StringScheme"
-
- - id: "stringMultiScheme"
- className: "org.apache.storm.spout.SchemeAsMultiScheme"
- constructorArgs:
- - ref: "stringScheme"
-
- - id: "zkHosts"
- className: "org.apache.storm.kafka.ZkHosts"
+ - id: "onlyValueRecordTranslator"
+ className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
+
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - "localhost:2181"
-
-# Alternative kafka config
-# - id: "kafkaConfig"
-# className: "org.apache.storm.kafka.KafkaConfig"
-# constructorArgs:
-# # brokerHosts
-# - ref: "zkHosts"
-# # topic
-# - "myKafkaTopic"
-# # clientId (optional)
-# - "myKafkaClientId"
-
+ - "localhost:9092"
+ - ["myKafkaTopic"]
+ properties:
+ - name: "firstPollOffsetStrategy"
+ value: EARLIEST
+ - name: "recordTranslator"
+ ref: "onlyValueRecordTranslator"
+ configMethods:
+ - name: "setProp"
+ args:
+ - {
+ "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+ "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+ }
+
- id: "spoutConfig"
- className: "org.apache.storm.kafka.SpoutConfig"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
constructorArgs:
- # brokerHosts
- - ref: "zkHosts"
- # topic
- - "myKafkaTopic"
- # zkRoot
- - "/kafkaSpout"
- # id
- - "myId"
- properties:
- - name: "ignoreZkOffsets"
- value: true
- - name: "scheme"
- ref: "stringMultiScheme"
+ - ref: "spoutConfigBuilder"
config:
topology.workers: 1
@@ -670,7 +653,7 @@ config:
# spout definitions
spouts:
- id: "kafka-spout"
- className: "org.apache.storm.kafka.KafkaSpout"
+ className: "org.apache.storm.kafka.spout.KafkaSpout"
constructorArgs:
- ref: "spoutConfig"
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml
index acdf805..d29e619 100644
--- a/flux/flux-core/pom.xml
+++ b/flux/flux-core/pom.xml
@@ -38,7 +38,7 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
+ <artifactId>storm-kafka-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 20bde18..e511add 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -277,9 +278,10 @@ public class FluxBuilder {
Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
Method setter = findSetter(clazz, prop.getName(), value);
if (setter != null) {
- LOG.debug("found setter, attempting to invoke");
+ Object[] methodArgs = getArgsWithListCoercion(Collections.singletonList(value), setter.getParameterTypes());
+ LOG.debug("found setter, attempting to invoke with {}", methodArgs);
// invoke setter
- setter.invoke(instance, new Object[]{value});
+ setter.invoke(instance, methodArgs);
} else {
// look for a public instance variable
LOG.debug("no setter found. Looking for a public instance variable...");
@@ -299,15 +301,20 @@ public class FluxBuilder {
private static Method findSetter(Class clazz, String property, Object arg) {
String setterName = toSetterName(property);
- Method retval = null;
Method[] methods = clazz.getMethods();
+ LOG.debug("Target setter: {}, arg: {}", setterName, arg);
for (Method method : methods) {
if (setterName.equals(method.getName())) {
- LOG.debug("Found setter method: " + method.getName());
- retval = method;
+ Class<?>[] parameterTypes = method.getParameterTypes();
+ LOG.debug("Found setter method: {}, parameter types: {}", method.getName(), parameterTypes);
+ boolean invokable = canInvokeWithArgs(Collections.singletonList(arg), method.getParameterTypes());
+ LOG.debug("** invokable --> {}", invokable);
+ if (invokable) {
+ return method;
+ }
}
}
- return retval;
+ return null;
}
private static String toSetterName(String name) {
@@ -350,7 +357,7 @@ public class FluxBuilder {
Constructor con = findCompatibleConstructor(constructorArgs, clazz);
if (con != null) {
LOG.debug("Found something seemingly compatible, attempting invocation...");
- obj = con.newInstance(getArgsWithListCoercian(constructorArgs, con.getParameterTypes()));
+ obj = con.newInstance(getArgsWithListCoercion(constructorArgs, con.getParameterTypes()));
} else {
String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
clazz.getName(),
@@ -368,7 +375,7 @@ public class FluxBuilder {
}
method = findCompatibleMethod(methodArgs, clazz, def.getFactory());
if (method != null) {
- obj = method.invoke(null, getArgsWithListCoercian(methodArgs, method.getParameterTypes()));
+ obj = method.invoke(null, getArgsWithListCoercion(methodArgs, method.getParameterTypes()));
} else {
String msg = String.format("Couldn't find a suitable static method '%s' for class '%s' with arguments '%s'.",
def.getFactory(),
@@ -530,7 +537,7 @@ public class FluxBuilder {
String methodName = methodDef.getName();
Method method = findCompatibleMethod(args, clazz, methodName);
if (method != null) {
- Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
+ Object[] methodArgs = getArgsWithListCoercion(args, method.getParameterTypes());
method.invoke(instance, methodArgs);
} else {
String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
@@ -580,7 +587,7 @@ public class FluxBuilder {
* list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs
* to be coerced from a List to an Array, do so.
*/
- private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+ private static Object[] getArgsWithListCoercion(List<Object> args, Class[] parameterTypes) {
if (parameterTypes.length != args.size()) {
throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
new file mode 100644
index 0000000..1d48c22
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.test;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+ @Override
+ public List<Object> apply(ConsumerRecord<K, V> record) {
+ return new Values(record.value());
+ }
+
+ @Override
+ public Fields getFieldsFor(String stream) {
+ return new Fields("value");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
index 36b272b..b39d771 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
@@ -21,7 +21,6 @@ import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.kafka.StringScheme;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/resources/configs/kafka_test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/kafka_test.yaml b/flux/flux-core/src/test/resources/configs/kafka_test.yaml
index 1fb59ca..76d2ee8 100644
--- a/flux/flux-core/src/test/resources/configs/kafka_test.yaml
+++ b/flux/flux-core/src/test/resources/configs/kafka_test.yaml
@@ -25,46 +25,31 @@ name: "kafka-topology"
#
# for the time being, components must be declared in the order they are referenced
components:
- - id: "stringScheme"
- className: "org.apache.storm.kafka.StringScheme"
-
- - id: "stringMultiScheme"
- className: "org.apache.storm.spout.SchemeAsMultiScheme"
- constructorArgs:
- - ref: "stringScheme"
-
- - id: "zkHosts"
- className: "org.apache.storm.kafka.ZkHosts"
+ - id: "onlyValueRecordTranslator"
+ className: "org.apache.storm.flux.test.OnlyValueRecordTranslator"
+
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - "localhost:2181"
-
-# Alternative kafka config
-# - id: "kafkaConfig"
-# className: "org.apache.storm.kafka.KafkaConfig"
-# constructorArgs:
-# # brokerHosts
-# - ref: "zkHosts"
-# # topic
-# - "myKafkaTopic"
-# # clientId (optional)
-# - "myKafkaClientId"
-
+ - "localhost:9092"
+ - ["myKafkaTopic"]
+ properties:
+ - name: "firstPollOffsetStrategy"
+ value: EARLIEST
+ - name: "recordTranslator"
+ ref: "onlyValueRecordTranslator"
+ configMethods:
+ - name: "setProp"
+ args:
+ - {
+ "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+ "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+ }
+
- id: "spoutConfig"
- className: "org.apache.storm.kafka.SpoutConfig"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
constructorArgs:
- # brokerHosts
- - ref: "zkHosts"
- # topic
- - "myKafkaTopic"
- # zkRoot
- - "/kafkaSpout"
- # id
- - "myId"
- properties:
- - name: "ignoreZkOffsets"
- value: true
- - name: "scheme"
- ref: "stringMultiScheme"
+ - ref: "spoutConfigBuilder"
# topology configuration
# this will be passed to the submitter as a map of config options
@@ -76,7 +61,7 @@ config:
# spout definitions
spouts:
- id: "kafka-spout"
- className: "org.apache.storm.kafka.KafkaSpout"
+ className: "org.apache.storm.kafka.spout.KafkaSpout"
constructorArgs:
- ref: "spoutConfig"
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/README.md
----------------------------------------------------------------------
diff --git a/flux/flux-examples/README.md b/flux/flux-examples/README.md
index 3d610b4..ff2b2ac 100644
--- a/flux/flux-examples/README.md
+++ b/flux/flux-examples/README.md
@@ -40,7 +40,7 @@ written in java.
### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml)
-This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`,
+This example illustrates how to configure Storm's `storm-kafka-client` spout using Flux YAML DSL `components`, `references`,
and `constructor arguments` constructs.
### [simple_hdfs.yaml](src/main/resources/simple_hdfs.yaml)
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-examples/pom.xml b/flux/flux-examples/pom.xml
index a9d9c1e..dca5a83 100644
--- a/flux/flux-examples/pom.xml
+++ b/flux/flux-examples/pom.xml
@@ -94,13 +94,9 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
+ <artifactId>storm-kafka-client</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>${storm.kafka.artifact.id}</artifactId>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
----------------------------------------------------------------------
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
new file mode 100644
index 0000000..f35b6eb
--- /dev/null
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.examples;
+
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> {
+
+ @Override
+ public List<Object> apply(ConsumerRecord<K, V> record) {
+ return new Values(record.value());
+ }
+
+ @Override
+ public Fields getFieldsFor(String stream) {
+ return new Fields("value");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/resources/kafka_spout.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-examples/src/main/resources/kafka_spout.yaml b/flux/flux-examples/src/main/resources/kafka_spout.yaml
index 7533ce4..37f14f1 100644
--- a/flux/flux-examples/src/main/resources/kafka_spout.yaml
+++ b/flux/flux-examples/src/main/resources/kafka_spout.yaml
@@ -13,9 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
-# Test ability to wire together shell spouts/bolts
---
# topology definition
@@ -28,51 +25,31 @@ name: "kafka-topology"
#
# for the time being, components must be declared in the order they are referenced
components:
- - id: "stringScheme"
- className: "org.apache.storm.kafka.StringScheme"
-
- - id: "stringMultiScheme"
- className: "org.apache.storm.spout.SchemeAsMultiScheme"
+ - id: "onlyValueRecordTranslator"
+ className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator"
+
+ - id: "spoutConfigBuilder"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
constructorArgs:
- - ref: "stringScheme"
-
- - id: "zkHosts"
- className: "org.apache.storm.kafka.ZkHosts"
- constructorArgs:
- - "localhost:2181"
-
-# Alternative kafka config
-# - id: "kafkaConfig"
-# className: "org.apache.storm.kafka.KafkaConfig"
-# constructorArgs:
-# # brokerHosts
-# - ref: "zkHosts"
-# # topic
-# - "myKafkaTopic"
-# # clientId (optional)
-# - "myKafkaClientId"
-
+ - "localhost:9092"
+ - ["myKafkaTopic"]
+ properties:
+ - name: "firstPollOffsetStrategy"
+ value: EARLIEST
+ - name: "recordTranslator"
+ ref: "onlyValueRecordTranslator"
+ configMethods:
+ - name: "setProp"
+ args:
+ - {
+ "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer",
+ "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+ }
+
- id: "spoutConfig"
- className: "org.apache.storm.kafka.SpoutConfig"
+ className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
constructorArgs:
- # brokerHosts
- - ref: "zkHosts"
- # topic
- - "myKafkaTopic"
- # zkRoot
- - "/kafkaSpout"
- # id
- - "myId"
- properties:
- - name: "ignoreZkOffsets"
- value: true
- - name: "scheme"
- ref: "stringMultiScheme"
-
-
-
-# NOTE: We may want to consider some level of spring integration. For example, allowing component references
-# to a spring `ApplicationContext`.
+ - ref: "spoutConfigBuilder"
# topology configuration
# this will be passed to the submitter as a map of config options
@@ -84,7 +61,7 @@ config:
# spout definitions
spouts:
- id: "kafka-spout"
- className: "org.apache.storm.kafka.KafkaSpout"
+ className: "org.apache.storm.kafka.spout.KafkaSpout"
constructorArgs:
- ref: "spoutConfig"
@@ -98,7 +75,6 @@ bolts:
# output fields
- ["word"]
parallelism: 1
- # ...
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
@@ -108,7 +84,6 @@ bolts:
- id: "count"
className: "org.apache.storm.testing.TestWordCounter"
parallelism: 1
- # ...
#stream definitions
# stream definitions define connections between spouts and bolts.
[2/2] storm git commit: Merge branch 'STORM-2971' of
https://github.com/srdo/storm into asfgit-master
Posted by sr...@apache.org.
Merge branch 'STORM-2971' of https://github.com/srdo/storm into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21832ad4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21832ad4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21832ad4
Branch: refs/heads/master
Commit: 21832ad4098e9795d94403d25ceb5649099c5f9a
Parents: e8e1a4e 288d97c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 2 12:45:37 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Mar 2 12:45:37 2018 +0100
----------------------------------------------------------------------
flux/README.md | 137 ++++++++-----------
flux/flux-core/pom.xml | 2 +-
.../java/org/apache/storm/flux/FluxBuilder.java | 27 ++--
.../flux/test/OnlyValueRecordTranslator.java | 37 +++++
.../storm/flux/test/TridentTopologySource.java | 1 -
.../src/test/resources/configs/kafka_test.yaml | 61 ++++-----
flux/flux-examples/README.md | 2 +-
flux/flux-examples/pom.xml | 6 +-
.../examples/OnlyValueRecordTranslator.java | 37 +++++
.../src/main/resources/kafka_spout.yaml | 71 ++++------
10 files changed, 200 insertions(+), 181 deletions(-)
----------------------------------------------------------------------