Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/22 13:48:16 UTC
[2/2] flink git commit: [FLINK-2304] Add named attribute access to Storm compatibility layer
[FLINK-2304] Add named attribute access to Storm compatibility layer
- extended FlinkTuple to enable named attribute access
- extended BoltWrapper for user defined input schema
- extended FlinkTopologyBuilder to handle declared output schemas
- adapted JUnit tests
- added new examples and ITCases
- updated READMEs
- updated documentation
Closes #878
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03320503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03320503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03320503
Branch: refs/heads/master
Commit: 03320503e20c2038412c889d9d8b61821d4963af
Parents: 148395b
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Jun 29 17:50:07 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Jul 22 13:47:22 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 33 +-
.../flink-storm-compatibility-core/README.md | 1 -
.../api/FlinkOutputFieldsDeclarer.java | 2 +-
.../api/FlinkTopologyBuilder.java | 8 +-
.../wrappers/AbstractStormCollector.java | 2 +-
.../wrappers/StormBoltWrapper.java | 75 +++-
.../stormcompatibility/wrappers/StormTuple.java | 110 ++++-
.../wrappers/StormBoltWrapperTest.java | 10 +-
.../wrappers/StormTupleTest.java | 444 ++++++++++++++++---
.../README.md | 6 +-
.../stormcompatibility/util/StormFileSpout.java | 2 +-
.../util/StormInMemorySpout.java | 2 +-
.../util/StormWordCountFileSpout.java | 38 ++
.../util/StormWordCountInMemorySpout.java | 39 ++
.../wordcount/BoltTokenizerWordCount.java | 4 +-
.../wordcount/BoltTokenizerWordCountPojo.java | 135 ++++++
.../BoltTokenizerWordCountWithNames.java | 138 ++++++
.../wordcount/SpoutSourceWordCount.java | 4 +-
.../wordcount/StormWordCountNamedLocal.java | 76 ++++
.../wordcount/WordCountTopology.java | 34 +-
.../stormoperators/StormBoltCounterByName.java | 88 ++++
.../stormoperators/StormBoltTokenizer.java | 2 +
.../StormBoltTokenizerByName.java | 78 ++++
.../stormoperators/WordCountDataPojos.java | 59 +++
.../stormoperators/WordCountDataTuple.java | 34 ++
.../BoltTokenizerWordCountPojoITCase.java | 45 ++
.../BoltTokenizerWordCountWithNamesITCase.java | 45 ++
.../StormWordCountLocalNamedITCase.java | 48 ++
28 files changed, 1430 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index 0f6b17b..b8fe66e 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -34,7 +34,7 @@ This document shows how to use existing Storm code with Flink.
* This will be replaced by the TOC
{:toc}
-### Project Configuration
+# Project Configuration
Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
The code resides in the `org.apache.flink.stormcompatibility` package.
@@ -51,7 +51,7 @@ Add the following dependency to your `pom.xml` if you want to execute Storm code
**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
-### Execute Storm Topologies
+# Execute Storm Topologies
Flink provides a Storm compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
@@ -88,18 +88,18 @@ if(runLocal) { // submit to test cluster
</div>
</div>
-### Embed Storm Operators in Flink Streaming Programs
+# Embed Storm Operators in Flink Streaming Programs
As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
-Per default, both wrappers convert Storm output tuples to Flink's `Tuple` types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
+By default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
-#### Embed Spouts
+## Embed Spouts
In order to use a Spout as Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
@@ -126,7 +126,7 @@ Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatica
If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
-#### Embed Bolts
+## Embed Bolts
In order to use a Bolt as Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
@@ -149,7 +149,26 @@ DataStream<Tuple2<String, Integer>> counts = text.transform(
</div>
</div>
-### Storm Compatibility Examples
+### Named Attribute Access for Embedded Bolts
+
+Bolts can access input tuple fields by name (in addition to access by index).
+To use this feature with embedded Bolts, you need to have either a
+
+ 1. [POJO](programming_guide.html#pojos) type input stream or
+ 2. [Tuple](programming_guide.html#tuples-and-case-classes) type input stream and specify the input schema (ie, name-to-index mapping)
+
+For POJO input types, Flink accesses the fields via reflection.
+In this case, Flink expects either a corresponding public member variable or a public getter method.
+For example, if a Bolt accesses a field named `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or a method `public String getSentence() { ... }` (pay attention to camel-case naming).
+
+For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
+In this case, the constructor of `StormBoltWrapper` takes an additional argument: `new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(new StormBoltTokenizerByName(), new Fields("sentence"))`.
+The input type is `Tuple1<String>`, and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
+
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+
+# Storm Compatibility Examples
You can find more examples in Maven module `flink-storm-compatibility-examples`.
+For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
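The name-to-index mapping that the documentation above attributes to Storm's `Fields` class can be sketched with the JDK alone. `NamedSchema` below is a hypothetical stand-in (not part of this patch or of Storm) that assumes the same contract: declared field names map to tuple positions, so access by name reduces to access by index.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Storm's Fields: maps attribute names to tuple
// positions so that name-based and index-based access are interchangeable.
public class NamedSchema {
	private final List<String> names;

	public NamedSchema(final String... names) {
		this.names = Arrays.asList(names);
	}

	// Position of the named attribute; fails for undeclared names.
	public int fieldIndex(final String name) {
		final int i = this.names.indexOf(name);
		if (i < 0) {
			throw new IllegalArgumentException("Unknown field: " + name);
		}
		return i;
	}

	public boolean contains(final String name) {
		return this.names.indexOf(name) >= 0;
	}

	// Access by name is just access by the mapped index.
	public Object getByName(final String name, final List<Object> tuple) {
		return tuple.get(fieldIndex(name));
	}

	public static void main(final String[] args) {
		final NamedSchema schema = new NamedSchema("sentence");
		final List<Object> tuple = Arrays.<Object>asList("hello world");
		// getByName("sentence", tuple) is equivalent to tuple.get(0)
		System.out.println(schema.getByName("sentence", tuple));
	}
}
```

With a schema of `new Fields("sentence")` and input type `Tuple1<String>`, this is the same equivalence the section states between `getStringByField("sentence")` and `getString(0)`.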
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index 0d490a3..04d8934 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -5,7 +5,6 @@ The Storm compatibility layer allows to embed spouts or bolt unmodified within a
The following Storm features are not (yet/fully) supported by the compatibility layer right now:
* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` not fully supported)
-* access to tuple attributes (ie, fields) only by index (access by name is coming)
* only default stream is supported currently (ie, only a single output stream)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index 7661ab9..49d73f8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -37,7 +37,7 @@ import java.util.List;
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
/** the declared output schema */
- private Fields outputSchema;
+ Fields outputSchema;
@Override
public void declare(final Fields fields) {
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 239c5eb..6c39561 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -30,6 +30,8 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
@@ -60,6 +62,8 @@ public class FlinkTopologyBuilder {
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
/** All user bolts by their ID */
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+ /** All declared output schemas by operator ID */
+ private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
/**
* Creates a Flink program that uses the specified spouts and bolts.
@@ -79,6 +83,7 @@ public class FlinkTopologyBuilder {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
+ this.outputSchemas.put(spoutId, declarer.outputSchema);
/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
* and StormCollector)
@@ -118,6 +123,7 @@ public class FlinkTopologyBuilder {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userBolt.declareOutputFields(declarer);
+ this.outputSchemas.put(boltId, declarer.outputSchema);
final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
@@ -162,7 +168,7 @@ public class FlinkTopologyBuilder {
final TypeInformation<?> outType = declarer.getOutputType();
final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
- new StormBoltWrapper(userBolt));
+ new StormBoltWrapper(userBolt, this.outputSchemas.get(producerId)));
if (outType != null) {
// only for non-sink nodes
availableOperators.put(boltId, operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
index e8048b0..dc77ca1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -68,7 +68,7 @@ abstract class AbstractStormCollector<OUT> {
}
} else {
throw new UnsupportedOperationException(
- "SimpleStormBoltWrapper can handle not more then 25 attributes, but "
+ "Flink cannot handle more than 25 attributes, but "
+ this.numberOfAttributes + " are declared by the given bolt");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index c7b87ba..8bcdae0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -53,6 +52,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
private final IRichBolt bolt;
/** Number of attributes of the bolt's output tuples */
private final int numberOfAttributes;
+ /** The schema (ie, ordered field names) of the input stream. */
+ private final Fields inputSchema;
/**
* We have to use this because Operators must output
@@ -61,9 +62,10 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
private TimestampedCollector<OUT> flinkCollector;
/**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
- * such that it can be used within a Flink streaming program. The output type will be one of
- * {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+ * for POJO input types. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's
+ * declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
@@ -71,15 +73,33 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
* If the number of declared output attributes is not within range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
- this(bolt, false);
+ this(bolt, null, false);
}
/**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
- * such that it can be used within a Flink streaming program. The output type can be any type if
- * parameter {@code rawOutput} is {@code true} and the bolt's number of declared output tuples
- * is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
- * {@link Tuple25} depending on the bolt's declared number of attributes.
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
+ * {@link Tuple1} to {@link Tuple25}. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+ * the bolt's declared number of attributes.
+ *
+ * @param bolt
+ * The Storm {@link IRichBolt bolt} to be used.
+ * @param inputSchema
+ * The schema (ie, ordered field names) of the input stream.
+ * @throws IllegalArgumentException
+ * If the number of declared output attributes is not within range [1;25].
+ */
+ public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
+ throws IllegalArgumentException {
+ this(bolt, inputSchema, false);
+ }
+
+ /**
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+ * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
+ * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
+ * of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
@@ -91,8 +111,34 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
* not 1 or if {@code rawOutput} is {@code false} and the number of declared output
* attributes is not within range [1;25].
*/
- public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
+ public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput)
+ throws IllegalArgumentException {
+ this(bolt, null, rawOutput);
+ }
+
+ /**
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
+ * {@link Tuple1} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
+ * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
+ * be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+ *
+ * @param bolt
+ * The Storm {@link IRichBolt bolt} to be used.
+ * @param inputSchema
+ * The schema (ie, ordered field names) of the input stream.
+ * @param rawOutput
+ * Set to {@code true} if a single attribute output stream should not be of type
+ * {@link Tuple1} but be of a raw type.
+ * @throws IllegalArgumentException
+ * If {@code rawOutput} is {@code true} and the number of declared output attributes is
+ * not 1 or if {@code rawOutput} is {@code false} and the number of declared output
+ * attributes is not within range [1;25].
+ */
+ public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, final boolean rawOutput)
+ throws IllegalArgumentException {
this.bolt = bolt;
+ this.inputSchema = inputSchema;
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
}
@@ -101,7 +147,7 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
super.open(parameters);
final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
- (StreamingRuntimeContext)super.runtimeContext, false);
+ super.runtimeContext, false);
flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;
@@ -122,11 +168,12 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
flinkCollector.setTimestamp(element.getTimestamp());
- this.bolt.execute(new StormTuple<IN>(element.getValue()));
+ this.bolt.execute(new StormTuple<IN>(element.getValue(), inputSchema));
}
@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
index 51db745..07d94b4 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -30,6 +30,8 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Values;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.List;
/**
@@ -37,17 +39,23 @@ import java.util.List;
*/
class StormTuple<IN> implements backtype.storm.tuple.Tuple {
- /** The storm representation of the original Flink tuple */
+ /** The Storm representation of the original Flink tuple */
private final Values stormTuple;
+ /** The schema (ie, ordered field names) of the tuple */
+ private final Fields schema;
/**
- * Create a new Storm tuple from the given Flink tuple.
- *
+ * Create a new Storm tuple from the given Flink tuple. The provided {@code schema} is ignored for raw input
+ * types.
+ *
* @param flinkTuple
* The Flink tuple to be converted.
+ * @param schema
+ * The schema (ie, ordered field names) of the tuple.
*/
- public StormTuple(final IN flinkTuple) {
+ public StormTuple(final IN flinkTuple, final Fields schema) {
if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
+ this.schema = schema;
final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
final int numberOfAttributes = t.getArity();
@@ -56,6 +64,7 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
this.stormTuple.add(t.getField(i));
}
} else {
+ this.schema = null;
this.stormTuple = new Values(flinkTuple);
}
}
@@ -67,22 +76,38 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
@Override
public boolean contains(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ if (this.schema != null) {
+ return this.schema.contains(field);
+ }
+
+ try {
+ this.getPublicMemberField(field);
+ return true;
+ } catch (NoSuchFieldException f) {
+ try {
+ this.getGetterMethod(field);
+ return true;
+ } catch (Exception g) {
+ return false;
+ }
+ } catch (Exception e) {
+ return false;
+ }
}
@Override
public Fields getFields() {
- throw new UnsupportedOperationException("Not implemented yet");
+ return this.schema;
}
@Override
public int fieldIndex(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return this.schema.fieldIndex(field);
}
@Override
public List<Object> select(final Fields selector) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return this.schema.select(selector, this.stormTuple);
}
@Override
@@ -135,54 +160,103 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
return (byte[]) this.stormTuple.get(i);
}
+ private Field getPublicMemberField(final String field) throws Exception {
+ assert (this.stormTuple.size() == 1);
+ return this.stormTuple.get(0).getClass().getField(field);
+ }
+
+ private Method getGetterMethod(final String field) throws Exception {
+ assert (this.stormTuple.size() == 1);
+ return this.stormTuple
+ .get(0)
+ .getClass()
+ .getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1),
+ (Class<?>[]) null);
+ }
+
+ private Object getValueByPublicMember(final String field) throws Exception {
+ assert (this.stormTuple.size() == 1);
+ return getPublicMemberField(field).get(this.stormTuple.get(0));
+ }
+
+ private Object getValueByGetter(final String field) throws Exception {
+ assert (this.stormTuple.size() == 1);
+ return getGetterMethod(field).invoke(this.stormTuple.get(0), (Object[]) null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getValueByName(final String field) {
+ if (this.schema != null) {
+ return (T) this.getValue(this.schema.fieldIndex(field));
+ }
+ assert (this.stormTuple.size() == 1);
+
+ Exception e;
+ try {
+ // try public member
+ return (T) getValueByPublicMember(field);
+ } catch (NoSuchFieldException f) {
+ try {
+ // try getter-method
+ return (T) getValueByGetter(field);
+ } catch (Exception g) {
+ e = g;
+ }
+ } catch (Exception f) {
+ e = f;
+ }
+
+ throw new RuntimeException("Could not access field <" + field + ">", e);
+ }
+
@Override
public Object getValueByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public String getStringByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Integer getIntegerByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Long getLongByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Boolean getBooleanByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Short getShortByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Byte getByteByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Double getDoubleByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public Float getFloatByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
public byte[] getBinaryByField(final String field) {
- throw new UnsupportedOperationException("Not implemented yet");
+ return getValueByName(field);
}
@Override
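The POJO access path added to `StormTuple.getValueByName` above, try a public member variable first, then fall back to a camel-case getter, can be reproduced with plain JDK reflection. `readField` and the `Sentence` POJO below are hypothetical illustrations of that lookup order, not part of the patch:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Stdlib-only sketch of the patch's reflection-based field resolution:
// a public member variable wins; otherwise a public getter is tried.
public class PojoFieldAccess {

	public static Object readField(final Object pojo, final String name) {
		try {
			// 1) public member variable, eg "public String sentence;"
			final Field member = pojo.getClass().getField(name);
			return member.get(pojo);
		} catch (NoSuchFieldException noField) {
			try {
				// 2) public getter, eg "public String getSentence()" (camel-case)
				final Method getter = pojo.getClass().getMethod(
						"get" + Character.toUpperCase(name.charAt(0)) + name.substring(1));
				return getter.invoke(pojo);
			} catch (Exception noGetter) {
				throw new RuntimeException("Could not access field <" + name + ">", noGetter);
			}
		} catch (IllegalAccessException e) {
			throw new RuntimeException("Could not access field <" + name + ">", e);
		}
	}

	// Example POJO mixing both access styles.
	public static class Sentence {
		public String sentence = "hello";
		private int count = 42;
		public int getCount() { return this.count; }
	}

	public static void main(final String[] args) {
		final Sentence s = new Sentence();
		System.out.println(readField(s, "sentence")); // resolved via public member
		System.out.println(readField(s, "count"));    // resolved via getter
	}
}
```

This mirrors why the documentation change above demands either a public member or a camel-case getter for embedded Bolts with POJO input: there is no declared schema, so only reflection can resolve a field name.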
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index dd56c4d..3e55d23 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -47,7 +47,6 @@ import static org.mockito.Mockito.when;
@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
public class StormBoltWrapperTest {
- @SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperRawType() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -57,7 +56,6 @@ public class StormBoltWrapperTest {
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
}
- @SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes1() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -71,7 +69,6 @@ public class StormBoltWrapperTest {
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
}
- @SuppressWarnings("unused")
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes2() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -127,16 +124,17 @@ public class StormBoltWrapperTest {
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
+ final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, null);
wrapper.setup(mock(Output.class), taskContext);
wrapper.open(new Configuration());
wrapper.processElement(record);
if (numberOfAttributes == 0) {
- verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
+ verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
} else {
- verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
+ verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
}
+
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
index 14b1c60..96e7b35 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
@@ -21,17 +21,36 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.ArrayList;
import java.util.List;
+import static org.mockito.Mockito.mock;
+
public class StormTupleTest extends AbstractTest {
+ private static final String fieldName = "fieldName";
+ private static final String fieldNamePojo = "member";
+
+ private int arity, index;
+
+ @Override
+ @Before
+ public void prepare() {
+ super.prepare();
+ this.arity = 1 + r.nextInt(25);
+ this.index = r.nextInt(this.arity);
+ }
@Test
public void nonTupleTest() {
final Object flinkTuple = this.r.nextInt();
- final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple);
+ final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null);
Assert.assertSame(flinkTuple, tuple.getValue(0));
final List<Object> values = tuple.getValues();
@@ -50,7 +69,7 @@ public class StormTupleTest extends AbstractTest {
flinkTuple.setField(data[i], i);
}
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
final List<Object> values = tuple.getValues();
Assert.assertEquals(numberOfAttributes, values.size());
@@ -70,7 +89,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
}
@@ -78,7 +97,7 @@ public class StormTupleTest extends AbstractTest {
public void testBoolean() {
final Boolean flinkTuple = this.r.nextBoolean();
- final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple);
+ final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
}
@@ -86,7 +105,7 @@ public class StormTupleTest extends AbstractTest {
public void testByte() {
final Byte flinkTuple = (byte) this.r.nextInt();
- final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple);
+ final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getByte(0));
}
@@ -94,7 +113,7 @@ public class StormTupleTest extends AbstractTest {
public void testDouble() {
final Double flinkTuple = this.r.nextDouble();
- final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple);
+ final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getDouble(0));
}
@@ -102,7 +121,7 @@ public class StormTupleTest extends AbstractTest {
public void testFloat() {
final Float flinkTuple = this.r.nextFloat();
- final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple);
+ final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getFloat(0));
}
@@ -110,7 +129,7 @@ public class StormTupleTest extends AbstractTest {
public void testInteger() {
final Integer flinkTuple = this.r.nextInt();
- final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple);
+ final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getInteger(0));
}
@@ -118,7 +137,7 @@ public class StormTupleTest extends AbstractTest {
public void testLong() {
final Long flinkTuple = this.r.nextLong();
- final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple);
+ final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getLong(0));
}
@@ -126,7 +145,7 @@ public class StormTupleTest extends AbstractTest {
public void testShort() {
final Short flinkTuple = (short) this.r.nextInt();
- final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple);
+ final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getShort(0));
}
@@ -136,7 +155,7 @@ public class StormTupleTest extends AbstractTest {
this.r.nextBytes(data);
final String flinkTuple = new String(data);
- final StormTuple<String> tuple = new StormTuple<String>(flinkTuple);
+ final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
Assert.assertEquals(flinkTuple, tuple.getString(0));
}
@@ -149,7 +168,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
}
@@ -161,7 +180,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
}
@@ -173,7 +192,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
}
@@ -185,7 +204,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
}
@@ -197,7 +216,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
}
@@ -209,7 +228,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
}
@@ -221,7 +240,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
}
@@ -233,7 +252,7 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
}
@@ -247,103 +266,394 @@ public class StormTupleTest extends AbstractTest {
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
flinkTuple.setField(data, index);
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+ final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testContains() {
- new StormTuple<Object>(null).contains(null);
+ @Test
+ public void testContains() throws Exception {
+ Fields schema = new Fields("a1", "a2");
+ StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+ schema);
+
+ Assert.assertTrue(tuple.contains("a1"));
+ Assert.assertTrue(tuple.contains("a2"));
+ Assert.assertFalse(tuple.contains("a3"));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetFields() {
- new StormTuple<Object>(null).getFields();
+ @Test
+ public void testGetFields() throws Exception {
+ Fields schema = new Fields();
+
+ Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+ schema).getFields());
+
+ Assert.assertSame(null, new StormTuple<Object>(null, schema).getFields());
+
}
- @Test(expected = UnsupportedOperationException.class)
- public void testFieldIndex() {
- new StormTuple<Object>(null).fieldIndex(null);
+ @Test
+ public void testFieldIndex() throws Exception {
+ Fields schema = new Fields("a1", "a2");
+ StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+ schema);
+
+ Assert.assertEquals(0, tuple.fieldIndex("a1"));
+ Assert.assertEquals(1, tuple.fieldIndex("a2"));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testSelect() {
- new StormTuple<Object>(null).select(null);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testSelect() throws Exception {
+ Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+ Values values = new Values();
+
+ ArrayList<String> attributeNames = new ArrayList<String>(arity);
+ ArrayList<String> filterNames = new ArrayList<String>(arity);
+
+ for (int i = 0; i < arity; ++i) {
+ tuple.setField(i, i);
+ attributeNames.add("a" + i);
+
+ if (r.nextBoolean()) {
+ filterNames.add("a" + i);
+ values.add(i);
+ }
+ }
+ Fields schema = new Fields(attributeNames);
+ Fields selector = new Fields(filterNames);
+
+ Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetValueByField() {
- new StormTuple<Object>(null).getValueByField(null);
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetValueByField() throws Exception {
+ Object value = mock(Object.class);
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getValueByField(fieldName));
+
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetStringByField() {
- new StormTuple<Object>(null).getStringByField(null);
+ @Test
+ public void testGetValueByFieldPojo() throws Exception {
+ Object value = mock(Object.class);
+ TestPojoMember<Object> pojo = new TestPojoMember<Object>(value);
+ StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetIntegerByField() {
- new StormTuple<Object>(null).getIntegerByField(null);
+ @Test
+ public void testGetValueByFieldPojoGetter() throws Exception {
+ Object value = mock(Object.class);
+ TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value);
+ StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetLongByField() {
- new StormTuple<Object>(null).getLongByField(null);
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetStringByField() throws Exception {
+ String value = "stringValue";
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getStringByField(fieldName));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetBooleanByField() {
- new StormTuple<Object>(null).getBooleanByField(null);
+ @Test
+ public void testGetStringByFieldPojo() throws Exception {
+ String value = "stringValue";
+ TestPojoMember<String> pojo = new TestPojoMember<String>(value);
+ StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetShortByField() {
- new StormTuple<Object>(null).getShortByField(null);
+ @Test
+ public void testGetStringByFieldPojoGetter() throws Exception {
+ String value = "stringValue";
+ TestPojoGetter<String> pojo = new TestPojoGetter<String>(value);
+ StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetByteByField() {
- new StormTuple<Object>(null).getByteByField(null);
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetIntegerByField() throws Exception {
+ Integer value = r.nextInt();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getIntegerByField(fieldName));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetDoubleByField() {
- new StormTuple<Object>(null).getDoubleByField(null);
+ @Test
+ public void testGetIntegerByFieldPojo() throws Exception {
+ Integer value = r.nextInt();
+ TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value);
+ StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetFloatByField() {
- new StormTuple<Object>(null).getFloatByField(null);
+ @Test
+ public void testGetIntegerByFieldPojoGetter() throws Exception {
+ Integer value = r.nextInt();
+ TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value);
+ StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
}
- @Test(expected = UnsupportedOperationException.class)
- public void testGetBinaryByField() {
- new StormTuple<Object>(null).getBinaryByField(null);
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetLongByField() throws Exception {
+ Long value = r.nextLong();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getLongByField(fieldName));
+ }
+
+ @Test
+ public void testGetLongByFieldPojo() throws Exception {
+ Long value = r.nextLong();
+ TestPojoMember<Long> pojo = new TestPojoMember<Long>(value);
+ StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetLongByFieldPojoGetter() throws Exception {
+ Long value = r.nextLong();
+ TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value);
+ StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetBooleanByField() throws Exception {
+ Boolean value = r.nextBoolean();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertEquals(value, tuple.getBooleanByField(fieldName));
+ }
+
+ @Test
+ public void testGetBooleanByFieldPojo() throws Exception {
+ Boolean value = r.nextBoolean();
+ TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value);
+ StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetBooleanByFieldPojoGetter() throws Exception {
+ Boolean value = r.nextBoolean();
+ TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value);
+ StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetShortByField() throws Exception {
+ Short value = (short) r.nextInt();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getShortByField(fieldName));
+ }
+
+ @Test
+ public void testGetShortByFieldPojo() throws Exception {
+ Short value = (short) r.nextInt();
+ TestPojoMember<Short> pojo = new TestPojoMember<Short>(value);
+ StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetShortByFieldPojoGetter() throws Exception {
+ Short value = (short) r.nextInt();
+ TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value);
+ StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetByteByField() throws Exception {
+ Byte value = new Byte((byte) r.nextInt());
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getByteByField(fieldName));
+ }
+
+ @Test
+ public void testGetByteByFieldPojo() throws Exception {
+ Byte value = new Byte((byte) r.nextInt());
+ TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
+ StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetByteByFieldPojoGetter() throws Exception {
+ Byte value = new Byte((byte) r.nextInt());
+ TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
+ StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetDoubleByField() throws Exception {
+ Double value = r.nextDouble();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getDoubleByField(fieldName));
+ }
+
+ @Test
+ public void testGetDoubleByFieldPojo() throws Exception {
+ Double value = r.nextDouble();
+ TestPojoMember<Double> pojo = new TestPojoMember<Double>(value);
+ StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetDoubleByFieldPojoGetter() throws Exception {
+ Double value = r.nextDouble();
+ TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value);
+ StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetFloatByField() throws Exception {
+ Float value = r.nextFloat();
+ StormTuple tuple = testGetByField(arity, index, value);
+ Assert.assertSame(value, tuple.getFloatByField(fieldName));
+ }
+
+ @Test
+ public void testGetFloatByFieldPojo() throws Exception {
+ Float value = r.nextFloat();
+ TestPojoMember<Float> pojo = new TestPojoMember<Float>(value);
+ StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetFloatByFieldPojoGetter() throws Exception {
+ Float value = r.nextFloat();
+ TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value);
+ StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo,
+ null);
+ Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetBinaryByField() throws Exception {
+ byte[] data = new byte[1 + r.nextInt(20)];
+ r.nextBytes(data);
+ StormTuple tuple = testGetByField(arity, index, data);
+ Assert.assertSame(data, tuple.getBinaryByField(fieldName));
+ }
+
+ @Test
+ public void testGetBinaryFieldPojo() throws Exception {
+ byte[] data = new byte[1 + r.nextInt(20)];
+ r.nextBytes(data);
+ TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data);
+ StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo,
+ null);
+ Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetBinaryByFieldPojoGetter() throws Exception {
+ byte[] data = new byte[1 + r.nextInt(20)];
+ r.nextBytes(data);
+ TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data);
+ StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo,
+ null);
+ Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T> StormTuple testGetByField(int arity, int index, T value)
+ throws Exception {
+
+ assert (index < arity);
+
+ Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+ tuple.setField(value, index);
+
+ ArrayList<String> attributeNames = new ArrayList<String>(arity);
+ for(int i = 0; i < arity; ++i) {
+ if(i == index) {
+ attributeNames.add(fieldName);
+ } else {
+ attributeNames.add("" + i);
+ }
+ }
+ Fields schema = new Fields(attributeNames);
+
+ return new StormTuple(tuple, schema);
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceGlobalStreamid() {
- new StormTuple<Object>(null).getSourceGlobalStreamid();
+ new StormTuple<Object>(null, null).getSourceGlobalStreamid();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceComponent() {
- new StormTuple<Object>(null).getSourceComponent();
+ new StormTuple<Object>(null, null).getSourceComponent();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceTask() {
- new StormTuple<Object>(null).getSourceTask();
+ new StormTuple<Object>(null, null).getSourceTask();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetSourceStreamId() {
- new StormTuple<Object>(null).getSourceStreamId();
+ new StormTuple<Object>(null, null).getSourceStreamId();
}
@Test(expected = UnsupportedOperationException.class)
public void testGetMessageId() {
- new StormTuple<Object>(null).getMessageId();
+ new StormTuple<Object>(null, null).getMessageId();
+ }
+
+ public static class TestPojoMember<T> {
+ public T member;
+
+ public TestPojoMember(T value) {
+ this.member = value;
+ }
}
+ public static class TestPojoGetter<T> {
+ private T member;
+
+ public TestPojoGetter(T value) {
+ this.member = value;
+ }
+
+ public T getMember() {
+ return this.member;
+ }
+ }
}
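The rewritten tests above exercise the two lookup paths that `StormTuple` now supports for named attribute access: index lookup through the declared `Fields` schema for Flink tuples, and reflection-based lookup (public member first, then getter) for POJO types. The following is a minimal self-contained sketch of those two strategies; the class and method names (`NamedAccessSketch`, `fieldIndex`, `getByName`) are illustrative and not taken from the patch.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics the two lookup strategies the tests above verify.
public class NamedAccessSketch {

	// Schema-based lookup: map a field name to its tuple index,
	// analogous to Storm's Fields.fieldIndex().
	static int fieldIndex(List<String> schema, String name) {
		int i = schema.indexOf(name);
		if (i < 0) {
			throw new IllegalArgumentException(name + " does not exist");
		}
		return i;
	}

	// Reflection-based lookup for POJOs: try a public member first,
	// fall back to a public getter (getXxx) if no such field exists.
	static Object getByName(Object pojo, String name) throws Exception {
		try {
			Field f = pojo.getClass().getField(name);
			return f.get(pojo);
		} catch (NoSuchFieldException e) {
			String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
			Method m = pojo.getClass().getMethod(getter);
			return m.invoke(pojo);
		}
	}

	// POJO with a private member and a public getter, like TestPojoGetter above.
	public static class Pojo {
		private final String member;
		public Pojo(String v) { this.member = v; }
		public String getMember() { return this.member; }
	}

	public static void main(String[] args) throws Exception {
		List<String> schema = Arrays.asList("sentence", "count");
		System.out.println(fieldIndex(schema, "count"));            // 1
		System.out.println(getByName(new Pojo("hello"), "member")); // hello
	}
}
```

Note that `Class.getField` only sees public fields, so a private `member` falls through to the getter path, which is the distinction the `TestPojoMember` / `TestPojoGetter` test pairs cover.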
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
index a4d8885..c5e501b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
@@ -1,12 +1,16 @@
# flink-storm-examples
-This module contains three versions of a simple word-count-example to illustrate the usage of the compatibility layer:
+This module contains multiple versions of a simple word-count-example to illustrate the usage of the compatibility layer:
* the usage of spouts or bolt within a regular Flink streaming program (ie, embedded spouts or bolts)
1. `SpoutSourceWordCount` uses a spout as data source within a Flink streaming program
2. `BoltTokenizerWordCount` uses a bolt to split sentences into words within a Flink streaming program
+ * `BoltTokenizerWordCountWithNames` uses Tuple input type and accesses attributes by field names (rather than by index)
+ * `BoltTokenizerWordCountPOJO` uses POJO input type and accesses attributes by field names (rather than by index)
+
* how to submit a whole Storm topology to Flink
3. `WordCountTopology` plugs a Storm topology together
* `StormWordCountLocal` submits the topology to a local Flink cluster (similar to a `LocalCluster` in Storm)
+ (`StormWordCountNamedLocal` accesses attributes by field names rather than by index)
* `StormWordCountRemoteByClient` submits the topology to a remote Flink cluster (similar to the usage of `NimbusClient` in Storm)
* `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (similar to the usage of `StormSubmitter` in Storm)
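The named variants listed above differ from the index-based examples only in how the bolt reads its input: by the field name the spout declared (here `"sentence"`) instead of by position 0. A self-contained sketch of that access pattern, with a plain `Map` standing in for the tuple (the class name `TokenizeByName` is illustrative, not one of the example classes):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative tokenizer: reads its input by the declared field name
// "sentence" rather than by positional index.
public class TokenizeByName {

	// Split the named "sentence" attribute into (word, 1) pairs.
	static List<Map.Entry<String, Integer>> execute(Map<String, Object> tuple) {
		String sentence = (String) tuple.get("sentence"); // named access
		List<Map.Entry<String, Integer>> out = new ArrayList<>();
		for (String word : sentence.toLowerCase().split("\\W+")) {
			if (!word.isEmpty()) {
				out.add(Map.entry(word, 1));
			}
		}
		return out;
	}

	public static void main(String[] args) {
		Map<String, Object> tuple = new LinkedHashMap<>();
		tuple.put("sentence", "To be or not to be");
		System.out.println(execute(tuple)); // [to=1, be=1, or=1, not=1, to=1, be=1]
	}
}
```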
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
index c38b599..f52a7bd 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -30,7 +30,7 @@ import java.util.Map;
/**
* Implements a Storm Spout that reads data from a given local file.
*/
-public final class StormFileSpout extends AbstractStormSpout {
+public class StormFileSpout extends AbstractStormSpout {
private static final long serialVersionUID = -6996907090003590436L;
private final String path;
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
index 3e6081c..99ef324 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
@@ -23,7 +23,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData;
/**
* Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
*/
-public final class StormInMemorySpout extends AbstractStormSpout {
+public class StormInMemorySpout extends AbstractStormSpout {
private static final long serialVersionUID = -4008858647468647019L;
private String[] source;
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
new file mode 100644
index 0000000..1fc4023
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file.
+ */
+public final class StormWordCountFileSpout extends StormFileSpout {
+ private static final long serialVersionUID = 2372251989250954503L;
+
+ public StormWordCountFileSpout(String path) {
+ super(path);
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
new file mode 100644
index 0000000..408cbfb
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
+ */
+public final class StormWordCountInMemorySpout extends StormInMemorySpout {
+ private static final long serialVersionUID = 8832143302409465843L;
+
+ public StormWordCountInMemorySpout(String[] source) {
+ super(source);
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index 606a3ce..8f4503f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -67,8 +67,8 @@ public class BoltTokenizerWordCount {
.transform("StormBoltTokenizer",
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
- // split up the lines in pairs (2-tuples) containing: (word,1)
- // group by the tuple field "0" and sum up tuple field "1"
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // group by the tuple field "0" and sum up tuple field "1"
.groupBy(0).sum(1);
// emit result
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
new file mode 100644
index 0000000..befb18f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos.Sentence;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to
+ * {@link BoltTokenizerWordCount} the tokenizer's input is a POJO type and the single field is accessed by name.
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>access attributes by name for POJO type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountPojo {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<Sentence> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a Storm bolt that is wrapped accordingly
+ .transform("StormBoltTokenizer",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new StormBoltWrapper<Sentence, Tuple2<String, Integer>>(
+ new StormBoltTokenizerByName()))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with Storm bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing BoltTokenizerWordCountPojo example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ TypeInformation<Sentence> sourceType = TypeExtractor
+ .getForObject(new Sentence(""));
+ return env.createInput(new CsvInputFormat<Sentence>(new Path(
+ textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+ sourceType);
+ }
+
+ return env.fromElements(WordCountDataPojos.SENTENCES);
+ }
+
+}
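In the POJO variant above, the bolt's single input field is resolved against the POJO's attributes. A minimal standalone sketch of reflection-based by-name access (hypothetical helper and `Sentence` mirror class, not the commit's actual `StormBoltWrapper` logic):

```java
import java.lang.reflect.Field;

public class PojoFieldAccess {
    // Hypothetical POJO mirroring the example's Sentence type.
    public static class Sentence {
        public String sentence;
        public Sentence(String s) { this.sentence = s; }
    }

    // Reads a public field by name via reflection, roughly how a
    // wrapper could expose POJO attributes to a Storm bolt by name.
    public static Object getByName(Object pojo, String name) {
        try {
            Field f = pojo.getClass().getField(name);
            return f.get(pojo);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No public field: " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getByName(new Sentence("hello storm"), "sentence")); // prints "hello storm"
    }
}
```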
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
new file mode 100644
index 0000000..8483f48
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataTuple;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to
+ * {@link BoltTokenizerWordCount} the tokenizer's input is a {@link Tuple} type and the single field is accessed by
+ * name.
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>access attributes by name for {@link Tuple} type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountWithNames {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<Tuple1<String>> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a Storm bolt that is wrapped accordingly
+ .transform("StormBoltTokenizer",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
+ new StormBoltTokenizerByName(), new Fields("sentence")))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with Storm bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing BoltTokenizerWordCountWithNames example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ TypeInformation<Tuple1<String>> sourceType = TypeExtractor
+ .getForObject(new Tuple1<String>(""));
+ return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+ textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+ sourceType);
+ }
+
+ return env.fromElements(WordCountDataTuple.TUPLES);
+ }
+
+}
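Unlike the POJO variant, Flink {@link Tuple} types carry no field names of their own, which is why the wrapper above is handed an explicit input schema via `new Fields("sentence")`. A standalone sketch (hypothetical helper, not the commit's actual wrapper code) of resolving a name through such an external schema:

```java
import java.util.Arrays;
import java.util.List;

public class SchemaLookup {
    // Resolves a field name against an externally supplied schema,
    // since positional tuples carry no names of their own.
    // Hypothetical sketch; not the actual StormBoltWrapper logic.
    public static int fieldIndex(List<String> schema, String name) {
        int idx = schema.indexOf(name);
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return idx;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("sentence");
        Object[] tuple = { "to be or not to be" };
        System.out.println(tuple[fieldIndex(schema, "sentence")]); // prints "to be or not to be"
    }
}
```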
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 0ae51c6..361d83a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -67,8 +67,8 @@ public class SpoutSourceWordCount {
final DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).sum(1);
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
new file mode 100644
index 0000000..f51afab
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}. In contrast to {@link StormWordCountLocal}, all bolts access the
+ * fields of input tuples by name instead of by index.
+ * <p/>
+ * This example shows how to run the program directly within Java; it can therefore not be used to submit a
+ * {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink
+ * </ul>
+ */
+public class StormWordCountNamedLocal {
+ public static final String topologyId = "Streaming WordCountName";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!WordCountTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false);
+
+ // execute program locally
+ final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+ cluster.submitTopology(topologyId, null, builder.createTopology());
+
+ Utils.sleep(10 * 1000);
+
+ // TODO kill does not do anything so far
+ cluster.killTopology(topologyId);
+ cluster.shutdown();
+ }
+
+}