You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:32:55 UTC
[05/27] flink git commit: [storm-compat] Added abstract base to Storm
compatibility examples
[storm-compat] Added abstract base to Storm compatibility examples
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56e013f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56e013f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56e013f0
Branch: refs/heads/master
Commit: 56e013f031025a794cb327fdda52b46629dd085c
Parents: 45dbfdd
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Thu May 14 12:55:44 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Jun 14 22:59:12 2015 +0200
----------------------------------------------------------------------
.../flink-storm-examples/pom.xml | 283 +++++++++++++++++++
.../src/assembly/word-count-storm.xml | 68 +++++
.../stormoperators/AbstractStormBoltSink.java | 80 ++++++
.../stormoperators/AbstractStormSpout.java | 75 +++++
.../stormoperators/StormBoltCounter.java | 96 +++++++
.../stormoperators/StormBoltFileSink.java | 77 +++++
.../stormoperators/StormBoltPrintSink.java | 43 +++
.../stormoperators/StormBoltTokenizer.java | 80 ++++++
.../stormoperators/StormFileSpout.java | 84 ++++++
.../stormoperators/StormInMemorySpout.java | 44 +++
.../util/StreamingProgramTestBase.java | 9 +-
.../streaming/util/TestStreamEnvironment.java | 77 ++++-
12 files changed, 1009 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/pom.xml b/flink-staging/flink-streaming/flink-storm-examples/pom.xml
new file mode 100644
index 0000000..94dbda5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/pom.xml
@@ -0,0 +1,283 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-storm-examples</artifactId>
+ <name>flink-storm-examples</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-storm-compatibility</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- get default data from flink-java-examples package -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>unpack</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-storm-compatibility</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>0.9.4</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ <excludes>defaults.yaml</excludes>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.11</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- self-contained jars for each example -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+
+ <executions>
+
+ <!-- WordCount Spout source-->
+ <execution>
+ <id>WordCount-SpoutSource</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCountSpoutSource</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <!-- from storm-core -->
+ <include>backtype/storm/topology/*.class</include>
+ <include>backtype/storm/spout/*.class</include>
+ <include>backtype/storm/task/*.class</include>
+ <include>backtype/storm/tuple/*.class</include>
+ <include>backtype/storm/generated/*.class</include>
+ <include>backtype/storm/metric/**/*.class</include>
+ <include>org/apache/thrift7/**/*.class</include>
+ <!-- Storm's recursive dependencies -->
+ <include>org/json/simple/**/*.class</include>
+ <!-- compatibility layer -->
+ <include>org/apache/flink/stormcompatibility/api/*.class</include>
+ <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
+ <!-- Word Count -->
+ <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WordCount Bolt tokenizer-->
+ <execution>
+ <id>WordCount-BoltTokenizer</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCountBoltTokenizer</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <!-- from storm-core -->
+ <include>backtype/storm/topology/*.class</include>
+ <include>backtype/storm/spout/*.class</include>
+ <include>backtype/storm/task/*.class</include>
+ <include>backtype/storm/tuple/*.class</include>
+ <include>backtype/storm/generated/*.class</include>
+ <include>backtype/storm/metric/**/*.class</include>
+ <include>org/apache/thrift7/**/*.class</include>
+ <!-- Storm's recursive dependencies -->
+ <include>org/json/simple/**/*.class</include>
+ <!-- compatibility layer -->
+ <include>org/apache/flink/stormcompatibility/api/*.class</include>
+ <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
+ <!-- Word Count -->
+ <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- WordCount Storm topology-->
+ <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assembly/word-count-storm.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+
+ <executions>
+ <execution>
+ <id>WordCountStorm</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[2.9,)</versionRange>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/assembly/word-count-storm.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/assembly/word-count-storm.xml b/flink-staging/flink-streaming/flink-storm-examples/src/assembly/word-count-storm.xml
new file mode 100644
index 0000000..002d9bc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/assembly/word-count-storm.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+
+ <id>WordCountStorm</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <unpack>true</unpack>
+ <includes>
+ <!-- need to be added explicitly to get 'defaults.yaml' -->
+ <include>org.apache.storm:storm-core:jar</include>
+ <include>org.apache.flink:flink-storm-examples:jar</include>
+ </includes>
+ <unpackOptions>
+ <includes>
+ <!-- from storm-core -->
+ <include>defaults.yaml</include>
+ <include>backtype/storm/*.class</include>
+ <include>backtype/storm/topology/*.class</include>
+ <include>backtype/storm/spout/*.class</include>
+ <include>backtype/storm/task/*.class</include>
+ <include>backtype/storm/tuple/*.class</include>
+ <include>backtype/storm/generated/*.class</include>
+ <include>backtype/storm/metric/**/*.class</include>
+ <include>backtype/storm/utils/*.class</include>
+ <include>backtype/storm/serialization/*.class</include>
+ <include>org/apache/storm/curator/**/*.class</include>
+ <include>org/apache/thrift7/**/*.class</include>
+ <!-- Storm's recursive dependencies -->
+ <include>org/json/simple/**/*.class</include>
+ <include>org/yaml/snakeyaml/**/*.class</include>
+ <!-- compatibility layer -->
+ <include>org/apache/flink/stormcompatibility/api/*.class</include>
+ <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
+ <!-- Word Count -->
+ <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
+ <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormBoltSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormBoltSink.java
new file mode 100644
index 0000000..ee21f7e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormBoltSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+
+
+
+
+/**
+ * Implements a sink that writes the received data to some external output. The result is formatted like
+ * {@code (a1,a2,...,an)} with {@code Object.toString()} for each attribute.
+ */
+public abstract class AbstractStormBoltSink implements IRichBolt {
+ private static final long serialVersionUID = -1626323806848080430L;
+
+ private StringBuilder lineBuilder;
+ private String prefix = "";
+
+
+
+ @Override
+ public final void prepare(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+ this.prepareSimple(stormConf, context);
+ if(context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
+ this.prefix = context.getThisTaskId() + "> ";
+ }
+ }
+
+ protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
+
+ @Override
+ public final void execute(final Tuple input) {
+ this.lineBuilder = new StringBuilder();
+ this.lineBuilder.append(this.prefix);
+ this.lineBuilder.append("(");
+ for(final Object attribute : input.getValues()) {
+ this.lineBuilder.append(attribute);
+ this.lineBuilder.append(",");
+ }
+ this.lineBuilder.replace(this.lineBuilder.length() - 1, this.lineBuilder.length(), ")");
+
+ this.writeExternal(this.lineBuilder.toString());
+ }
+
+ protected abstract void writeExternal(final String line);
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.java
new file mode 100644
index 0000000..3639b3c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+
+
+
+
+/**
+ * Base class for a Storm Spout that reads data line by line from an arbitrary source. The declared output schema has a
+ * single attribute called {@code line} and should be of type {@link String}.
+ */
+public abstract class AbstractStormSpout implements IRichSpout {
+ private static final long serialVersionUID = 8876828403487806771L;
+
+ public final static String ATTRIBUTE_LINE = "line";
+ public final static int ATTRIBUTE_LINE_INDEX = 0;
+
+ protected SpoutOutputCollector collector;
+
+
+
+ @Override
+ public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, @SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void close() {/* nothing to do */}
+
+ @Override
+ public void activate() {/* nothing to do */}
+
+ @Override
+ public void deactivate() {/* nothing to do */}
+
+ @Override
+ public void ack(final Object msgId) {/* nothing to do */}
+
+ @Override
+ public void fail(final Object msgId) {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(ATTRIBUTE_LINE));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
new file mode 100644
index 0000000..8b9777b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
+
+
+
+/**
+ * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple schema:
+ * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
+ * {@code <String,Integer>} ).
+ */
+public class StormBoltCounter implements IRichBolt {
+ private static final long serialVersionUID = 399619605462625934L;
+
+ public static final String ATTRIBUTE_WORD = "word";
+ public static final String ATTRIBUTE_COUNT = "count";
+
+ public static final int ATTRIBUTE_WORD_INDEX = 0;
+ public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+ private final HashMap<String, Count> counts = new HashMap<String, Count>();
+ private OutputCollector collector;
+
+
+
+ @Override
+ public void prepare(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context, @SuppressWarnings("hiding") final OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(final Tuple input) {
+ final String word = input.getString(StormBoltTokenizer.ATTRIBUTE_WORD_INDEX);
+
+ Count currentCount = this.counts.get(word);
+ if(currentCount == null) {
+ currentCount = new Count();
+ this.counts.put(word, currentCount);
+ }
+ currentCount.count += input.getInteger(StormBoltTokenizer.ATTRIBUTE_COUNT_INDEX).intValue();
+
+ this.collector.emit(new Values(word, new Integer(currentCount.count)));
+ }
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ /**
+ * A counter helper to emit immutable tuples to the given collector and avoid unnecessary object creation/deletion.
+ *
+ * @author mjsax
+ */
+ private static final class Count {
+ public int count;
+
+ public Count() {/* nothing to do */}
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltFileSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltFileSink.java
new file mode 100644
index 0000000..2be8c5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltFileSink.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+import backtype.storm.task.TopologyContext;
+
+
+
+
+
+/**
+ * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each
+ * attribute).
+ */
+public final class StormBoltFileSink extends AbstractStormBoltSink {
+ private static final long serialVersionUID = 2014027288631273666L;
+
+ private final String path;
+ private BufferedWriter writer;
+
+
+
+ public StormBoltFileSink(final String path) {
+ this.path = path;
+ }
+
+
+
+ @Override
+ public void prepareSimple(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context) {
+ try {
+ this.writer = new BufferedWriter(new FileWriter(this.path));
+ } catch(final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(final String line) {
+ try {
+ this.writer.write(line + "\n");
+ } catch(final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if(this.writer != null) {
+ try {
+ this.writer.close();
+ } catch(final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltPrintSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltPrintSink.java
new file mode 100644
index 0000000..2d61c99
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltPrintSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.util.Map;
+
+import backtype.storm.task.TopologyContext;
+
+
+
+
+
+/**
+ * Implements a sink that prints the received data to {@code stdout}.
+ */
+public final class StormBoltPrintSink extends AbstractStormBoltSink {
+ private static final long serialVersionUID = -6650011223001009519L;
+
+ @Override
+ public void prepareSimple(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context) {
+ /* nothing to do */
+ }
+
+ @Override
+ public void writeExternal(final String line) {
+ System.out.println(line);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
new file mode 100644
index 0000000..6e30665
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
+
+
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
+ * {@code <String,Integer>}).
+ */
+public final class StormBoltTokenizer implements IRichBolt {
+ private static final long serialVersionUID = -8589620297208175149L;
+
+ /** Name of the output attribute holding the word. */
+ public static final String ATTRIBUTE_WORD = "word";
+ /** Name of the output attribute holding the (always-one) count. */
+ public static final String ATTRIBUTE_COUNT = "count";
+
+ public static final int ATTRIBUTE_WORD_INDEX = 0;
+ public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+ /** Collector provided by the runtime in {@link #prepare}; used to emit (word, 1) pairs. */
+ private OutputCollector collector;
+
+
+
+ @Override
+ public void prepare(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context, @SuppressWarnings("hiding") final OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ /**
+ * Splits the first field of the input tuple (a line of text) into lower-case words
+ * and emits one (word, 1) pair per non-empty token.
+ */
+ @Override
+ public void execute(final Tuple input) {
+ final String[] tokens = input.getString(0).toLowerCase().split("\\W+");
+
+ for(final String token : tokens) {
+ if(!token.isEmpty()) {
+ // Integer.valueOf uses the small-value cache; the original "new Integer(1)"
+ // allocated a fresh boxed Integer for every single token.
+ this.collector.emit(new Values(token, Integer.valueOf(1)));
+ }
+ }
+ }
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null; // no component-specific configuration
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
new file mode 100644
index 0000000..2ac8314
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+
+
+
+
+/**
+ * Implements a Storm Spout that reads data from a given local file.
+ */
+public final class StormFileSpout extends AbstractStormSpout {
+ private static final long serialVersionUID = -6996907090003590436L;
+
+ /** Path of the local file to read. */
+ private final String path;
+ /** Open reader over {@link #path}; null before open() and after close(). */
+ private BufferedReader reader;
+
+
+
+ public StormFileSpout(final String path) {
+ this.path = path;
+ }
+
+
+
+ @Override
+ public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, @SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+ try {
+ // NOTE(review): FileReader uses the platform default charset — confirm the input files match it
+ this.reader = new BufferedReader(new FileReader(this.path));
+ } catch(final FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if(this.reader != null) {
+ try {
+ this.reader.close();
+ } catch(final IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ // null out the reader so close() is idempotent and nextTuple() sees the closed state
+ this.reader = null;
+ }
+ }
+ }
+
+ /** Emits the next line of the file, or nothing once the file is exhausted or not open. */
+ @Override
+ public void nextTuple() {
+ if(this.reader == null) {
+ return; // not opened yet, or already closed — the original NPE'd here
+ }
+ try {
+ final String line = this.reader.readLine();
+ if(line != null) {
+ this.collector.emit(new Values(line));
+ }
+ } catch(final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
new file mode 100644
index 0000000..fbf7ae3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+import backtype.storm.tuple.Values;
+
+
+
+
+
+/**
+ * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
+ */
+public final class StormInMemorySpout extends AbstractStormSpout {
+ private static final long serialVersionUID = -4008858647468647019L;
+
+ /** Index of the next sentence from {@link WordCountData#WORDS} to emit. */
+ private int counter = 0;
+
+
+
+ /** Emits one sentence per call until the data set is exhausted, then stays idle. */
+ @Override
+ public void nextTuple() {
+ if(this.counter >= WordCountData.WORDS.length) {
+ return; // everything emitted already
+ }
+ this.collector.emit(new Values(WordCountData.WORDS[this.counter]));
+ this.counter++;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 23be327..92f8301 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -29,6 +29,8 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
private static final int DEFAULT_PARALLELISM = 4;
+ private TestStreamEnvironment env;
+
private JobExecutionResult latestExecutionResult;
private int parallelism = DEFAULT_PARALLELISM;
@@ -86,7 +88,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestStreamEnvironment env = new TestStreamEnvironment(this.executor, this.parallelism);
+ env = new TestStreamEnvironment(this.executor, this.parallelism);
env.setAsContext();
// call the test program
@@ -112,7 +114,10 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
Assert.fail("Post-submit work caused an error: " + e.getMessage());
}
} finally {
- stopCluster();
+ if(env.clusterRunsSynchronous()) {
+ stopCluster();
+ }
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/56e013f0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 5cf529a..1a6ef0f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -56,8 +56,10 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobGraph jobGraph = streamGraph.getJobGraph(jobName);
-
+ return execute(streamGraph.getJobGraph(jobName));
+ }
+
+ public JobExecutionResult execute(JobGraph jobGraph) throws Exception {
if (internalExecutor) {
Configuration configuration = jobGraph.getJobConfiguration();
@@ -68,12 +70,11 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
executor = new ForkableFlinkMiniCluster(configuration);
}
try {
-
+ sync = true;
SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
latestResult = result.toJobExecutionResult(getClass().getClassLoader());
return latestResult;
- }
- catch (JobExecutionException e) {
+ } catch (JobExecutionException e) {
if (e.getMessage().contains("GraphConversionException")) {
throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
} else {
@@ -86,6 +87,72 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
}
}
+ // mini cluster started via start(JobGraph); null while no asynchronous job is running
+ private ForkableFlinkMiniCluster cluster = null;
+ // background thread that blocks in submitJobAndWait() for a job started via start(JobGraph)
+ private Thread jobRunner = null;
+ // true when the last job ran synchronously via execute(); false after start(JobGraph)
+ private boolean sync = true;
+
+ /**
+ * Starts the given job asynchronously on a (possibly freshly created) mini cluster.
+ * The job keeps running on a background thread until {@link #shutdown()} is called.
+ *
+ * @throws IllegalStateException if a cluster started via this method is still running
+ */
+ public void start(final JobGraph jobGraph) throws Exception {
+ if(cluster != null) {
+ throw new IllegalStateException("The cluster is already running");
+ }
+
+ if (internalExecutor) {
+ // spin up a private mini cluster sized to the configured test parallelism
+ Configuration configuration = jobGraph.getJobConfiguration();
+
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ getParallelism());
+ configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
+
+ cluster = new ForkableFlinkMiniCluster(configuration);
+ } else {
+ cluster = executor;
+ }
+ try {
+ sync = false;
+
+ jobRunner = new Thread() {
+ public void run() {
+ try {
+ SerializedJobExecutionResult result = cluster.submitJobAndWait(jobGraph, false);
+ latestResult = result.toJobExecutionResult(getClass().getClassLoader());
+ } catch (JobExecutionException e) {
+ // TODO remove: hack to make ITCase succeed because .submitJobAndWait() throws exception on .stop() (see this.shutdown())
+ latestResult = new JobExecutionResult(null, 0, null);
+ e.printStackTrace();
+ } catch (Exception e) {
+ // FIX: the original only constructed "new RuntimeException(e)" without
+ // throwing it, silently swallowing the failure; rethrow so the thread's
+ // uncaught-exception handler at least reports it
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ jobRunner.start();
+ } catch(RuntimeException e) {
+ // guard against a null cause/message before inspecting the failure reason
+ final Throwable cause = e.getCause();
+ if (cause != null && cause.getMessage() != null
+ && cause.getMessage().contains("GraphConversionException")) {
+ throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Stops the cluster started via {@code start(...)}, waits for the job-runner
+ * thread to finish, and returns the last job execution result.
+ *
+ * @throws IllegalStateException if no asynchronous job was started
+ */
+ public JobExecutionResult shutdown() throws InterruptedException {
+ if(sync) {
+ throw new IllegalStateException("Cluster was not started via .start(...)");
+ }
+
+ cluster.stop();
+ cluster = null;
+
+ jobRunner.join();
+ jobRunner = null;
+
+ return latestResult;
+ }
+
+ /**
+ * Returns {@code true} if the last job ran synchronously via {@code execute()},
+ * {@code false} if it was started asynchronously via {@code start(JobGraph)}.
+ */
+ public boolean clusterRunsSynchronous() {
+ return sync;
+ }
+
protected void setAsContext() {
StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
@Override