You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:34 UTC
[09/15] flink git commit: [Storm Compatibility] Maven module
restructuring and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
deleted file mode 100644
index 2f3c02d..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
+++ /dev/null
@@ -1,362 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-storm-compatibility-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-storm-compatibility-examples</artifactId>
- <name>flink-storm-compatibility-examples</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-storm-compatibility-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <!-- get default data from flink-java-examples package -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.9</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <execution>
- <id>unpack</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>unpack</goal>
- </goals>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-storm-compatibility-core</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>0.9.4</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <!-- need to exclude to be able to run
- * StormWordCountRemoteByClient and
- * StormWordCountRemoteBySubmitter
- within Eclipse -->
- <excludes>defaults.yaml</excludes>
- </artifactItem>
- <artifactItem>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- </artifactItem>
- <artifactItem>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>1.11</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- </artifactItem>
- </artifactItems>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- self-contained jars for each example -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
-
- <executions>
-
- <!-- WordCount Spout source-->
- <!-- example for embedded spout - for whole topologies see "WordCount Storm topology" example below -->
- <execution>
- <id>WordCount-SpoutSource</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <finalName>WordCount</finalName>
- <classifier>SpoutSource</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <!-- from storm-core -->
- <include>backtype/storm/topology/*.class</include>
- <include>backtype/storm/spout/*.class</include>
- <include>backtype/storm/task/*.class</include>
- <include>backtype/storm/tuple/*.class</include>
- <include>backtype/storm/generated/*.class</include>
- <include>backtype/storm/metric/**/*.class</include>
- <include>org/apache/thrift7/**/*.class</include>
- <!-- Storm's recursive dependencies -->
- <include>org/json/simple/**/*.class</include>
- <!-- compatibility layer -->
- <include>org/apache/flink/stormcompatibility/api/*.class</include>
- <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
- <!-- Word Count -->
- <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.class</include>
- <include>org/apache/flink/stormcompatibility/util/AbstractStormSpout.class</include>
- <include>org/apache/flink/stormcompatibility/util/StormFileSpout.class</include>
- <include>org/apache/flink/stormcompatibility/util/StormInMemorySpout.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WordCount Bolt tokenizer-->
- <!-- example for embedded bolt - for whole topologies see "WordCount Storm topology" example below -->
- <execution>
- <id>WordCount-BoltTokenizer</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <finalName>WordCount</finalName>
- <classifier>BoltTokenizer</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <!-- from storm-core -->
- <include>backtype/storm/topology/*.class</include>
- <include>backtype/storm/spout/*.class</include>
- <include>backtype/storm/task/*.class</include>
- <include>backtype/storm/tuple/*.class</include>
- <include>backtype/storm/generated/*.class</include>
- <include>backtype/storm/metric/**/*.class</include>
- <include>org/apache/thrift7/**/*.class</include>
- <!-- Storm's recursive dependencies -->
- <include>org/json/simple/**/*.class</include>
- <!-- compatibility layer -->
- <include>org/apache/flink/stormcompatibility/api/*.class</include>
- <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
- <!-- Word Count -->
- <include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </configuration>
- </execution>
-
- <!-- WordCount Storm topology-->
- <!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
- <!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
- However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
- Thus, 'defaults.yaml' is not available for maven-jar-plugin.
- Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
- We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
- <execution>
- <id>WordCount-StormTopology</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <finalName>WordCount</finalName>
- <classifier>StormTopology</classifier>
- </configuration>
- </execution>
-
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- WordCount Storm topology-->
- <!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
- <!-- Build StormTopolgy jar to overwrite empty jar created with maven-jar-plugin. -->
- <plugin>
- <artifactId>maven-shade-plugin</artifactId>
- <groupId>org.apache.maven.plugins</groupId>
- <version>2.4.1</version>
- <executions>
- <execution>
- <id>WordCount-StormTopology</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <finalName>WordCount-StormTopology</finalName>
-
- <artifactSet>
- <includes>
- <include>org.apache.storm:storm-core</include>
- <!-- Storm's recursive dependencies -->
- <include>org.yaml:snakeyaml</include>
- <include>com.googlecode.json-simple:json-simple</include>
- <include>org.apache.flink:flink-storm-compatibility-core</include>
- <include>org.apache.flink:flink-storm-compatibility-examples</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>org.apache.storm:storm-core</artifact>
- <includes>
- <include>defaults.yaml</include>
- <include>backtype/storm/*.class</include>
- <include>backtype/storm/topology/*.class</include>
- <include>backtype/storm/spout/*.class</include>
- <include>backtype/storm/task/*.class</include>
- <include>backtype/storm/tuple/*.class</include>
- <include>backtype/storm/generated/*.class</include>
- <include>backtype/storm/metric/**/*.class</include>
- <include>backtype/storm/utils/*.class</include>
- <include>backtype/storm/serialization/*.class</include>
- <include>org/apache/storm/curator/**/*.class</include>
- <include>org/apache/thrift7/**/*.class</include>
- <!-- Storm's recursive dependencies -->
- <include>org/json/simple/**/*.class</include>
- <include>org/yaml/snakeyaml/**/*.class</include>
- </includes>
- </filter>
- <filter>
- <artifact>org.apache.flink:flink-storm-compatibility-examples</artifact>
- <includes>
- <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
- <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
- <include>org/apache/flink/stormcompatibility/util/*.class</include>
- <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
- </includes>
- </filter>
- <filter>
- <artifact>org.apache.flink:flink-storm-compatibility-core</artifact>
- <includes>
- <include>org/apache/flink/stormcompatibility/api/*.class</include>
- <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
- </includes>
- </filter>
- </filters>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[2.9,)</versionRange>
- <goals>
- <goal>unpack</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
deleted file mode 100644
index d8d620b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
-import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>
- * <result path></code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>construct a regular Storm topology as Flink program</li>
- * <li>make use of the FiniteStormSpout interface</li>
- * </ul>
- */
-public class ExclamationTopology {
-
- public final static String spoutId = "source";
- public final static String firstBoltId = "exclamation1";
- public final static String secondBoltId = "exclamation2";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new SimpleOutputFormatter();
-
- public static FlinkTopologyBuilder buildTopology() {
- final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
- // get input data
- if (fileInputOutput) {
- // read the text file from given input path
- final String[] tokens = textPath.split(":");
- final String inputFile = tokens[tokens.length - 1];
- builder.setSpout(spoutId, new FiniteStormFileSpout(inputFile));
- } else {
- builder.setSpout(spoutId, new FiniteStormInMemorySpout(WordCountData.WORDS));
- }
-
- builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
- builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
-
- // emit result
- if (fileInputOutput) {
- // read the text file from given input path
- final String[] tokens = outputPath.split(":");
- final String outputFile = tokens[tokens.length - 1];
- builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
- .shuffleGrouping(secondBoltId);
- } else {
- builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
- .shuffleGrouping(secondBoltId);
- }
-
- return builder;
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileInputOutput = false;
- private static String textPath;
- private static String outputPath;
- private static int exclamationNum = 3;
-
- static int getExclamation() {
- return exclamationNum;
- }
-
- static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileInputOutput = true;
- if (args.length == 3) {
- textPath = args[0];
- outputPath = args[1];
- exclamationNum = Integer.parseInt(args[2]);
- } else {
- System.err.println(
- "Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
- "path> <result path> <number of exclamation marks>");
- return false;
- }
- } else {
- System.out.println("Executing StormExclamation example with built-in default data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(
- " Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
- " <result path> <number of exclamation marks>");
- }
-
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
deleted file mode 100644
index c8af3a6..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationWithStormBolt <text path> <result path> <number of exclamation marks></code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm bolt within a Flink Streaming program</li>
- * </ul>
- */
-public class ExclamationWithStormBolt {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // set Storm configuration
- StormConfig config = new StormConfig();
- config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
- env.getConfig().setGlobalJobParameters(config);
-
- // get input data
- final DataStream<String> text = getTextDataStream(env);
-
- final DataStream<String> exclaimed = text
- .transform("StormBoltTokenizer",
- TypeExtractor.getForObject(""),
- new StormBoltWrapper<String, String>(new ExclamationBolt(),
- new String[] { Utils.DEFAULT_STREAM_ID }))
- .map(new ExclamationMap());
-
- // emit result
- if (fileOutput) {
- exclaimed.writeAsText(outputPath);
- } else {
- exclaimed.print();
- }
-
- // execute program
- env.execute("Streaming WordCount with Storm bolt tokenizer");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private static class ExclamationMap implements MapFunction<String, String> {
- private static final long serialVersionUID = 4614754344067170619L;
-
- @Override
- public String map(String value) throws Exception {
- return value + "!!!";
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
- private static int exclamationNum = 3;
-
- private static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 3) {
- textPath = args[0];
- outputPath = args[1];
- exclamationNum = Integer.parseInt(args[2]);
- } else {
- System.err.println("Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
- return false;
- }
- } else {
- System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
- if (fileOutput) {
- // read the text file from given input path
- return env.readTextFile(textPath);
- }
-
- return env.fromElements(WordCountData.WORDS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
deleted file mode 100644
index 99c816d..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
-import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationWithStormSpout <text path> <result path></code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm spout within a Flink Streaming program</li>
- * <li>make use of the FiniteStormSpout interface</li>
- * </ul>
- */
-public class ExclamationWithStormSpout {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- final DataStream<String> text = getTextDataStream(env);
-
- final DataStream<String> exclaimed = text
- .map(new ExclamationMap())
- .map(new ExclamationMap());
-
- // emit result
- if (fileOutput) {
- exclaimed.writeAsText(outputPath);
- } else {
- exclaimed.print();
- }
-
- // execute program
- env.execute("Streaming Exclamation with Storm spout source");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private static class ExclamationMap implements MapFunction<String, String> {
- private static final long serialVersionUID = -684993133807698042L;
-
- @Override
- public String map(String value) throws Exception {
- return value + "!!!";
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
-
- private static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- textPath = args[0];
- outputPath = args[1];
- } else {
- System.err.println("Usage: ExclamationWithStormSpout <text path> <result path>");
- return false;
- }
- } else {
- System.out.println("Executing ExclamationWithStormSpout example with built-in default " +
- "data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: ExclamationWithStormSpout <text path> <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
- if (fileOutput) {
- final String[] tokens = textPath.split(":");
- final String inputFile = tokens[tokens.length - 1];
-
- // set Storm configuration
- StormConfig config = new StormConfig();
- config.put(FiniteStormFileSpout.INPUT_FILE_PATH, inputFile);
- env.getConfig().setGlobalJobParameters(config);
-
- return env.addSource(
- new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(),
- new String[] { Utils.DEFAULT_STREAM_ID }),
- TypeExtractor.getForClass(String.class)).setParallelism(1);
- }
-
- return env.addSource(
- new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
- WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
- TypeExtractor.getForClass(String.class)).setParallelism(1);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
deleted file mode 100644
index c87b3a5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}
- * and submitted to Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
- * <p/>
- * This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationLocal <text path> <result path></code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class StormExclamationLocal {
-
- public final static String topologyId = "Streaming Exclamation";
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!ExclamationTopology.parseParameters(args)) {
- return;
- }
-
- // build Topology the Storm way
- final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
- // execute program locally
- Config conf = new Config();
- conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
- final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
- cluster.submitTopology(topologyId, conf, builder.createTopology());
-
- Utils.sleep(10 * 1000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
deleted file mode 100644
index 0f64301..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.Utils;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm cluster similar to
- * {@link NimbusClient}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java, thus it cannot be used to submit a
- * {@link StormTopology} via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationRemoteByClient <text path> <result path></code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormExclamationRemoteByClient {
-
- public final static String topologyId = "Streaming Exclamation";
- private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-ExclamationStorm.jar";
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
- NotAliveException {
-
- if (!ExclamationTopology.parseParameters(args)) {
- return;
- }
-
- // build Topology the Storm way
- final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
- // execute program on Flink cluster
- final Config conf = new Config();
- // can be changed to remote address
- conf.put(Config.NIMBUS_HOST, "localhost");
- // use default flink jobmanger.rpc.port
- conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-
- final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
- cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
-
- Utils.sleep(5 * 1000);
-
- cluster.killTopology(topologyId);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
deleted file mode 100644
index d580520..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm cluster similar to
- * {@link StormSubmitter}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationRemoteByClient <text path> <result path></code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormExclamationRemoteBySubmitter {
-
- public final static String topologyId = "Streaming Exclamation";
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!ExclamationTopology.parseParameters(args)) {
- return;
- }
-
- // build Topology the Storm way
- final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
- // execute program on Flink cluster
- final Config conf = new Config();
- // We can set Jobmanager host/port values manually or leave them blank
- // if not set and
- // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
- // - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter.
- // conf.put(Config.NIMBUS_HOST, "localhost");
- // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
-
- // The user jar file must be specified via JVM argument if executed via Java.
- // => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
- // If bin/flink is used, the jar file is detected automatically.
- FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-
- Thread.sleep(5 * 1000);
-
- FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
deleted file mode 100644
index 2709eff..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-public class ExclamationBolt implements IRichBolt {
- private final static long serialVersionUID = -6364882114201311380L;
-
- public final static String EXCLAMATION_COUNT = "exclamation.count";
-
- private OutputCollector collector;
- private String exclamation;
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
-
- Object count = conf.get(EXCLAMATION_COUNT);
- if (count != null) {
- int exclamationNum = (Integer) count;
- StringBuilder builder = new StringBuilder();
- for (int index = 0; index < exclamationNum; ++index) {
- builder.append('!');
- }
- this.exclamation = builder.toString();
- } else {
- this.exclamation = "!";
- }
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public void execute(Tuple tuple) {
- collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
deleted file mode 100644
index 79c7125..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.tuple.Fields;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.AgeSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.GenderSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.SingleJoinBolt;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-
-public class SingleJoinTopology {
-
- public final static String spoutId1 = "gender";
- public final static String spoutId2 = "age";
- public final static String boltId = "singleJoin";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new TupleOutputFormatter();
-
- public static FlinkTopologyBuilder buildTopology() {
-
- final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
- // get input data
- builder.setSpout(spoutId1, new GenderSpout(new Fields("id", "gender")));
- builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
-
- builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
- .fieldsGrouping(spoutId1, new Fields("id"))
- .fieldsGrouping(spoutId2, new Fields("id"));
- //.shuffleGrouping(spoutId1)
- //.shuffleGrouping(spoutId2);
-
- // emit result
- if (fileInputOutput) {
- // read the text file from given input path
- final String[] tokens = outputPath.split(":");
- final String outputFile = tokens[tokens.length - 1];
- builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(boltId);
- } else {
- builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(boltId);
- }
-
- return builder;
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileInputOutput = false;
- private static String outputPath;
-
- static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileInputOutput = true;
- if (args.length == 1) {
- outputPath = args[0];
- } else {
- System.err.println("Usage: StormSingleJoin* <result path>");
- return false;
- }
- } else {
- System.out.println("Executing StormSingleJoin* example with built-in default data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: StormSingleJoin* <result path>");
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
deleted file mode 100644
index d70914a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.utils.Utils;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-public class StormSingleJoinLocal {
- public final static String topologyId = "Streaming SingleJoin";
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!SingleJoinTopology.parseParameters(args)) {
- return;
- }
-
- // build Topology the Storm way
- final FlinkTopologyBuilder builder = SingleJoinTopology.buildTopology();
-
- // execute program locally
- final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
- cluster.submitTopology(topologyId, null, builder.createTopology());
-
- Utils.sleep(5 * 1000);
-
- // TODO kill does no do anything so far
- cluster.killTopology(topologyId);
- cluster.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
deleted file mode 100644
index 49761c3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class AgeSpout extends AbstractStormSpout {
- private static final long serialVersionUID = -4008858647468647019L;
-
- private int counter = 0;
- private String gender;
- private Fields outFields;
-
- public AgeSpout(Fields outFields) {
- this.outFields = outFields;
- }
-
- @Override
- public void nextTuple() {
- if (this.counter < 10) {
- if (counter % 2 == 0) {
- gender = "male";
- } else {
- gender = "female";
- }
- this.collector.emit(new Values(counter, gender));
- counter++;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(outFields);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
deleted file mode 100644
index 238b6db..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class GenderSpout extends AbstractStormSpout {
- private static final long serialVersionUID = -5079110197950743927L;
-
- private int counter = 9;
- private Fields outFields;
-
- public GenderSpout(Fields outFields) {
- this.outFields = outFields;
- }
-
- @Override
- public void nextTuple() {
- if (counter >= 0) {
- this.collector.emit(new Values(counter, counter + 20));
- counter--;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(outFields);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
deleted file mode 100644
index cd53140..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@SuppressWarnings("deprecation")
-public class SingleJoinBolt implements IRichBolt {
- OutputCollector collector;
- Fields idFields;
- Fields outFields;
- int numSources = 2;
- TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> pending;
- Map<String, GlobalStreamId> fieldLocations;
-
- public SingleJoinBolt(Fields outFields) {
- this.outFields = outFields;
- }
-
- @SuppressWarnings({"rawtypes", "null"})
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- fieldLocations = new HashMap<String, GlobalStreamId>();
- this.collector = collector;
- int timeout = 100;
- pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
- // numSources = context.getThisSources().size();
- Set<String> idFields = null;
- for (GlobalStreamId source : context.getThisSources().keySet()) {
- Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
- Set<String> setFields = new HashSet<String>(fields.toList());
- if (idFields == null) {
- idFields = setFields;
- } else {
- idFields.retainAll(setFields);
- }
-
- for (String outfield : outFields) {
- for (String sourcefield : fields) {
- if (outfield.equals(sourcefield)) {
- fieldLocations.put(outfield, source);
- }
- }
- }
- }
- this.idFields = new Fields(new ArrayList<String>(idFields));
-
- if (fieldLocations.size() != outFields.size()) {
- throw new RuntimeException("Cannot find all outfields among sources");
- }
- }
-
- @Override
- public void execute(Tuple tuple) {
- List<Object> id = tuple.select(idFields);
- GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
- if (!pending.containsKey(id)) {
- pending.put(id, new HashMap<GlobalStreamId, Tuple>());
- }
- Map<GlobalStreamId, Tuple> parts = pending.get(id);
- if (parts.containsKey(streamId)) {
- throw new RuntimeException("Received same side of single join twice");
- }
- parts.put(streamId, tuple);
- if (parts.size() == numSources) {
- pending.remove(id);
- List<Object> joinResult = new ArrayList<Object>();
- for (String outField : outFields) {
- GlobalStreamId loc = fieldLocations.get(outField);
- joinResult.add(parts.get(loc).getValueByField(outField));
- }
- collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
- for (Tuple part : parts.values()) {
- collector.ack(part);
- }
- }
- }
-
- @Override
- public void cleanup() {
- /* nothing to do */
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(outFields);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
- @Override
- public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
- for (Tuple tuple : tuples.values()) {
- collector.fail(tuple);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
deleted file mode 100644
index 18251d4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
-import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamMapper;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
/**
 * Implements a simple example with two declared output streams for the embedded Spout.
 * <p/>
 * This example shows how to:
 * <ul>
 * <li>handle multiple output stream of a spout</li>
 * <li>accessing each stream by .split(...) and .select(...)</li>
 * <li>strip wrapper data type SplitStreamType for further processing in Flink</li>
 * </ul>
 * <p/>
 * This example would work the same way for multiple bolt output streams.
 */
public class SpoutSplitExample {

	// *************************************************************************
	// PROGRAM
	// *************************************************************************

	public static void main(final String[] args) throws Exception {

		// set up the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the embedded spout declares these two output streams (see RandomSpout)
		String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };

		// wrap the Storm spout; each emitted value is wrapped in SplitStreamType to carry its stream name
		final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
				new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
						rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));

		// route each wrapped record to the Flink sub-stream matching its Storm stream name
		SplitStream<SplitStreamType<Integer>> splitStream = numbers
				.split(new FlinkStormStreamSelector<Integer>());

		DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
		DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);

		// even stream: strip the SplitStreamType wrapper, then enrich with a plain Flink function
		evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
		// odd stream: process the still-wrapped records directly with an embedded Storm bolt
		oddStream.transform("oddBolt",
				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
				new StormBoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
						new VerifyAndEnrichBolt(false)))
				.print();

		// execute program
		env.execute("Spout split stream example");
	}

	// *************************************************************************
	// USER FUNCTIONS
	// *************************************************************************

	/**
	 * Same as {@link VerifyAndEnrichBolt}.
	 */
	private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
		private static final long serialVersionUID = 5213888269197438892L;

		// reused output tuple: field 0 holds the fixed token, field 1 the current value
		private final Tuple2<String, Integer> out;

		public Enrich(String token) {
			this.out = new Tuple2<String, Integer>(token, 0);
		}

		@Override
		public Tuple2<String, Integer> map(Integer value) throws Exception {
			this.out.setField(value, 1);
			return this.out;
		}
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
deleted file mode 100644
index 75d710e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split.stormoperators;
-
-import java.util.Map;
-import java.util.Random;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-public class RandomSpout extends BaseRichSpout {
- private static final long serialVersionUID = -3978554318742509334L;
-
- public static final String EVEN_STREAM = "even";
- public static final String ODD_STREAM = "odd";
-
- private final boolean split;
- private Random r = new Random();
- private SpoutOutputCollector collector;
-
- public RandomSpout(boolean split, long seed) {
- this.split = split;
- this.r = new Random(seed);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void nextTuple() {
- int i = r.nextInt();
- if (split) {
- if (i % 2 == 0) {
- this.collector.emit(EVEN_STREAM, new Values(i));
- } else {
- this.collector.emit(ODD_STREAM, new Values(i));
- }
- } else {
- this.collector.emit(new Values(i));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- Fields schema = new Fields("number");
- if (split) {
- declarer.declareStream(EVEN_STREAM, schema);
- declarer.declareStream(ODD_STREAM, schema);
- } else {
- declarer.declare(schema);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
deleted file mode 100644
index 5853705..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split.stormoperators;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class VerifyAndEnrichBolt extends BaseRichBolt {
- private static final long serialVersionUID = -7277395570966328721L;
-
- private final boolean evenOrOdd; // true: even -- false: odd
- private final String token;
- private OutputCollector collector;
-
- public VerifyAndEnrichBolt(boolean evenOrOdd) {
- this.evenOrOdd = evenOrOdd;
- this.token = evenOrOdd ? "even" : "odd";
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
- throw new RuntimeException("Invalid number detected.");
- }
- this.collector.emit(new Values(this.token, input.getInteger(0)));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("evenOrOdd", "number"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
deleted file mode 100644
index b121744..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import java.util.Map;
-
-/**
- * Implements a sink that write the received data so some external output. The result is formatted like
- * {@code (a1, a2, ..., an)} with {@code Object.toString()} for each attribute).
- */
-public abstract class AbstractStormBoltSink implements IRichBolt {
- private static final long serialVersionUID = -1626323806848080430L;
-
- private StringBuilder lineBuilder;
- private String prefix = "";
- private final OutputFormatter formatter;
-
- public AbstractStormBoltSink(final OutputFormatter formatter) {
- this.formatter = formatter;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public final void prepare(final Map stormConf, final TopologyContext context,
- final OutputCollector collector) {
- this.prepareSimple(stormConf, context);
- if (context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
- this.prefix = context.getThisTaskId() + "> ";
- }
- }
-
- protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
-
- @Override
- public final void execute(final Tuple input) {
- this.lineBuilder = new StringBuilder();
- this.lineBuilder.append(this.prefix);
- this.lineBuilder.append(this.formatter.format(input));
- this.writeExternal(this.lineBuilder.toString());
- }
-
- protected abstract void writeExternal(final String line);
-
- @Override
- public void cleanup() {/* nothing to do */}
-
- @Override
- public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
deleted file mode 100644
index 4739a2c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-import java.util.Map;
-
/**
 * Base class for Storm Spout that reads data line by line from an arbitrary source. The declared output schema has a
 * single attribute called {@code line} and should be of type {@link String}.
 */
public abstract class AbstractStormSpout implements IRichSpout {
	private static final long serialVersionUID = 8876828403487806771L;

	/** Name of the single declared output attribute. */
	public final static String ATTRIBUTE_LINE = "line";

	/** Collector used by subclasses to emit lines; set in {@link #open}. */
	protected SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void close() {/* nothing to do */}

	@Override
	public void activate() {/* nothing to do */}

	@Override
	public void deactivate() {/* nothing to do */}

	@Override
	public void ack(final Object msgId) {/* nothing to do */}

	@Override
	public void fail(final Object msgId) {/* nothing to do */}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(ATTRIBUTE_LINE));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
deleted file mode 100644
index d3776fb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
- * when it reached the end of the file.
- */
-public class FiniteStormFileSpout extends StormFileSpout implements FiniteStormSpout {
- private static final long serialVersionUID = -1472978008607215864L;
-
- private String line;
- private boolean newLineRead;
-
- public FiniteStormFileSpout() {}
-
- public FiniteStormFileSpout(String path) {
- super(path);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
- super.open(conf, context, collector);
- newLineRead = false;
- }
-
- @Override
- public void nextTuple() {
- this.collector.emit(new Values(line));
- newLineRead = false;
- }
-
- /**
- * Can be called before nextTuple() any times including 0.
- */
- @Override
- public boolean reachedEnd() {
- try {
- readLine();
- } catch (IOException e) {
- throw new RuntimeException("Exception occured while reading file " + path);
- }
- return line == null;
- }
-
- private void readLine() throws IOException {
- if (!newLineRead) {
- line = reader.readLine();
- newLineRead = true;
- }
- }
-
-}