You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 18:16:03 UTC
[1/3] storm git commit: STORM-2525: Fix flaky integration tests
Repository: storm
Updated Branches:
refs/heads/master d67aaf344 -> 1a380ede3
STORM-2525: Fix flaky integration tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1db86744
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1db86744
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1db86744
Branch: refs/heads/master
Commit: 1db867449c2e2dd8b17cbe464f5ac9ac972f8ff7
Parents: 2a0c168
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Sat May 20 19:46:03 2017 +0200
Committer: Stig Rohde Døssing <st...@gmail.com>
Committed: Sun May 21 00:14:50 2017 +0200
----------------------------------------------------------------------
integration-test/config/Vagrantfile | 8 +-
integration-test/config/cluster.xml | 101 -------------------
integration-test/config/install-storm.sh | 1 -
integration-test/config/install-zookeeper.sh | 3 +-
integration-test/pom.xml | 2 +-
integration-test/run-it.sh | 10 +-
.../topology/window/SlidingTimeCorrectness.java | 4 +-
.../window/SlidingWindowCorrectness.java | 2 -
.../window/TumblingTimeCorrectness.java | 4 +-
.../window/TumblingWindowCorrectness.java | 2 -
.../apache/storm/st/utils/StringDecorator.java | 1 +
.../st/tests/window/SlidingWindowTest.java | 44 ++++----
.../org/apache/storm/st/wrapper/TopoWrap.java | 81 +++++++--------
pom.xml | 23 ++++-
storm-client/pom.xml | 7 +-
.../storm/topology/WindowedBoltExecutor.java | 6 +-
.../apache/storm/windowing/EvictionContext.java | 2 +-
.../apache/storm/windowing/EvictionPolicy.java | 2 +-
.../storm/windowing/TimeEvictionPolicy.java | 2 +-
.../storm/windowing/TimeTriggerPolicy.java | 4 +-
.../windowing/WaterMarkEventGenerator.java | 2 +-
.../windowing/WatermarkCountEvictionPolicy.java | 9 +-
.../windowing/WatermarkTimeEvictionPolicy.java | 9 +-
.../apache/storm/windowing/WindowManager.java | 3 +
.../storm/windowing/WindowManagerTest.java | 48 ++++++++-
25 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/Vagrantfile
----------------------------------------------------------------------
diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile
index 740d0b0..def461d 100644
--- a/integration-test/config/Vagrantfile
+++ b/integration-test/config/Vagrantfile
@@ -19,7 +19,7 @@
require 'uri'
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"
-STORM_BOX_TYPE = "hashicorp/precise64"
+STORM_BOX_TYPE = "ubuntu/xenial64"
STORM_ZIP = Dir.glob("../../storm-dist/binary/**/*.zip")
if(STORM_ZIP.length != 1)
raise "Expected one storm-binary found: " + STORM_ZIP.join(",") + ". Did you run : cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
@@ -53,8 +53,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
}
end
- config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm"
- config.vm.synced_folder "~/.m2", "/home/vagrant/.m2"
+ config.vm.synced_folder "../../", "/home/ubuntu/build/vagrant/storm"
+ config.vm.synced_folder "~/.m2", "/home/ubuntu/.m2"
config.vm.define "node1" do |node1|
node1.vm.provider "virtualbox" do |v|
@@ -62,7 +62,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
end
node1.vm.network "private_network", ip: "192.168.50.3"
node1.vm.hostname = "node1"
- node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
+ node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/ubuntu/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
#node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts"
end
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/cluster.xml
----------------------------------------------------------------------
diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml
deleted file mode 100644
index 97968e4..0000000
--- a/integration-test/config/cluster.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration scan="true" scanPeriod="60 seconds">
- <properties>
- <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
- <property name="patternMetrics">%d %-8r %m%n</property>
- </properties>
-
- <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>/var/log/storm/${logfile.name}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>9</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>100MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>${pattern}</pattern>
- </encoder>
- </appender>
-
- <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>/var/log/storm/access.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>9</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>100MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>${pattern}</pattern>
- </encoder>
- </appender>
-
- <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>/var/log/storm/metrics.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>metrics.log.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>9</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>2MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>${patternMetrics}</pattern>
- </encoder>
- </appender>
-
- <root level="INFO">
- <appender-ref ref="A1"/>
- </root>
-
- <logger name="org.apache.storm.messaging.netty">
- <level value="WARN" />
- <appender-ref ref="A1" />
- </logger>
-
- <logger name="org.apache.storm">
- <level value="DEBUG" />
- <appender-ref ref="A1" />
- </logger>
-
- <logger name="org.apache.storm.security.auth.authorizer" additivity="false">
- <level value="INFO" />
- <appender-ref ref="ACCESS" />
- </logger>
-
- <logger name="org.apache.storm.metric.LoggingMetricsConsumer" additivity="false">
- <level value="INFO"/>
- <appender-ref ref="METRICS"/>
- </logger>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/install-storm.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh
index b08e2bc..8316c75 100644
--- a/integration-test/config/install-storm.sh
+++ b/integration-test/config/install-storm.sh
@@ -31,7 +31,6 @@ chown -R storm:storm /etc/storm
rm /usr/share/storm/conf/storm.yaml
cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/
-cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/
ln -s /usr/share/storm/conf/storm.yaml /etc/storm/conf/storm.yaml
mkdir /var/log/storm
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/install-zookeeper.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh
index a81a07c..98253d7 100644
--- a/integration-test/config/install-zookeeper.sh
+++ b/integration-test/config/install-zookeeper.sh
@@ -14,7 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5*
+# $1 is the Zookeeper version to install
+apt-get --yes install zookeeper=$1 zookeeperd=$1
service zookeeper stop
echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg
service zookeeper start
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 7ec9584..8197c72 100755
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -78,7 +78,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.3.1</version>
+ <version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/run-it.sh
----------------------------------------------------------------------
diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh
index 3194a92..d57a4d6 100755
--- a/integration-test/run-it.sh
+++ b/integration-test/run-it.sh
@@ -32,11 +32,11 @@ function list_storm_processes() {
list_storm_processes || true
# increasing swap space so we can run lots of workers
-sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
+sudo dd if=/dev/zero of=/swapfile.img bs=4096 count=1M
sudo mkswap /swapfile.img
sudo swapon /swapfile.img
-if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
+if [[ "${USER}" == "ubuntu" ]]; then # install oracle jdk8
sudo apt-get update
sudo apt-get -y install python-software-properties
sudo apt-add-repository -y ppa:webupd8team/java
@@ -44,16 +44,18 @@ if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections
sudo apt-get install -y oracle-java8-installer
sudo apt-get -y install maven
+ sudo apt-get install unzip
java -version
mvn --version
export MAVEN_OPTS="-Xmx3000m"
+ zookeeper_version=3.4.8*
else
( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci
- (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed"
if [[ "${USER}" == "travis" ]]; then
( cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn clean package -Dgpg.skip=true )
fi
(( $(find "${STORM_SRC_DIR}/storm-dist/binary" -iname 'apache-storm*.zip' | wc -l) == 1 )) || die "expected exactly one zip file, did you run: cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
+ zookeeper_version=3.3.5*
fi
storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip')
@@ -64,7 +66,7 @@ echo "Using storm version:" ${STORM_VERSION}
# setup storm cluster
list_storm_processes || true
sudo bash "${SCRIPT_DIR}/config/common.sh"
-sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh"
+sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh" "$zookeeper_version"
sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "$storm_binary_zip"
export JAVA_HOME="${JAVA_HOME}"
env
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
index be00ffc..7a9163d 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
@@ -114,7 +114,9 @@ public class SlidingTimeCorrectness implements TestableTopology {
@Override
public void nextTuple() {
- TimeUtil.sleepMilliSec(rng.nextInt(800));
+ //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
+ //Sleep a bit between emits
+ TimeUtil.sleepMilliSec(rng.nextInt(100));
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
index 5d1b53b..a52ae7c 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
@@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
/**
* Computes sliding window sum
@@ -74,7 +73,6 @@ public class SlidingWindowCorrectness implements TestableTopology {
builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
builder.setBolt(getBoltName(),
new VerificationBolt()
- .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)),
1)
.shuffleGrouping(getSpoutName());
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
index d7fbbca..64d7441 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -111,7 +111,9 @@ public class TumblingTimeCorrectness implements TestableTopology {
@Override
public void nextTuple() {
- TimeUtil.sleepMilliSec(rng.nextInt(800));
+ //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
+ //Sleep a bit between emits
+ TimeUtil.sleepMilliSec(rng.nextInt(100));
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
index 110c982..05351df 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
/**
* Computes sliding window sum
@@ -72,7 +71,6 @@ public class TumblingWindowCorrectness implements TestableTopology {
builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
builder.setBolt(getBoltName(),
new VerificationBolt()
- .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1)
.shuffleGrouping(getSpoutName());
return builder.createTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
index 34c2b65..6901212 100644
--- a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
+++ b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
@@ -17,6 +17,7 @@
package org.apache.storm.st.utils;
+import java.nio.charset.StandardCharsets;
import org.apache.commons.lang.StringUtils;
public class StringDecorator {
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
index 5cef9fa..3de1a7d 100644
--- a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
+++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
@@ -17,6 +17,7 @@
package org.apache.storm.st.tests.window;
+import java.io.IOException;
import org.apache.storm.st.helper.AbstractTest;
import org.apache.storm.st.wrapper.LogData;
import org.apache.storm.st.wrapper.TopoWrap;
@@ -79,21 +80,25 @@ public final class SlidingWindowTest extends AbstractTest {
runAndVerifyCount(windowSize, slideSize, testable, topo);
}
- static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException {
+ static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws IOException, TException, MalformedURLException {
topo.submitSuccessfully();
- final int minSpoutEmits = 1000 + windowSize;
final int minBoltEmits = 5;
+ //Sliding windows should produce one window every slideSize tuples
+ //Wait for the spout to emit at least enough tuples to get minBoltEmit windows and at least one full window
+ final int minSpoutEmits = Math.max(windowSize, minBoltEmits * slideSize);
+
String boltName = testable.getBoltName();
String spoutName = testable.getSpoutName();
- topo.waitForProgress(minSpoutEmits, spoutName, 180);
- topo.waitForProgress(minBoltEmits, boltName, 180);
+ //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
+ topo.assertProgress(minSpoutEmits, spoutName, 180);
+ topo.assertProgress(minBoltEmits, boltName, 180);
List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName);
log.info(boltUrls.toString());
final List<LogData> allBoltData = topo.getLogData(boltName);
final List<LogData> allSpoutData = topo.getLogData(spoutName);
Assert.assertTrue(allBoltData.size() >= minBoltEmits,
"Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
- final int numberOfWindows = allBoltData.size() - windowSize / slideSize;
+ final int numberOfWindows = allBoltData.size();
for(int i = 0; i < numberOfWindows; ++i ) {
log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows);
final int toIndex = (i + 1) * slideSize;
@@ -143,28 +148,29 @@ public final class SlidingWindowTest extends AbstractTest {
runAndVerifyTime(windowSec, slideSec, testable, topo);
}
- static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException {
+ static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws IOException, TException, java.net.MalformedURLException {
topo.submitSuccessfully();
- final int minSpoutEmits = 1000 + windowSec;
+ final int minSpoutEmits = 100;
final int minBoltEmits = 5;
String boltName = testable.getBoltName();
String spoutName = testable.getSpoutName();
- topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
- topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
- final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS);
- final List<LogData> allBoltLog = topo.getLogData(boltName);
- final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS);
- Assert.assertTrue(allBoltLog.size() >= minBoltEmits,
- "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog);
- final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
- final int numberOfWindows = allBoltLog.size() - windowSec / slideSec;
+ //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
+ topo.assertProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
+ topo.assertProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
+ final List<TimeData> allSpoutDataDeserialized = topo.getLogData(spoutName, TimeData.CLS);
+ final List<LogData> allBoltData = topo.getLogData(boltName);
+ final List<TimeDataWindow> allBoltDataDeserialized = topo.deserializeLogData(allBoltData, TimeDataWindow.CLS);
+ Assert.assertTrue(allBoltData.size() >= minBoltEmits,
+ "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
+ final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutDataDeserialized.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
+ final int numberOfWindows = allBoltData.size();
for(int i = 0; i < numberOfWindows; ++i ) {
final DateTime toDate = firstEndTime.plusSeconds(i * slideSec);
final DateTime fromDate = toDate.minusSeconds(windowSec);
log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows);
- final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate);
- final LogData oneBoltLog = allBoltLog.get(i);
- final TimeDataWindow actualWindow = allBoltData.get(i);
+ final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutDataDeserialized,fromDate, toDate);
+ final LogData oneBoltLog = allBoltData.get(i);
+ final TimeDataWindow actualWindow = allBoltDataDeserialized.get(i);
log.info("Actual window: " + actualWindow.getDescription());
log.info("Computed window: " + computedWindow.getDescription());
for (TimeData oneLog : computedWindow) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
index ac6d0c7..af1e1be 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
@@ -53,6 +53,7 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -65,6 +66,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
public class TopoWrap {
private static Logger log = LoggerFactory.getLogger(TopoWrap.class);
@@ -97,6 +100,9 @@ public class TopoWrap {
submitConf.put("storm.zookeeper.topology.auth.scheme", "digest");
submitConf.put("topology.workers", 3);
submitConf.put("topology.debug", true);
+ //Set the metrics sample rate to 1 to force update the executor stats every time something happens
+ //This is necessary because getAllTimeEmittedCount relies on the executor emit stats to be accurate
+ submitConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1);
return submitConf;
}
@@ -180,7 +186,7 @@ public class TopoWrap {
Map<String, Long> allTime = emitted.get(since);
if (allTime == null)
return 0L;
- return allTime.get("default");
+ return allTime.get(Utils.DEFAULT_STREAM_ID);
}
});
return sum(ackCounts).longValue();
@@ -213,7 +219,7 @@ public class TopoWrap {
log.info(getInfo().toString());
long emitCount = getAllTimeEmittedCount(componentName);
log.info("Count for component " + componentName + " is " + emitCount);
- if (emitCount > minEmits) {
+ if (emitCount >= minEmits) {
break;
}
TimeUtil.sleepSec(10);
@@ -280,8 +286,13 @@ public class TopoWrap {
}
}
- public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) throws TException, MalformedURLException {
+ public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls)
+ throws IOException, TException, MalformedURLException {
final List<LogData> logData = getLogData(componentId);
+ return deserializeLogData(logData, cls);
+ }
+
+ public <T extends FromJson<T>> List<T> deserializeLogData(final List<LogData> logData, final FromJson<T> cls) {
final List<T> data = new ArrayList<>(
Collections2.transform(logData, new Function<LogData, T>() {
@Nullable
@@ -294,7 +305,7 @@ public class TopoWrap {
return data;
}
- public List<LogData> getLogData(final String componentId) throws TException, MalformedURLException {
+ public List<LogData> getLogData(final String componentId) throws IOException, TException, MalformedURLException {
final String logs = getLogs(componentId);
final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")");
@@ -318,48 +329,38 @@ public class TopoWrap {
return sortedLogs;
}
- public String getLogs(final String componentId) throws TException, MalformedURLException {
+ public String getLogs(final String componentId) throws IOException, TException, MalformedURLException {
log.info("Fetching logs for componentId = " + componentId);
List<ExecutorURL> exclaim2Urls = getLogUrls(componentId);
log.info("Found " + exclaim2Urls.size() + " urls: " + exclaim2Urls.toString());
- Collection<String> urlOuputs = Collections2.transform(exclaim2Urls, new Function<ExecutorURL, String>() {
- @Nullable
- @Override
- public String apply(@Nullable ExecutorURL executorURL) {
- if (executorURL == null || executorURL.getDownloadUrl() == null) {
- return "";
- }
- String warnMessage = "Couldn't fetch executorURL: " + executorURL;
+ List<String> urlContents = new ArrayList<>();
+ for(ExecutorURL executorUrl : exclaim2Urls) {
+ if(executorUrl == null || executorUrl.getDownloadUrl() == null) {
+ continue;
+ }
+ log.info("Fetching: " + executorUrl);
+ URL downloadUrl = executorUrl.downloadUrl;
+ String urlContent = IOUtils.toString(downloadUrl, StandardCharsets.UTF_8);
+ urlContents.add(urlContent);
+ if (urlContent.length() < 500) {
+ log.info("Fetched: " + urlContent);
+ } else {
+ log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes.");
+ }
+ if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) {
+ final String userDir = System.getProperty("user.dir");
+ final File target = new File(userDir, "target");
+ final File logDir = new File(target, "logs");
+ final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]);
try {
- log.info("Fetching: " + executorURL);
- final URL downloadUrl = executorURL.downloadUrl;
- final String urlContent = IOUtils.toString(downloadUrl);
- if (urlContent.length() < 500) {
- log.info("Fetched: " + urlContent);
- } else {
- log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes.");
- }
- if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) {
- final String userDir = System.getProperty("user.dir");
- final File target = new File(userDir, "target");
- final File logDir = new File(target, "logs");
- final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]);
- try {
- FileUtils.forceMkdir(logDir);
- FileUtils.write(logFile, urlContent);
- } catch (Throwable throwable) {
- log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable));
- }
- }
- return urlContent;
- } catch (IOException e) {
- log.warn(warnMessage);
+ FileUtils.forceMkdir(logDir);
+ FileUtils.write(logFile, urlContent, StandardCharsets.UTF_8);
+ } catch (Throwable throwable) {
+ log.info("Caught exception: " + ExceptionUtils.getFullStackTrace(throwable));
}
- String stars = StringUtils.repeat("*", 30);
- return stars + " " + warnMessage + " " + stars;
}
- });
- return StringUtils.join(urlOuputs, '\n');
+ }
+ return StringUtils.join(urlContents, '\n');
}
private Number sum(Collection<? extends Number> nums) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bdcc716..f96cfd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,6 +284,7 @@
<junit.version>4.11</junit.version>
<metrics-clojure.version>2.5.1</metrics-clojure.version>
<hdrhistogram.version>2.1.7</hdrhistogram.version>
+ <hamcrest.version>1.3</hamcrest.version>
<calcite.version>1.11.0</calcite.version>
@@ -619,7 +620,7 @@
<artifactId>jersey-container-servlet-core</artifactId>
<version>${jersey.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-jetty-http</artifactId>
<version>${jersey.version}</version>
@@ -959,6 +960,24 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>${hamcrest.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>${hamcrest.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
@@ -1097,7 +1116,7 @@
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<!-- If you change this, you should also update the storm_checkstyle.xml file to be
- based on the google_checks.xml from the version of checkstyle you are choosing. -->
+ based on the google_checks.xml from the version of checkstyle you are choosing. -->
<version>7.7</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 30d7685..538b7b9 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -187,7 +187,12 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 2ada9bf..56c329e 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -199,8 +199,7 @@ public class WindowedBoltExecutor implements IRichBolt {
// validate
validate(topoConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
- evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration,
- manager);
+ evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
manager, evictionPolicy);
manager.setEvictionPolicy(evictionPolicy);
@@ -251,8 +250,7 @@ public class WindowedBoltExecutor implements IRichBolt {
}
}
- private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration,
- WindowManager<Tuple> manager) {
+ private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
if (windowLengthCount != null) {
if (isTupleTs()) {
return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
index 37dcfd9..ee5fdb9 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -38,7 +38,7 @@ public interface EvictionContext {
Long getSlidingCount();
/**
- * Returns the current count of events in the queue up to the reference tim
+ * Returns the current count of events in the queue up to the reference time
* based on which count based evictions can be performed.
*
* @return the current count
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 774d0a3..fa44444 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -27,7 +27,7 @@ public interface EvictionPolicy<T> {
/**
* The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
*/
- enum Action {
+ public enum Action {
/**
* expire the event and remove it from the queue
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index 802e6bb..570b057 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -38,7 +38,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
* {@inheritDoc}
*/
@Override
- public Action evict(Event<T> event) {
+ public Action evict(Event<T> event) {
long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
long diff = now - event.getTimestamp();
if (diff >= windowLength) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index b057afb..6b6d9fa 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -114,9 +114,7 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
* set the current timestamp as the reference time for the eviction policy
* to evict the events
*/
- if (evictionPolicy != null) {
- evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
- }
+ evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
handler.onTrigger();
} catch (Throwable th) {
LOG.error("handler.onTrigger failed ", th);
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
index e1df72c..ef81d6e 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
@@ -77,7 +77,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
try {
long waterMarkTs = computeWaterMarkTs();
if (waterMarkTs > lastWaterMarkTs) {
- this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs));
+ this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
lastWaterMarkTs = waterMarkTs;
}
} catch (Throwable th) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index c5d7b49..0fe6f75 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -19,7 +19,7 @@ package org.apache.storm.windowing;
/**
* An eviction policy that tracks count based on watermark ts and
- * evicts events upto the watermark based on a threshold count.
+ * evicts events up to the watermark based on a threshold count.
*
* @param <T> the type of event tracked by this policy.
*/
@@ -32,6 +32,13 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
@Override
public Action evict(Event<T> event) {
+ if(getContext() == null) {
+ //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow.
+ //In this case we should hold on to all the events. When the first watermark is received, the context will be set,
+ //and the events will be reevaluated for eviction
+ return Action.STOP;
+ }
+
Action action;
if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
action = super.evict(event);
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
index e5ecba4..fdb3917 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -57,7 +57,14 @@ public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
*/
@Override
public Action evict(Event<T> event) {
- long referenceTime = evictionContext.getReferenceTime() != null ? evictionContext.getReferenceTime() : 0L;
+ if(evictionContext == null) {
+ //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow.
+ //In this case we should hold on to all the events. When the first watermark is received, the context will be set,
+ //and the events will be reevaluated for eviction
+ return Action.STOP;
+ }
+
+ long referenceTime = evictionContext.getReferenceTime();
long diff = referenceTime - event.getTimestamp();
if (diff < -lag) {
return Action.STOP;
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
index 8021ba8..f6cc521 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
@@ -46,6 +46,9 @@ public class WindowManager<T> implements TriggerHandler {
/**
* Expire old events every EXPIRE_EVENTS_THRESHOLD to
* keep the window size in check.
+ *
+ * Note that if the eviction policy is based on watermarks, events will not be evicted until a new
+ * watermark would cause them to be considered expired anyway, regardless of this limit
*/
public static final int EXPIRE_EVENTS_THRESHOLD = 100;
http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index 6c170c6..178c1bb 100644
--- a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -29,9 +29,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -155,7 +158,48 @@ public class WindowManagerTest {
// window should be compacted and events should be expired.
assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents);
}
-
+
+ private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy watermarkEvictionPolicy, int windowLength) throws Exception {
+ /**
+ * The watermark eviction policy must not evict tuples until the first watermark has been received.
+ * The policies can't make a meaningful decision prior to the first watermark, so the safe decision
+ * is to postpone eviction.
+ */
+ int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+ windowManager.setEvictionPolicy(watermarkEvictionPolicy);
+ WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager,
+ watermarkEvictionPolicy, windowManager);
+ triggerPolicy.start();
+ windowManager.setTriggerPolicy(triggerPolicy);
+ for (int i : seq(1, threshold)) {
+ windowManager.add(i, i);
+ }
+ assertThat("The watermark eviction policies should never evict events before the first watermark is received", listener.onExpiryEvents, is(empty()));
+ windowManager.add(new WaterMarkEvent<>(threshold));
+ // The events should be put in a window when the first watermark is received
+ assertEquals(seq(1, threshold), listener.onActivationEvents);
+ //Now add some more events and a new watermark, and check that the previous events are expired
+ for(int i : seq(threshold+1, threshold*2)) {
+ windowManager.add(i, i);
+ }
+ windowManager.add(new WaterMarkEvent<>(threshold + windowLength+1));
+ //All the events should be expired when the next watermark is received
+ assertThat("All the events should be expired after the second watermark", listener.onExpiryEvents, equalTo(seq(1, threshold)));
+ }
+
+ @Test
+ public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception {
+ int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+ EvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(windowLength);
+ testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkCountEvictionPolicy, windowLength);
+ }
+
+ @Test
+ public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception {
+ int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+ EvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(windowLength);
+ testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkTimeEvictionPolicy, windowLength);
+ }
@Test
public void testTimeBasedWindow() throws Exception {
[2/3] storm git commit: Merge branch 'STORM-2525' of
https://github.com/srdo/storm into STORM-2525
Posted by bo...@apache.org.
Merge branch 'STORM-2525' of https://github.com/srdo/storm into STORM-2525
STORM-2525: Fix flaky integration tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2bd9f09
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2bd9f09
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2bd9f09
Branch: refs/heads/master
Commit: c2bd9f09aadb42c56960aae032de2ea289e83f32
Parents: d67aaf3 1db8674
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 12:57:10 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 12:57:10 2017 -0500
----------------------------------------------------------------------
integration-test/config/Vagrantfile | 8 +-
integration-test/config/cluster.xml | 101 -------------------
integration-test/config/install-storm.sh | 1 -
integration-test/config/install-zookeeper.sh | 3 +-
integration-test/pom.xml | 2 +-
integration-test/run-it.sh | 10 +-
.../topology/window/SlidingTimeCorrectness.java | 4 +-
.../window/SlidingWindowCorrectness.java | 2 -
.../window/TumblingTimeCorrectness.java | 4 +-
.../window/TumblingWindowCorrectness.java | 2 -
.../apache/storm/st/utils/StringDecorator.java | 1 +
.../st/tests/window/SlidingWindowTest.java | 44 ++++----
.../org/apache/storm/st/wrapper/TopoWrap.java | 81 +++++++--------
pom.xml | 23 ++++-
storm-client/pom.xml | 7 +-
.../storm/topology/WindowedBoltExecutor.java | 6 +-
.../apache/storm/windowing/EvictionContext.java | 2 +-
.../apache/storm/windowing/EvictionPolicy.java | 2 +-
.../storm/windowing/TimeEvictionPolicy.java | 2 +-
.../storm/windowing/TimeTriggerPolicy.java | 4 +-
.../windowing/WaterMarkEventGenerator.java | 2 +-
.../windowing/WatermarkCountEvictionPolicy.java | 9 +-
.../windowing/WatermarkTimeEvictionPolicy.java | 9 +-
.../apache/storm/windowing/WindowManager.java | 3 +
.../storm/windowing/WindowManagerTest.java | 48 ++++++++-
25 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2525 to Changelog
Posted by bo...@apache.org.
Added STORM-2525 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a380ede
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a380ede
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a380ede
Branch: refs/heads/master
Commit: 1a380ede3fb3e23db95790703c0a5a09c9052222
Parents: c2bd9f0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 12:57:32 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 12:57:32 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1a380ede/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea8c54a..ce93911 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2525: Fix flaky integration tests
* STORM-2526: Revert changes mistakenly made to generated files
* STORM-2524: Add the option to set client.id to storm-kafka
* STORM-2448: Add in Storm and JDK versions when submitting a topology.