You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/23 01:35:00 UTC
[2/4] storm git commit: STORM-2090: Add integration test for storm
windowing
STORM-2090: Add integration test for storm windowing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b812b7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b812b7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b812b7b
Branch: refs/heads/1.0.x-branch
Commit: 1b812b7bd3078bd52d5447579a8041e990b88028
Parents: ffcd615
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Mon Sep 19 10:26:56 2016 -0700
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 23 10:34:22 2016 +0900
----------------------------------------------------------------------
.gitignore | 3 +
.travis.yml | 8 +-
integration-test/README.md | 59 +++
integration-test/config/Vagrantfile | 183 +++++++++
integration-test/config/cluster.xml | 96 +++++
integration-test/config/common.sh | 21 ++
integration-test/config/etc-hosts | 18 +
integration-test/config/install-storm.sh | 39 ++
integration-test/config/install-zookeeper.sh | 20 +
integration-test/config/storm.yaml | 33 ++
integration-test/pom.xml | 250 ++++++++++++
integration-test/run-it.sh | 83 ++++
.../org/apache/storm/ExclamationTopology.java | 93 +++++
.../org/apache/storm/debug/DebugHelper.java | 39 ++
.../storm/st/topology/TestableTopology.java | 30 ++
.../topology/window/SlidingTimeCorrectness.java | 170 +++++++++
.../window/SlidingWindowCorrectness.java | 157 ++++++++
.../window/TumblingTimeCorrectness.java | 167 ++++++++
.../window/TumblingWindowCorrectness.java | 154 ++++++++
.../storm/st/topology/window/data/FromJson.java | 22 ++
.../storm/st/topology/window/data/TimeData.java | 110 ++++++
.../st/topology/window/data/TimeDataWindow.java | 91 +++++
.../apache/storm/st/utils/StringDecorator.java | 37 ++
.../org/apache/storm/st/utils/TimeUtil.java | 54 +++
.../test/java/org/apache/storm/st/DemoTest.java | 85 +++++
.../apache/storm/st/helper/AbstractTest.java | 27 ++
.../apache/storm/st/meta/TestngListener.java | 97 +++++
.../st/tests/window/SlidingWindowTest.java | 187 +++++++++
.../st/tests/window/TumblingWindowTest.java | 99 +++++
.../org/apache/storm/st/utils/AssertUtil.java | 71 ++++
.../org/apache/storm/st/wrapper/LogData.java | 66 ++++
.../apache/storm/st/wrapper/StormCluster.java | 118 ++++++
.../org/apache/storm/st/wrapper/TopoWrap.java | 378 +++++++++++++++++++
.../src/test/resources/storm-conf/storm.yaml | 33 ++
pom.xml | 3 +
35 files changed, 3099 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5c66aba..636570f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,6 @@ logs
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
+
+# ignore vagrant files
+/integration-test/config/.vagrant/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 05e24fe..44942e1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+addons:
+ hosts:
+ - node1
env:
- MODULES=storm-core
@@ -23,8 +26,9 @@ before_install:
- nvm install 0.12.2
- nvm use 0.12.2
install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd`
-script: /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES
-sudo: false
+script:
+ - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES && /bin/bash ./integration-test/run-it.sh
+sudo: true
cache:
directories:
- "$HOME/.m2/repository"
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/README.md
----------------------------------------------------------------------
diff --git a/integration-test/README.md b/integration-test/README.md
new file mode 100644
index 0000000..36c70a1
--- /dev/null
+++ b/integration-test/README.md
@@ -0,0 +1,59 @@
+End to end storm integration tests
+==================================
+
+Running tests end-to-end
+------------------------
+Assumption:
+A single version of storm binary zip such as `storm-dist/binary/target/apache-storm-2.0.0-SNAPSHOT.zip` is present.
+The following command will bring up a vagrant cluster.
+```sh
+cd integration-test/config
+vagrant up
+```
+This will automatically run `integration-test/run-it.sh`.
+It brings up a vagrant machine with storm and zookeeper daemons
+and runs all the tests against it.
+
+Running tests for development & debugging
+=========================================
+The `vagrant up` command is set up as a complete auto-pilot.
+The following describes how to run individual tests against this vagrant cluster or any other cluster.
+
+Configs for running
+-------------------
+The supplied configuration will run tests against vagrant setup. However, it can be changed to use a different cluster.
+Change `integration-test/src/test/resources/storm-conf/storm.yaml` as necessary.
+
+Running all tests manually
+--------------------------
+To run all tests:
+```sh
+mvn clean package -DskipTests && mvn test
+```
+
+To run a single test:
+```sh
+mvn clean package -DskipTests && mvn test -Dtest=SlidingWindowTest
+```
+
+Running tests from IDE
+----------------------
+You might have to enable the `intellij` maven profile to make your IDE happy.
+Make sure that the following is run before tests are launched.
+```sh
+mvn package -DskipTests
+```
+
+Running tests with custom storm version
+---------------------------------------
+You can supply custom storm version using `-Dstorm.version=<storm-version>` property to all the maven commands.
+```sh
+mvn clean package -DskipTests -Dstorm.version=<storm-version>
+mvn test -Dtest=DemoTest -Dstorm.version=<storm-version>
+```
+
+To find the version of storm that you are running, run the `storm version` command.
+
+Code
+----
+Start off by looking at file [DemoTest.java](https://github.com/apache/storm/blob/master/integration-test/src/test/java/org/apache/storm/st/DemoTest.java).
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/Vagrantfile
----------------------------------------------------------------------
diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile
new file mode 100644
index 0000000..7898d9b
--- /dev/null
+++ b/integration-test/config/Vagrantfile
@@ -0,0 +1,183 @@
+# -*- mode: ruby; compile-command: "vagrant destroy -f; vagrant up" -*-
+# vi: set ft=ruby :
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+require 'uri'
+# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
+VAGRANTFILE_API_VERSION = "2"
+STORM_BOX_TYPE = "hashicorp/precise64"
+STORM_ZIP = Dir.glob("../../storm-dist/binary/target/**/*.zip")
+if(STORM_ZIP.length != 1)
+ raise "expected one storm-binary found: " + STORM_ZIP.join(",")
+end
+STORM_ARCHIVE = STORM_ZIP[0]
+STORM_VERSION = File.basename(STORM_ARCHIVE, '.*')
+STORM_SUPERVISOR_COUNT = 2
+
+Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
+
+ config.vm.box = STORM_BOX_TYPE
+ #config.hostmanager.manage_host = true
+ #config.hostmanager.enabled = true
+
+ if Vagrant.has_plugin?("vagrant-cachier")
+ # Configure cached packages to be shared between instances of the same base box.
+ # More info on the "Usage" link above
+ config.cache.scope = :box
+
+ # OPTIONAL: If you are using VirtualBox, you might want to use that to enable
+ # NFS for shared folders. This is also very useful for vagrant-libvirt if you
+ # want bi-directional sync
+ config.cache.synced_folder_opts = {
+ type: :nfs,
+ # The nolock option can be useful for an NFSv3 client that wants to avoid the
+ # NLM sideband protocol. Without this option, apt-get might hang if it tries
+ # to lock files needed for /var/cache/* operations. All of this can be avoided
+ # by using NFSv4 everywhere. Please note that the tcp option is not the default.
+ mount_options: ['rw', 'vers=3', 'tcp', 'nolock']
+ }
+ end
+
+ if(!File.exist?(STORM_ARCHIVE))
+ `wget -N #{STORM_DIST_URL}`
+ end
+
+ config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm"
+ config.vm.synced_folder "~/.m2", "/home/vagrant/.m2"
+
+ config.vm.define "node1" do |node1|
+ node1.vm.provider "virtualbox" do |v|
+ v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
+ end
+ node1.vm.network "private_network", ip: "192.168.50.3"
+ node1.vm.hostname = "node1"
+ node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
+ #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts"
+ end
+
+
+ # All Vagrant configuration is done here. The most common configuration
+ # options are documented and commented below. For a complete reference,
+ # please see the online documentation at vagrantup.com.
+
+ # Every Vagrant virtual environment requires a box to build off of.
+ #config.vm.box = "precise32"
+
+ # The url from where the 'config.vm.box' box will be fetched if it
+ # doesn't already exist on the user's system.
+ #config.vm.box_url = "http://files.vagrantup.com/precise32.box"
+
+ # Create a forwarded port mapping which allows access to a specific port
+ # within the machine from a port on the host machine. In the example below,
+ # accessing "localhost:8080" will access port 80 on the guest machine.
+ # config.vm.network :forwarded_port, guest: 8080, host: 8080
+
+ # Create a private network, which allows host-only access to the machine
+ # using a specific IP.
+ config.vm.network :private_network, ip: "192.168.100.100"
+
+ # Create a public network, which generally matched to bridged network.
+ # Bridged networks make the machine appear as another physical device on
+ # your network.
+ # config.vm.network :public_network
+
+ # If true, then any SSH connections made will enable agent forwarding.
+ # Default value: false
+ # config.ssh.forward_agent = true
+
+ # Share an additional folder to the guest VM. The first argument is
+ # the path on the host to the actual folder. The second argument is
+ # the path on the guest to mount the folder. And the optional third
+ # argument is a set of non-required options.
+ # config.vm.synced_folder "../data", "/vagrant_data"
+
+ # Provider-specific configuration so you can fine-tune various
+ # backing providers for Vagrant. These expose provider-specific options.
+ # Example for VirtualBox:
+ #
+ config.vm.provider :virtualbox do |vb|
+ # # Don't boot with headless mode
+ vb.gui = false
+ #
+ # # Use VBoxManage to customize the VM. For example to change memory:
+ vb.customize ["modifyvm", :id, "--memory", "3072"]
+ end
+ #
+ # View the documentation for the provider you're using for more
+ # information on available options.
+
+ # Enable provisioning with Puppet stand alone. Puppet manifests
+ # are contained in a directory path relative to this Vagrantfile.
+ # You will need to create the manifests directory and a manifest in
+ # the file precise32.pp in the manifests_path directory.
+ #
+ # An example Puppet manifest to provision the message of the day:
+ #
+ # # group { "puppet":
+ # # ensure => "present",
+ # # }
+ # #
+ # # File { owner => 0, group => 0, mode => 0644 }
+ # #
+ # # file { '/etc/motd':
+ # # content => "Welcome to your Vagrant-built virtual machine!
+ # # Managed by Puppet.\n"
+ # # }
+ #
+ # config.vm.provision :puppet do |puppet|
+ # puppet.manifests_path = "manifests"
+ # puppet.manifest_file = "site.pp"
+ # end
+
+ # Enable provisioning with chef solo, specifying a cookbooks path, roles
+ # path, and data_bags path (all relative to this Vagrantfile), and adding
+ # some recipes and/or roles.
+ #
+ # config.vm.provision :chef_solo do |chef|
+ # chef.cookbooks_path = "../my-recipes/cookbooks"
+ # chef.roles_path = "../my-recipes/roles"
+ # chef.data_bags_path = "../my-recipes/data_bags"
+ # chef.add_recipe "mysql"
+ # chef.add_role "web"
+ #
+ # # You may also specify custom JSON attributes:
+ # chef.json = { :mysql_password => "foo" }
+ # end
+
+ # Enable provisioning with chef server, specifying the chef server URL,
+ # and the path to the validation key (relative to this Vagrantfile).
+ #
+ # The Opscode Platform uses HTTPS. Substitute your organization for
+ # ORGNAME in the URL and validation key.
+ #
+ # If you have your own Chef Server, use the appropriate URL, which may be
+ # HTTP instead of HTTPS depending on your configuration. Also change the
+ # validation key to validation.pem.
+ #
+ # config.vm.provision :chef_client do |chef|
+ # chef.chef_server_url = "https://api.opscode.com/organizations/ORGNAME"
+ # chef.validation_key_path = "ORGNAME-validator.pem"
+ # end
+ #
+ # If you're using the Opscode platform, your validator client is
+ # ORGNAME-validator, replacing ORGNAME with your organization name.
+ #
+ # If you have your own Chef Server, the default validation client name is
+ # chef-validator, unless you changed the configuration.
+ #
+ # chef.validation_client_name = "ORGNAME-validator"
+end
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/cluster.xml
----------------------------------------------------------------------
diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml
new file mode 100644
index 0000000..d37e6e6
--- /dev/null
+++ b/integration-test/config/cluster.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration scan="true" scanPeriod="60 seconds">
+ <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>/var/log/storm/${logfile.name}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>/var/log/storm/access.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>/var/log/storm/metrics.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>/var/log/storm/metrics.log.%i</fileNamePattern> <!-- roll into the same directory as the active file, like the A1 and ACCESS appenders -->
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>2MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%d %-8r %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="A1"/>
+ </root>
+
+ <logger name="backtype.storm.messaging.netty">
+ <level value="WARN" />
+ <appender-ref ref="A1" />
+ </logger>
+
+ <logger name="backtype.storm">
+ <level value="DEBUG" />
+ <appender-ref ref="A1" />
+ </logger>
+
+ <logger name="backtype.storm.security.auth.authorizer" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="ACCESS" />
+ </logger>
+
+ <logger name="backtype.storm.metric.LoggingMetricsConsumer" additivity="false" >
+ <level value="INFO"/>
+ <appender-ref ref="METRICS"/>
+ </logger>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/common.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/common.sh b/integration-test/config/common.sh
new file mode 100644
index 0000000..c89319c
--- /dev/null
+++ b/integration-test/config/common.sh
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+USER_SCRIPT="user-script.sh"
+[[ -f $USER_SCRIPT ]] && echo "Running ${USER_SCRIPT}" && bash ${USER_SCRIPT} || echo "${USER_SCRIPT} not found/executed, continuing."
+#apt-get update
+#apt-get --yes remove openjdk-6-jre-headless
+#apt-get --yes install openjdk-7-jdk
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/etc-hosts
----------------------------------------------------------------------
diff --git a/integration-test/config/etc-hosts b/integration-test/config/etc-hosts
new file mode 100644
index 0000000..45e2aa6
--- /dev/null
+++ b/integration-test/config/etc-hosts
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+127.0.0.1 localhost localhost
+192.168.100.100 node-1
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/install-storm.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh
new file mode 100644
index 0000000..2731abe
--- /dev/null
+++ b/integration-test/config/install-storm.sh
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# $1 is the storm binary zip file
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # absolute directory containing this script
+
+groupadd storm
+useradd --gid storm --home-dir /home/storm --create-home --shell /bin/bash storm
+
+unzip -o "$1" -d /usr/share/
+chown -R storm:storm /usr/share/apache-storm*
+ln -sfn /usr/share/apache-storm* /usr/share/storm # -sfn: replace an existing link instead of nesting a new one inside it on re-run
+ln -sfn /usr/share/storm/bin/storm /usr/bin/storm
+
+mkdir -p /etc/storm # -p: do not fail when the directory already exists
+chown storm:storm /etc/storm
+
+rm -f /usr/share/storm/conf/storm.yaml # -f: the stock config may already have been removed by an earlier run
+cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/
+cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/
+ln -sfn /usr/share/storm/conf/storm.yaml /etc/storm/storm.yaml
+
+mkdir -p /var/log/storm
+chown storm:storm /var/log/storm
+
+#sed -i 's/${storm.home}\/logs/\/var\/log\/storm/g' /usr/share/storm/logback/cluster.xml
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/install-zookeeper.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh
new file mode 100644
index 0000000..a81a07c
--- /dev/null
+++ b/integration-test/config/install-zookeeper.sh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apt-get --yes install 'zookeeper=3.3.5*' 'zookeeperd=3.3.5*' # quoted so the shell cannot glob-expand the version patterns before apt sees them
+service zookeeper stop
+echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg
+service zookeeper start
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/config/storm.yaml
----------------------------------------------------------------------
diff --git a/integration-test/config/storm.yaml b/integration-test/config/storm.yaml
new file mode 100644
index 0000000..eca352f
--- /dev/null
+++ b/integration-test/config/storm.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+storm.zookeeper.servers:
+ - "node1"
+
+nimbus.seeds: ["node1"]
+
+# netty transport
+storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
+storm.messaging.netty.buffer_size: 16384
+storm.messaging.netty.max_retries: 10
+storm.messaging.netty.min_wait_ms: 1000
+storm.messaging.netty.max_wait_ms: 5000
+
+drpc.servers:
+ - "node1"
+
+supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709]
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
new file mode 100755
index 0000000..47eb244
--- /dev/null
+++ b/integration-test/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <name>storm-integration-test</name>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-integration-test</artifactId>
+ <version>0.3</version>
+ <packaging>jar</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- see comment below... This fixes an annoyance with intellij -->
+ <provided.scope>provided</provided.scope>
+ <storm.version>1.0.1.2.5.0.0-781</storm.version>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <regression.downloadWorkerLogs>false</regression.downloadWorkerLogs>
+ <storm.conf.dir>/etc/storm/conf</storm.conf.dir>
+ <hadoop.conf.dir>/etc/hadoop/conf</hadoop.conf.dir>
+ </properties>
+
+ <profiles>
+ <profile>
+ <id>intellij</id>
+ <properties>
+ <provided.scope>compile</provided.scope>
+ <regression.downloadWorkerLogs>true</regression.downloadWorkerLogs>
+ <storm.conf.dir>src/test/resources/storm-conf/</storm.conf.dir>
+ <hadoop.conf.dir>src/main/resources/hadoop-conf/</hadoop.conf.dir>
+ </properties>
+ </profile>
+ </profiles>
+
+ <repositories>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <id>central</id>
+ <url>https://repo1.maven.org/maven2/</url> <!-- HTTPS: Maven Central no longer serves plain-HTTP requests -->
+ </repository>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <id>clojars</id>
+ <url>https://clojars.org/repo/</url>
+ </repository>
+ <repository>
+ <id>hortonworks</id>
+ <url>http://nexus-private.hortonworks.com/nexus/content/groups/public/</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.8.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-firefox-driver</artifactId>
+ <version>2.45.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-support</artifactId>
+ <version>2.45.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-solr</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-starter</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18.1</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
+ <forkCount>1C</forkCount>
+ <argLine>-Xmx1024m</argLine>
+ <properties>
+ <property>
+ <name>listener</name>
+ <value>org.apache.storm.st.meta.TestngListener</value>
+ </property>
+ </properties>
+ <systemPropertyVariables>
+ <regression.downloadWorkerLogs>${regression.downloadWorkerLogs}</regression.downloadWorkerLogs>
+ </systemPropertyVariables>
+ <additionalClasspathElements>
+ <!-- Hack to get the dir in CP through CLI and idea -->
+ <additionalClasspathElement>${storm.conf.dir}</additionalClasspathElement>
+ <additionalClasspathElement>${hadoop.conf.dir}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.1}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.2}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.3}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.4}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.5}</additionalClasspathElement>
+ <additionalClasspathElement>${extra.classpath.6}</additionalClasspathElement>
+ </additionalClasspathElements>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.sf</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.dsa</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*.rsa</exclude>
+ <exclude>META-INF/*.EC</exclude>
+ <exclude>META-INF/*.ec</exclude>
+ <exclude>META-INF/MSFTSIG.SF</exclude>
+ <exclude>META-INF/MSFTSIG.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>${hadoop.conf.dir}</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/run-it.sh
----------------------------------------------------------------------
diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh
new file mode 100755
index 0000000..3b4a861
--- /dev/null
+++ b/integration-test/run-it.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+# -*- compile-command: "cd config/ && vagrant destroy -f; vagrant up" -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Trace every command and abort on the first failing one.
+set -x
+set -e
+# Absolute path of the directory that contains this script.
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+echo SCRIPT_DIR="${SCRIPT_DIR}"
+# The storm source root is the parent of the script directory.
+STORM_SRC_DIR=$(dirname "${SCRIPT_DIR}")
+echo STORM_SRC_DIR="${STORM_SRC_DIR}"
+# Prints the given message and terminates the script with exit status 1.
+function die() {
+  # Quoted so the message is printed verbatim (no word splitting or globbing).
+  echo "$*"
+  exit 1
+}
+function list_storm_processes() {
+ (ps -ef | grep -i -e zookeeper | grep -v grep) && (ps -ef | grep -i -e storm.home | grep -v grep)
+}
+
+list_storm_processes || true
+# increasing swap space so we can run lots of workers
+sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
+sudo mkswap /swapfile.img
+sudo swapon /swapfile.img
+
+if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
+  sudo apt-get update
+  sudo apt-get -y install python-software-properties
+  sudo apt-add-repository -y ppa:webupd8team/java
+  sudo apt-get update
+  echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections
+  sudo apt-get install -y oracle-java8-installer
+  sudo apt-get -y install maven
+  java -version
+  mvn --version
+  export MAVEN_OPTS="-Xmx3000m"
+else
+  ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci
+  (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed"
+  (cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn package -Dgpg.skip=true) || die "maven package command failed"
+fi
+# Locate the binary distribution. Only the first match is kept so that
+# basename below never receives more than one path, and a missing zip is
+# reported explicitly instead of surfacing as a basename usage error.
+storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip' | head -n 1)
+[[ -n "${storm_binary_zip}" ]] || die "no storm binary zip found under ${STORM_SRC_DIR}/storm-dist"
+storm_binary_name=$(basename "${storm_binary_zip}")
+# The version is everything from the first digit up to the ".zip" suffix.
+export STORM_VERSION=$(grep -oPe '\d.*(?=\.zip)' <<<"${storm_binary_name}")
+echo "Using storm version:" "${STORM_VERSION}"
+
+# setup storm cluster
+list_storm_processes || true
+sudo bash "${SCRIPT_DIR}/config/common.sh"
+sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh"
+sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "${storm_binary_zip}"
+export JAVA_HOME
+env
+function start_storm_process() {
+ echo starting: storm $1
+ sudo su storm -c "export JAVA_HOME=${JAVA_HOME} && cd /home/storm && storm $1" &
+}
+# Bring up the storm daemons; drpc is currently not started.
+for daemon in nimbus ui supervisor logviewer; do
+  start_storm_process "${daemon}"
+done
+#start_storm_process drpc
+pushd "${SCRIPT_DIR}"
+mvn clean package -DskipTests -Dstorm.version=${STORM_VERSION}
+# Poll up to 20 times, 6 seconds apart, until zookeeper and storm processes are both listed.
+for (( attempt = 1; attempt <= 20; attempt++ )); do
+  if list_storm_processes; then
+    break
+  fi
+  sleep 6
+done
+# Final listing: with "set -e" the script stops here if the processes never came up.
+list_storm_processes
+mvn test -DfailIfNoTests=false -Dstorm.version=${STORM_VERSION} -Dui.url=http://localhost:8744
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
new file mode 100644
index 0000000..d464608
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * A basic example of a Storm topology: a {@link TestWordSpout} feeds two chained
+ * {@link ExclamationBolt}s, each of which appends "!!!" to the string it receives.
+ */
+public class ExclamationTopology {
+
+    /** Component id of the word spout. */
+    public static final String WORD = "word";
+    /** Component id of the first exclamation bolt. */
+    public static final String EXCLAIM_1 = "exclaim1";
+    /** Component id of the second exclamation bolt. */
+    public static final String EXCLAIM_2 = "exclaim2";
+
+    /**
+     * Bolt that re-emits the first field of every input tuple with "!!!" appended.
+     */
+    public static class ExclamationBolt extends BaseRichBolt {
+        OutputCollector _collector;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            // The output is emitted with the input tuple as anchor, then the input is acked.
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+            _collector.ack(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+    /**
+     * Runs the topology.
+     *
+     * @param args when non-empty, {@code args[0]} is the topology name and the topology is
+     *             submitted through {@link StormSubmitter} with 3 workers; otherwise the
+     *             topology runs for 10 seconds in a {@link LocalCluster}
+     * @throws Exception if submitting or running the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        StormTopology topology = getStormTopology();
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology);
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", conf, topology);
+                Utils.sleep(10000);
+                cluster.killTopology("test");
+            } finally {
+                // Shut the in-process cluster down even if submitting or killing failed.
+                cluster.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Builds the topology: spout {@link #WORD} (parallelism 10), then {@link #EXCLAIM_1}
+     * (parallelism 3) and {@link #EXCLAIM_2} (parallelism 2), connected by shuffle grouping.
+     *
+     * @return the assembled topology
+     */
+    public static StormTopology getStormTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD, new TestWordSpout(), 10);
+        builder.setBolt(EXCLAIM_1, new ExclamationTopology.ExclamationBolt(), 3).shuffleGrouping(WORD);
+        builder.setBolt(EXCLAIM_2, new ExclamationTopology.ExclamationBolt(), 2).shuffleGrouping(EXCLAIM_1);
+        return builder.createTopology();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
new file mode 100644
index 0000000..97c0554
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/debug/DebugHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.debug;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Debugging aid that exposes the classpath of the running JVM.
+ */
+public class DebugHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(DebugHelper.class);
+
+    /**
+     * Logs the entries returned by {@link #getClassPaths()}, joined with ':', at INFO level.
+     */
+    public static void printClassPath() {
+        URL[] urls = getClassPaths();
+        LOG.info("classpath:" + StringUtils.join(urls, ':'));
+    }
+
+    /**
+     * Returns the classpath entries of the system class loader.
+     * <p>
+     * When the system class loader is a {@link URLClassLoader} its URLs are returned directly.
+     * Otherwise (the system class loader is no longer a {@code URLClassLoader} from Java 9 on)
+     * the entries of the {@code java.class.path} system property are converted to URLs instead
+     * of failing with a {@link ClassCastException}.
+     *
+     * @return the classpath entries; never {@code null}
+     */
+    public static URL[] getClassPaths() {
+        ClassLoader cl = ClassLoader.getSystemClassLoader();
+        if (cl instanceof URLClassLoader) {
+            return ((URLClassLoader) cl).getURLs();
+        }
+        // Fully-qualified names are used below so that the file's import list stays untouched.
+        String[] entries = System.getProperty("java.class.path", "").split(java.io.File.pathSeparator);
+        java.util.List<URL> urls = new java.util.ArrayList<URL>();
+        for (String entry : entries) {
+            if (entry.isEmpty()) {
+                continue;
+            }
+            try {
+                urls.add(new java.io.File(entry).toURI().toURL());
+            } catch (java.net.MalformedURLException e) {
+                LOG.warn("Skipping classpath entry that cannot be converted to a URL: " + entry, e);
+            }
+        }
+        return urls.toArray(new URL[urls.size()]);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
new file mode 100644
index 0000000..3823310
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/TestableTopology.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology;
+
+import org.apache.storm.generated.StormTopology;
+
+import java.util.List;
+
+/**
+ * Contract implemented by the test topologies of this module (for example the window
+ * correctness topologies): each one can build its topology, name its spout and bolt, and
+ * describe the output it expects.
+ */
+public interface TestableTopology {
+ /** Name of the single output field declared by the verification bolts of the implementations. */
+ String DUMMY_FIELD = "dummy";
+ /**
+ * @return the strings this topology is expected to produce; how they are matched against the
+ * actual output is decided by the caller and is not visible here
+ */
+ List<String> getExpectedOutput();
+ /** @return a newly built topology */
+ StormTopology newTopology();
+ /** @return the component id under which the bolt is registered in the topology */
+ String getBoltName();
+ /** @return the component id under which the spout is registered in the topology */
+ String getSpoutName();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
new file mode 100644
index 0000000..430449b
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.st.topology.TestableTopology;
+import org.apache.storm.st.topology.window.data.TimeData;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test topology for time-based sliding windows.
+ * <p>
+ * {@link IncrementingSpout} emits {@link TimeData} tuples built from an increasing counter and
+ * {@link VerificationBolt} logs the size and the contents of every window it is handed. No sum
+ * is computed; the logged lines are presumably what the integration tests inspect.
+ */
+public class SlidingTimeCorrectness implements TestableTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingTimeCorrectness.class);
+    private final int windowSec;
+    private final int slideSec;
+    private final String spoutName;
+    private final String boltName;
+
+    /**
+     * @param windowSec window length in seconds
+     * @param slideSec  sliding interval in seconds
+     */
+    public SlidingTimeCorrectness(int windowSec, int slideSec) {
+        this.windowSec = windowSec;
+        this.slideSec = slideSec;
+        // Component names embed the configuration so that differently sized runs are distinguishable.
+        final String prefix = this.getClass().getSimpleName() + "-winSec" + windowSec + "slideSec" + slideSec;
+        spoutName = prefix + "IncrementingSpout";
+        boltName = prefix + "VerificationBolt";
+    }
+
+    @Override
+    public String getBoltName() {
+        return boltName;
+    }
+
+    @Override
+    public String getSpoutName() {
+        return spoutName;
+    }
+
+    /**
+     * Builds the topology: the spout (parallelism 2) feeds a single windowed bolt through a
+     * global grouping. The window is {@code windowSec} long, slides every {@code slideSec}
+     * seconds, uses the {@link TimeData} timestamp field and allows a lag of 10 seconds.
+     */
+    @Override
+    public StormTopology newTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(getSpoutName(), new IncrementingSpout(), 2);
+        builder.setBolt(getBoltName(),
+                new VerificationBolt()
+                        .withWindow(new BaseWindowedBolt.Duration(windowSec, TimeUnit.SECONDS),
+                                new BaseWindowedBolt.Duration(slideSec, TimeUnit.SECONDS))
+                        .withTimestampField(TimeData.getTimestampFieldName())
+                        .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)),
+                1)
+                .globalGrouping(getSpoutName());
+        return builder.createTopology();
+    }
+
+    /**
+     * @return the decorated window, new-tuple and expired-tuple size lines for this configuration
+     */
+    @Override
+    public List<String> getExpectedOutput() {
+        return Lists.newArrayList(
+                StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + windowSec),
+                StringDecorator.decorate(getBoltName(), "newTuples.size() = " + slideSec),
+                StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + slideSec)
+        );
+    }
+
+    /**
+     * Spout that sleeps a random time below 800 ms, increments a counter and emits the
+     * {@link TimeData} created from it.
+     */
+    public static class IncrementingSpout extends BaseRichSpout {
+        private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class);
+        private SpoutOutputCollector collector;
+        // NOTE(review): static, hence shared by all spout instances in one JVM and incremented
+        // without synchronization; the topology runs this spout with parallelism 2 - confirm
+        // that this is intended.
+        private static int currentNum;
+        private static final Random rng = new Random();
+        private String componentId;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(TimeData.getFields());
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            TimeUtil.sleepMilliSec(rng.nextInt(800));
+            currentNum++;
+            TimeData data = TimeData.newData(currentNum);
+            final Values tuple = data.getValues();
+            // Emitted without a message id.
+            collector.emit(tuple);
+            LOG.info(StringDecorator.decorate(componentId, data.toString()));
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            LOG.info("Received ACK for msgId : " + msgId);
+        }
+
+        @Override
+        public void fail(Object msgId) {
+            LOG.info("Received FAIL for msgId : " + msgId);
+        }
+    }
+
+    /**
+     * Windowed bolt that logs what each window contains and emits a dummy value per window.
+     */
+    public static class VerificationBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+        private String componentId;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            componentId = context.getThisComponentId();
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            List<Tuple> newTuples = inputWindow.getNew();
+            List<Tuple> expiredTuples = inputWindow.getExpired();
+            // These three lines go through the logger of the enclosing class and are not decorated.
+            LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+            LOG.info("newTuples.size() = " + newTuples.size());
+            LOG.info("expiredTuples.size() = " + expiredTuples.size());
+            Collection<TimeData> dataInWindow = Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() {
+                @Nullable
+                @Override
+                public TimeData apply(@Nullable Tuple input) {
+                    return TimeData.fromTuple(input);
+                }
+            });
+            final String jsonData = TimeData.toString(dataInWindow);
+            LOG.info(StringDecorator.decorate(componentId, jsonData));
+            collector.emit(new Values("dummyValue"));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(DUMMY_FIELD));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
new file mode 100644
index 0000000..33ee004
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.topology.TestableTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class SlidingWindowCorrectness implements TestableTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowCorrectness.class);
+ private static final String NUMBER_FIELD = "number";
+ private static final String STRING_FIELD = "numAsStr";
+ private final int windowSize;
+ private final int slideSize;
+ private final String spoutName;
+ private final String boltName;
+
+ public SlidingWindowCorrectness(int windowSize, int slideSize) {
+ this.windowSize = windowSize;
+ this.slideSize = slideSize;
+ final String prefix = this.getClass().getSimpleName() + "-winSize" + windowSize + "slideSize" + slideSize;
+ spoutName = prefix + "IncrementingSpout";
+ boltName = prefix + "VerificationBolt";
+ }
+
+ public String getBoltName() {
+ return boltName;
+ }
+
+ public String getSpoutName() {
+ return spoutName;
+ }
+
+ public StormTopology newTopology() {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
+ builder.setBolt(getBoltName(),
+ new VerificationBolt()
+ .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+ .withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)),
+ 1)
+ .shuffleGrouping(getSpoutName());
+ return builder.createTopology();
+ }
+
+ public List<String> getExpectedOutput() {
+ return Lists.newArrayList(
+ StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + windowSize),
+ StringDecorator.decorate(getBoltName(), "newTuples.size() = " + slideSize),
+ StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + slideSize)
+ );
+ }
+
+ public static class IncrementingSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class);
+ private SpoutOutputCollector collector;
+ private static int currentNum;
+ private static Random rng = new Random();
+ private String componentId;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ TimeUtil.sleepMilliSec(rng.nextInt(10));
+ currentNum++;
+ final String numAsStr = "str(" + currentNum + ")str";
+ final Values tuple = new Values(currentNum, numAsStr);
+ LOG.info(StringDecorator.decorate(componentId, tuple.toString()));
+ collector.emit(tuple, currentNum);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.info("Received ACK for msgId : " + msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.info("Received FAIL for msgId : " + msgId);
+ }
+ }
+
+ public static class VerificationBolt extends BaseWindowedBolt {
+ private OutputCollector collector;
+ private String componentId;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ List<Tuple> tuplesInWindow = inputWindow.get();
+ List<Tuple> newTuples = inputWindow.getNew();
+ List<Tuple> expiredTuples = inputWindow.getExpired();
+ LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+ LOG.info("newTuples.size() = " + newTuples.size());
+ LOG.info("expiredTuples.size() = " + expiredTuples.size());
+ LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " + tuplesInWindow.toString()));
+ collector.emit(new Values("dummyValue"));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(DUMMY_FIELD));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
new file mode 100644
index 0000000..a77836f
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.st.topology.TestableTopology;
+import org.apache.storm.st.topology.window.data.TimeData;
+import org.apache.storm.st.utils.TimeUtil;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.utils.StringDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class TumblingTimeCorrectness implements TestableTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(TumblingTimeCorrectness.class);
+ private final int tumbleSec;
+ private final String spoutName;
+ private final String boltName;
+
+ public TumblingTimeCorrectness(int timbleSec) {
+ this.tumbleSec = timbleSec;
+ final String prefix = this.getClass().getSimpleName() + "-timbleSec" + timbleSec;
+ spoutName = prefix + "IncrementingSpout";
+ boltName = prefix + "VerificationBolt";
+ }
+
+ public String getBoltName() {
+ return boltName;
+ }
+
+ public String getSpoutName() {
+ return spoutName;
+ }
+
+ public StormTopology newTopology() {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(getSpoutName(), new IncrementingSpout(), 2);
+ builder.setBolt(getBoltName(),
+ new VerificationBolt()
+ .withTumblingWindow(new BaseWindowedBolt.Duration(tumbleSec, TimeUnit.SECONDS))
+ .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+ .withTimestampField(TimeData.getTimestampFieldName()),
+ 1)
+ .globalGrouping(getSpoutName());
+ return builder.createTopology();
+ }
+
+ public List<String> getExpectedOutput() {
+ return Lists.newArrayList(
+ StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + tumbleSec),
+ StringDecorator.decorate(getBoltName(), "newTuples.size() = " + tumbleSec),
+ StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + tumbleSec)
+ );
+ }
+
+ public static class IncrementingSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class);
+ private SpoutOutputCollector collector;
+ private static int currentNum;
+ private static Random rng = new Random();
+ private String componentId;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(TimeData.getFields());
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ TimeUtil.sleepMilliSec(rng.nextInt(800));
+ currentNum++;
+ TimeData data = TimeData.newData(currentNum);
+ final Values tuple = data.getValues();
+ collector.emit(tuple);
+ LOG.info(StringDecorator.decorate(componentId, data.toString()));
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.info("Received ACK for msgId : " + msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.info("Received FAIL for msgId : " + msgId);
+ }
+ }
+
+ public static class VerificationBolt extends BaseWindowedBolt {
+ private OutputCollector collector;
+ private String componentId;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ List<Tuple> tuplesInWindow = inputWindow.get();
+ List<Tuple> newTuples = inputWindow.getNew();
+ List<Tuple> expiredTuples = inputWindow.getExpired();
+ LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+ LOG.info("newTuples.size() = " + newTuples.size());
+ LOG.info("expiredTuples.size() = " + expiredTuples.size());
+ Collection<TimeData> dataInWindow = Collections2.transform(tuplesInWindow, new Function<Tuple, TimeData>() {
+ @Nullable
+ @Override
+ public TimeData apply(@Nullable Tuple input) {
+ return TimeData.fromTuple(input);
+ }
+ });
+ final String jsonData = TimeData.toString(dataInWindow);
+ LOG.info(StringDecorator.decorate(componentId, jsonData));
+ collector.emit(new Values("dummyValue"));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(DUMMY_FIELD));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
new file mode 100644
index 0000000..22c6d75
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.st.topology.TestableTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.st.utils.StringDecorator;
+import org.apache.storm.st.utils.TimeUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes sliding window sum
+ */
+public class TumblingWindowCorrectness implements TestableTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowCorrectness.class);
+ private static final String NUMBER_FIELD = "number";
+ private static final String STRING_FIELD = "numAsStr";
+ private final int tumbleSize;
+ private final String spoutName;
+ private final String boltName;
+
+ public TumblingWindowCorrectness(final int tumbleSize) {
+ this.tumbleSize = tumbleSize;
+ final String prefix = this.getClass().getSimpleName() + "-tubleSize" + tumbleSize;
+ spoutName = prefix + "IncrementingSpout";
+ boltName = prefix + "VerificationBolt";
+ }
+
+ public String getBoltName() {
+ return boltName;
+ }
+
+ public String getSpoutName() {
+ return spoutName;
+ }
+
+ public StormTopology newTopology() {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
+ builder.setBolt(getBoltName(),
+ new VerificationBolt()
+ .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+ .withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1)
+ .shuffleGrouping(getSpoutName());
+ return builder.createTopology();
+ }
+
+ public List<String> getExpectedOutput() {
+ return Lists.newArrayList(
+ StringDecorator.decorate(getBoltName(), "tuplesInWindow.size() = " + tumbleSize),
+ StringDecorator.decorate(getBoltName(), "newTuples.size() = " + tumbleSize),
+ StringDecorator.decorate(getBoltName(), "expiredTuples.size() = " + tumbleSize)
+ );
+ }
+
+ public static class IncrementingSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementingSpout.class);
+ private SpoutOutputCollector collector;
+ private static int currentNum;
+ private static Random rng = new Random();
+ private String componentId;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ TimeUtil.sleepMilliSec(rng.nextInt(10));
+ currentNum++;
+ final String numAsStr = "str(" + currentNum + ")str";
+ final Values tuple = new Values(currentNum, numAsStr);
+ LOG.info(StringDecorator.decorate(componentId, tuple.toString()));
+ collector.emit(tuple, currentNum);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.info("Received ACK for msgId : " + msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.info("Received FAIL for msgId : " + msgId);
+ }
+ }
+
+ public static class VerificationBolt extends BaseWindowedBolt {
+ private OutputCollector collector;
+ private String componentId;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ List<Tuple> tuplesInWindow = inputWindow.get();
+ List<Tuple> newTuples = inputWindow.getNew();
+ List<Tuple> expiredTuples = inputWindow.getExpired();
+ LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+ LOG.info("newTuples.size() = " + newTuples.size());
+ LOG.info("expiredTuples.size() = " + expiredTuples.size());
+ LOG.info(StringDecorator.decorate(componentId, "tuplesInWindow = " + tuplesInWindow.toString()));
+ collector.emit(new Values("dummyValue"));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(DUMMY_FIELD));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
new file mode 100644
index 0000000..d749abf
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/FromJson.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+ /**
+  * Contract for types that can produce a {@code T} from a JSON string.
+  *
+  * @param <T> type produced by {@link #fromJson(String)}
+  */
+ public interface FromJson<T> {
+     /**
+      * Converts the given JSON text into a {@code T}.
+      *
+      * @param jsonStr JSON text to convert
+      * @return the value built from {@code jsonStr}
+      */
+     T fromJson(String jsonStr);
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
new file mode 100644
index 0000000..cd2c7a5
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collection;
+import java.util.Date;
+
+ /**
+  * A number paired with the point in time it was created at.
+  * <p>
+  * Instances can be turned into tuple values, rebuilt from a tuple, and converted to and from
+  * JSON through the shared {@link #gson} instance. Ordering is by timestamp first and by number
+  * second.
+  */
+ public class TimeData implements Comparable<TimeData>, FromJson<TimeData> {
+     // Shared instance holding the number -1; presumably a handle for calling fromJson - confirm against callers.
+     public static final TimeData CLS = new TimeData(-1);
+     private static final String NUMBER_FIELD_NAME = "number";
+     private static final String STRING_FIELD_NAME = "dateAsStr";
+     private static final String TIMESTAMP_FIELD_NAME = "date";
+     // The trailing Z is a quoted literal in this pattern, not a time-zone field.
+     static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").create();
+     private final int num;
+     private final Date now;
+     private final long timestamp;
+
+     private TimeData(int num) {
+         this(num, new Date());
+     }
+
+     private TimeData(int num, Date date) {
+         this.num = num;
+         this.now = date;
+         this.timestamp = date.getTime();
+     }
+
+     /**
+      * Creates an item for {@code num} stamped with the current time.
+      *
+      * @param num the number to carry
+      * @return the new item
+      */
+     public static TimeData newData(int num) {
+         return new TimeData(num);
+     }
+
+     /**
+      * Rebuilds an item from the number and timestamp fields of a tuple.
+      *
+      * @param tuple tuple carrying the fields listed by {@link #getFields()}
+      * @return the rebuilt item
+      */
+     public static TimeData fromTuple(Tuple tuple) {
+         return new TimeData(tuple.getIntegerByField(NUMBER_FIELD_NAME), new Date(tuple.getLongByField(TIMESTAMP_FIELD_NAME)));
+     }
+
+     /**
+      * Parses a single item from JSON; the receiver's own state is not used.
+      *
+      * @param jsonStr JSON form of one item
+      * @return the parsed item
+      */
+     @Override
+     public TimeData fromJson(String jsonStr) {
+         return gson.fromJson(jsonStr, TimeData.class);
+     }
+
+     /**
+      * @return the JSON form of this item
+      */
+     @Override
+     public String toString() {
+         return gson.toJson(this);
+     }
+
+     /**
+      * @param elements items to serialize
+      * @return the JSON form of the whole collection
+      */
+     public static String toString(Collection<TimeData> elements) {
+         return gson.toJson(elements);
+     }
+
+     /**
+      * @return number, date string and timestamp, in the order of {@link #getFields()}
+      */
+     public Values getValues() {
+         return new Values(num, now.toString(), timestamp);
+     }
+
+     public static String getTimestampFieldName() {
+         return TIMESTAMP_FIELD_NAME;
+     }
+
+     public Date getDate() {
+         return now;
+     }
+
+     public static Fields getFields() {
+         return new Fields(NUMBER_FIELD_NAME, STRING_FIELD_NAME, TIMESTAMP_FIELD_NAME);
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+         TimeData data = (TimeData) o;
+         return num == data.num && timestamp == data.timestamp && now.equals(data.now);
+     }
+
+     @Override
+     public int hashCode() {
+         int result = num;
+         result = 31 * result + now.hashCode();
+         result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+         return result;
+     }
+
+     /**
+      * Orders by timestamp, then by number.
+      * <p>
+      * The tie-break on the number keeps the ordering in line with {@link #equals(Object)} and
+      * makes sorting deterministic when several items share the same millisecond.
+      */
+     @Override
+     public int compareTo(TimeData o) {
+         final int byTimestamp = Long.compare(timestamp, o.timestamp);
+         if (byTimestamp != 0) {
+             return byTimestamp;
+         }
+         return Integer.compare(num, o.num);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
new file mode 100644
index 0000000..d6cb9d6
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/data/TimeDataWindow.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.topology.window.data;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.gson.reflect.TypeToken;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+ /**
+  * A list of {@link TimeData} that the factory methods hand out in sorted order.
+  */
+ public class TimeDataWindow extends ArrayList<TimeData> implements FromJson<TimeDataWindow> {
+     // Shared empty instance; presumably a handle for calling fromJson - confirm against callers.
+     public static final TimeDataWindow CLS = new TimeDataWindow();
+     private static final Type listType = new TypeToken<List<TimeData>>() {}.getType();
+
+     private TimeDataWindow() {
+     }
+
+     private TimeDataWindow(List<TimeData> data) {
+         super(data);
+     }
+
+     /**
+      * Copies and sorts the given items; the argument itself is left untouched.
+      *
+      * @param data items to put into the window
+      * @return a new sorted window
+      */
+     public static TimeDataWindow newInstance(Collection<TimeData> data) {
+         final List<TimeData> dataCopy = new ArrayList<>(data);
+         Collections.sort(dataCopy);
+         return new TimeDataWindow(dataCopy);
+     }
+
+     /**
+      * Builds a sorted window from the items accepted by {@code predicate}.
+      *
+      * @param data      candidate items
+      * @param predicate filter deciding which items are kept
+      * @return a new sorted window
+      */
+     public static TimeDataWindow newInstance(Collection<TimeData> data, Predicate<TimeData> predicate) {
+         return newInstance(Collections2.filter(data, predicate));
+     }
+
+     /**
+      * Builds a sorted window from the items dated strictly after {@code fromDate} and no later
+      * than {@code toDate}. Null items are dropped.
+      *
+      * @param data     candidate items
+      * @param fromDate exclusive lower bound
+      * @param toDate   inclusive upper bound; must not be null
+      * @return a new sorted window
+      */
+     public static TimeDataWindow newInstance(Collection<TimeData> data, final DateTime fromDate, final DateTime toDate) {
+         // Computed once: one millisecond past toDate turns isBefore into an inclusive upper bound.
+         final DateTime exclusiveEnd = toDate.plusMillis(1);
+         return TimeDataWindow.newInstance(data, new Predicate<TimeData>() {
+             @Override
+             public boolean apply(@Nullable TimeData input) {
+                 if (input == null) {
+                     return false;
+                 }
+                 final DateTime inputDate = new DateTime(input.getDate());
+                 return inputDate.isAfter(fromDate) && inputDate.isBefore(exclusiveEnd);
+             }
+         });
+     }
+
+     /**
+      * @return the first item; throws {@link IndexOutOfBoundsException} when the window is empty
+      */
+     public TimeData first() {
+         return get(0);
+     }
+
+     /**
+      * @return the last item; throws {@link IndexOutOfBoundsException} when the window is empty
+      */
+     public TimeData last() {
+         return get(size() - 1);
+     }
+
+     /**
+      * @return the item count, plus the first and last item when the window is not empty
+      */
+     public String getDescription() {
+         final int size = size();
+         if (size > 0) {
+             final TimeData first = first();
+             final TimeData last = last();
+             return "Total " + size + " items: " + first + " to " + last;
+         }
+         return "Total " + size + " items.";
+     }
+
+     /**
+      * Parses a JSON array of items into a sorted window; the receiver's own content is not used.
+      * <p>
+      * Input for which Gson yields no list results in an empty window.
+      *
+      * @param jsonStr JSON array of items
+      * @return a new sorted window
+      */
+     @Override
+     public TimeDataWindow fromJson(String jsonStr) {
+         final List<TimeData> parsed = TimeData.gson.fromJson(jsonStr, listType);
+         if (parsed == null) {
+             return TimeDataWindow.newInstance(Collections.<TimeData>emptyList());
+         }
+         // newInstance copies and sorts, so no separate sort is needed here.
+         return TimeDataWindow.newInstance(parsed);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/1b812b7b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
new file mode 100644
index 0000000..34c2b65
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.st.utils;
+
+import org.apache.commons.lang.StringUtils;
+
+ /**
+  * Helpers for tagging a string with a component id and for recognising and splitting tagged strings.
+  */
+ public class StringDecorator {
+
+     /** Marker placed between the component id and the payload. */
+     private static final String UNIQUE_PREFIX = "---bed91874d79720f7e324c43d49dba4ff---";
+
+     /**
+      * Joins the component id, the marker and the payload, in that order.
+      *
+      * @param componentId id to put in front of the marker
+      * @param decorate    payload to put after the marker
+      * @return the tagged string
+      */
+     public static String decorate(String componentId, String decorate) {
+         final StringBuilder tagged = new StringBuilder();
+         tagged.append(componentId).append(UNIQUE_PREFIX).append(decorate);
+         return tagged.toString();
+     }
+
+     /**
+      * @param str string to inspect, may be null
+      * @return true when {@code str} is non-null and contains the marker
+      */
+     public static boolean isDecorated(String str) {
+         if (str == null) {
+             return false;
+         }
+         return str.indexOf(UNIQUE_PREFIX) >= 0;
+     }
+
+     /**
+      * Splits a tagged string around the marker into at most two parts.
+      *
+      * @param decoratedString string to split
+      * @return the parts as returned by {@code StringUtils.splitByWholeSeparator}
+      */
+     public static String[] split2(String decoratedString) {
+         final int maxParts = 2;
+         return StringUtils.splitByWholeSeparator(decoratedString, UNIQUE_PREFIX, maxParts);
+     }
+ }