You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/23 18:16:03 UTC

[1/3] storm git commit: STORM-2525: Fix flaky integration tests

Repository: storm
Updated Branches:
  refs/heads/master d67aaf344 -> 1a380ede3


STORM-2525: Fix flaky integration tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1db86744
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1db86744
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1db86744

Branch: refs/heads/master
Commit: 1db867449c2e2dd8b17cbe464f5ac9ac972f8ff7
Parents: 2a0c168
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Sat May 20 19:46:03 2017 +0200
Committer: Stig Rohde Døssing <st...@gmail.com>
Committed: Sun May 21 00:14:50 2017 +0200

----------------------------------------------------------------------
 integration-test/config/Vagrantfile             |   8 +-
 integration-test/config/cluster.xml             | 101 -------------------
 integration-test/config/install-storm.sh        |   1 -
 integration-test/config/install-zookeeper.sh    |   3 +-
 integration-test/pom.xml                        |   2 +-
 integration-test/run-it.sh                      |  10 +-
 .../topology/window/SlidingTimeCorrectness.java |   4 +-
 .../window/SlidingWindowCorrectness.java        |   2 -
 .../window/TumblingTimeCorrectness.java         |   4 +-
 .../window/TumblingWindowCorrectness.java       |   2 -
 .../apache/storm/st/utils/StringDecorator.java  |   1 +
 .../st/tests/window/SlidingWindowTest.java      |  44 ++++----
 .../org/apache/storm/st/wrapper/TopoWrap.java   |  81 +++++++--------
 pom.xml                                         |  23 ++++-
 storm-client/pom.xml                            |   7 +-
 .../storm/topology/WindowedBoltExecutor.java    |   6 +-
 .../apache/storm/windowing/EvictionContext.java |   2 +-
 .../apache/storm/windowing/EvictionPolicy.java  |   2 +-
 .../storm/windowing/TimeEvictionPolicy.java     |   2 +-
 .../storm/windowing/TimeTriggerPolicy.java      |   4 +-
 .../windowing/WaterMarkEventGenerator.java      |   2 +-
 .../windowing/WatermarkCountEvictionPolicy.java |   9 +-
 .../windowing/WatermarkTimeEvictionPolicy.java  |   9 +-
 .../apache/storm/windowing/WindowManager.java   |   3 +
 .../storm/windowing/WindowManagerTest.java      |  48 ++++++++-
 25 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/Vagrantfile
----------------------------------------------------------------------
diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile
index 740d0b0..def461d 100644
--- a/integration-test/config/Vagrantfile
+++ b/integration-test/config/Vagrantfile
@@ -19,7 +19,7 @@
 require 'uri'
 # Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
 VAGRANTFILE_API_VERSION = "2"
-STORM_BOX_TYPE = "hashicorp/precise64"
+STORM_BOX_TYPE = "ubuntu/xenial64"
 STORM_ZIP = Dir.glob("../../storm-dist/binary/**/*.zip")
 if(STORM_ZIP.length != 1)
   raise "Expected one storm-binary found: " + STORM_ZIP.join(",") + ". Did you run : cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
@@ -53,8 +53,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
     }
   end
 
-  config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm"
-  config.vm.synced_folder "~/.m2", "/home/vagrant/.m2"
+  config.vm.synced_folder "../../", "/home/ubuntu/build/vagrant/storm"
+  config.vm.synced_folder "~/.m2", "/home/ubuntu/.m2"
 
   config.vm.define "node1" do |node1|
     node1.vm.provider "virtualbox" do |v|
@@ -62,7 +62,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
     end
     node1.vm.network "private_network", ip: "192.168.50.3"
     node1.vm.hostname = "node1"
-    node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
+    node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/ubuntu/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false
     #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts"
   end
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/cluster.xml
----------------------------------------------------------------------
diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml
deleted file mode 100644
index 97968e4..0000000
--- a/integration-test/config/cluster.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration scan="true" scanPeriod="60 seconds">
-    <properties>
-        <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
-        <property name="patternMetrics">%d %-8r %m%n</property>
-    </properties>
-
- <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>/var/log/storm/${logfile.name}</file>
-    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-      <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern>
-      <minIndex>1</minIndex>
-      <maxIndex>9</maxIndex>
-    </rollingPolicy>
-
-    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-      <maxFileSize>100MB</maxFileSize>
-    </triggeringPolicy>
-
-    <encoder>
-            <pattern>${pattern}</pattern>
-    </encoder>
- </appender>
-
- <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>/var/log/storm/access.log</file>
-    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-      <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern>
-      <minIndex>1</minIndex>
-      <maxIndex>9</maxIndex>
-    </rollingPolicy>
-
-    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-      <maxFileSize>100MB</maxFileSize>
-    </triggeringPolicy>
-
-    <encoder>
-            <pattern>${pattern}</pattern>
-    </encoder>
-  </appender>
-
-  <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>/var/log/storm/metrics.log</file>
-    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-      <fileNamePattern>metrics.log.%i</fileNamePattern>
-      <minIndex>1</minIndex>
-      <maxIndex>9</maxIndex>
-    </rollingPolicy>
-
-    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-      <maxFileSize>2MB</maxFileSize>
-    </triggeringPolicy>
-
-    <encoder>
-            <pattern>${patternMetrics}</pattern>
-    </encoder>
-  </appender>
-
-  <root level="INFO">
-    <appender-ref ref="A1"/>
-  </root>
-  
-    <logger name="org.apache.storm.messaging.netty">
-    <level value="WARN" />
-    <appender-ref ref="A1" />
-  </logger>
-  
-    <logger name="org.apache.storm">
-    <level value="DEBUG" />
-    <appender-ref ref="A1" />
-  </logger>
-
-    <logger name="org.apache.storm.security.auth.authorizer" additivity="false">
-    <level value="INFO" />
-    <appender-ref ref="ACCESS" />
-  </logger>
-
-    <logger name="org.apache.storm.metric.LoggingMetricsConsumer" additivity="false">
-    <level value="INFO"/>
-    <appender-ref ref="METRICS"/>
-  </logger>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/install-storm.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh
index b08e2bc..8316c75 100644
--- a/integration-test/config/install-storm.sh
+++ b/integration-test/config/install-storm.sh
@@ -31,7 +31,6 @@ chown -R storm:storm /etc/storm
 
 rm /usr/share/storm/conf/storm.yaml
 cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/
-cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/
 ln -s /usr/share/storm/conf/storm.yaml /etc/storm/conf/storm.yaml
 
 mkdir /var/log/storm

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/config/install-zookeeper.sh
----------------------------------------------------------------------
diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh
index a81a07c..98253d7 100644
--- a/integration-test/config/install-zookeeper.sh
+++ b/integration-test/config/install-zookeeper.sh
@@ -14,7 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-apt-get --yes install zookeeper=3.3.5* zookeeperd=3.3.5*
+# $1 is the Zookeeper version to install
+apt-get --yes install zookeeper=$1 zookeeperd=$1
 service zookeeper stop
 echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg
 service zookeeper start

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 7ec9584..8197c72 100755
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -78,7 +78,7 @@
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.3.1</version>
+            <version>2.8.0</version>
         </dependency>
         <dependency>
             <groupId>com.google.code.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/run-it.sh
----------------------------------------------------------------------
diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh
index 3194a92..d57a4d6 100755
--- a/integration-test/run-it.sh
+++ b/integration-test/run-it.sh
@@ -32,11 +32,11 @@ function list_storm_processes() {
 
 list_storm_processes || true
 # increasing swap space so we can run lots of workers
-sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M
+sudo dd if=/dev/zero of=/swapfile.img bs=4096 count=1M
 sudo mkswap /swapfile.img
 sudo swapon /swapfile.img
 
-if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
+if [[ "${USER}" == "ubuntu" ]]; then # install oracle jdk8
     sudo apt-get update
     sudo apt-get -y install python-software-properties
     sudo apt-add-repository -y ppa:webupd8team/java
@@ -44,16 +44,18 @@ if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8
     echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections
     sudo apt-get install -y oracle-java8-installer
     sudo apt-get -y install maven
+    sudo apt-get install unzip
     java -version
     mvn --version
     export MAVEN_OPTS="-Xmx3000m"
+    zookeeper_version=3.4.8*
 else
     ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci
-    (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed"
     if [[ "${USER}" == "travis" ]]; then
         ( cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn clean package -Dgpg.skip=true )
     fi
     (( $(find "${STORM_SRC_DIR}/storm-dist/binary" -iname 'apache-storm*.zip' | wc -l) == 1 )) || die "expected exactly one zip file, did you run: cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true"
+    zookeeper_version=3.3.5*
 fi
 
 storm_binary_zip=$(find "${STORM_SRC_DIR}/storm-dist" -iname '*.zip')
@@ -64,7 +66,7 @@ echo "Using storm version:" ${STORM_VERSION}
 # setup storm cluster
 list_storm_processes || true
 sudo bash "${SCRIPT_DIR}/config/common.sh"
-sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh"
+sudo bash "${SCRIPT_DIR}/config/install-zookeeper.sh" "$zookeeper_version"
 sudo bash "${SCRIPT_DIR}/config/install-storm.sh" "$storm_binary_zip"
 export JAVA_HOME="${JAVA_HOME}"
 env

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
index be00ffc..7a9163d 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java
@@ -114,7 +114,9 @@ public class SlidingTimeCorrectness implements TestableTopology {
 
         @Override
         public void nextTuple() {
-            TimeUtil.sleepMilliSec(rng.nextInt(800));
+            //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
+            //Sleep a bit between emits
+            TimeUtil.sleepMilliSec(rng.nextInt(100));
             currentNum++;
             TimeData data = TimeData.newData(currentNum);
             final Values tuple = data.getValues();

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
index 5d1b53b..a52ae7c 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java
@@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Computes sliding window sum
@@ -74,7 +73,6 @@ public class SlidingWindowCorrectness implements TestableTopology {
         builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
         builder.setBolt(getBoltName(),
                 new VerificationBolt()
-                        .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
                         .withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)),
                 1)
                 .shuffleGrouping(getSpoutName());

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
index d7fbbca..64d7441 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java
@@ -111,7 +111,9 @@ public class TumblingTimeCorrectness implements TestableTopology {
 
         @Override
         public void nextTuple() {
-            TimeUtil.sleepMilliSec(rng.nextInt(800));
+            //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it
+            //Sleep a bit between emits
+            TimeUtil.sleepMilliSec(rng.nextInt(100));
             currentNum++;
             TimeData data = TimeData.newData(currentNum);
             final Values tuple = data.getValues();

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
index 110c982..05351df 100644
--- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
+++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java
@@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Computes sliding window sum
@@ -72,7 +71,6 @@ public class TumblingWindowCorrectness implements TestableTopology {
         builder.setSpout(getSpoutName(), new IncrementingSpout(), 1);
         builder.setBolt(getBoltName(),
                 new VerificationBolt()
-                        .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
                         .withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1)
                 .shuffleGrouping(getSpoutName());
         return builder.createTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
index 34c2b65..6901212 100644
--- a/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
+++ b/integration-test/src/main/java/org/apache/storm/st/utils/StringDecorator.java
@@ -17,6 +17,7 @@
 
 package org.apache.storm.st.utils;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.commons.lang.StringUtils;
 
 public class StringDecorator {

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
index 5cef9fa..3de1a7d 100644
--- a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
+++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.storm.st.tests.window;
 
+import java.io.IOException;
 import org.apache.storm.st.helper.AbstractTest;
 import org.apache.storm.st.wrapper.LogData;
 import org.apache.storm.st.wrapper.TopoWrap;
@@ -79,21 +80,25 @@ public final class SlidingWindowTest extends AbstractTest {
         runAndVerifyCount(windowSize, slideSize, testable, topo);
     }
 
-    static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException {
+    static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws IOException, TException, MalformedURLException {
         topo.submitSuccessfully();
-        final int minSpoutEmits = 1000 + windowSize;
         final int minBoltEmits = 5;
+        //Sliding windows should produce one window every slideSize tuples
+        //Wait for the spout to emit at least enough tuples to get minBoltEmit windows and at least one full window
+        final int minSpoutEmits = Math.max(windowSize, minBoltEmits * slideSize);
+        
         String boltName = testable.getBoltName();
         String spoutName = testable.getSpoutName();
-        topo.waitForProgress(minSpoutEmits, spoutName, 180);
-        topo.waitForProgress(minBoltEmits, boltName, 180);
+        //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
+        topo.assertProgress(minSpoutEmits, spoutName, 180);
+        topo.assertProgress(minBoltEmits, boltName, 180);
         List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName);
         log.info(boltUrls.toString());
         final List<LogData> allBoltData = topo.getLogData(boltName);
         final List<LogData> allSpoutData = topo.getLogData(spoutName);
         Assert.assertTrue(allBoltData.size() >= minBoltEmits,
                 "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
-        final int numberOfWindows = allBoltData.size() - windowSize / slideSize;
+        final int numberOfWindows = allBoltData.size();
         for(int i = 0; i < numberOfWindows; ++i ) {
             log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows);
             final int toIndex = (i + 1) * slideSize;
@@ -143,28 +148,29 @@ public final class SlidingWindowTest extends AbstractTest {
         runAndVerifyTime(windowSec, slideSec, testable, topo);
     }
 
-    static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException {
+    static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws IOException, TException, java.net.MalformedURLException {
         topo.submitSuccessfully();
-        final int minSpoutEmits = 1000 + windowSec;
+        final int minSpoutEmits = 100;
         final int minBoltEmits = 5;
         String boltName = testable.getBoltName();
         String spoutName = testable.getSpoutName();
-        topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
-        topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
-        final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS);
-        final List<LogData> allBoltLog = topo.getLogData(boltName);
-        final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS);
-        Assert.assertTrue(allBoltLog.size() >= minBoltEmits,
-                "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog);
-        final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
-        final int numberOfWindows = allBoltLog.size() - windowSec / slideSec;
+        //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway
+        topo.assertProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec));
+        topo.assertProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec));
+        final List<TimeData> allSpoutDataDeserialized = topo.getLogData(spoutName, TimeData.CLS);
+        final List<LogData> allBoltData = topo.getLogData(boltName);
+        final List<TimeDataWindow> allBoltDataDeserialized = topo.deserializeLogData(allBoltData, TimeDataWindow.CLS);
+        Assert.assertTrue(allBoltData.size() >= minBoltEmits,
+                "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData);
+        final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutDataDeserialized.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec);
+        final int numberOfWindows = allBoltData.size();
         for(int i = 0; i < numberOfWindows; ++i ) {
             final DateTime toDate = firstEndTime.plusSeconds(i * slideSec);
             final DateTime  fromDate =  toDate.minusSeconds(windowSec);
             log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows);
-            final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate);
-            final LogData oneBoltLog = allBoltLog.get(i);
-            final TimeDataWindow actualWindow = allBoltData.get(i);
+            final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutDataDeserialized,fromDate, toDate);
+            final LogData oneBoltLog = allBoltData.get(i);
+            final TimeDataWindow actualWindow = allBoltDataDeserialized.get(i);
             log.info("Actual window: " + actualWindow.getDescription());
             log.info("Computed window: " + computedWindow.getDescription());
             for (TimeData oneLog : computedWindow) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
index ac6d0c7..af1e1be 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java
@@ -53,6 +53,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,6 +66,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
 
 public class TopoWrap {
     private static Logger log = LoggerFactory.getLogger(TopoWrap.class);
@@ -97,6 +100,9 @@ public class TopoWrap {
         submitConf.put("storm.zookeeper.topology.auth.scheme", "digest");
         submitConf.put("topology.workers", 3);
         submitConf.put("topology.debug", true);
+        //Set the metrics sample rate to 1 to force update the executor stats every time something happens
+        //This is necessary because getAllTimeEmittedCount relies on the executor emit stats to be accurate
+        submitConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1);
         return submitConf;
     }
 
@@ -180,7 +186,7 @@ public class TopoWrap {
                 Map<String, Long> allTime = emitted.get(since);
                 if (allTime == null)
                     return 0L;
-                return allTime.get("default");
+                return allTime.get(Utils.DEFAULT_STREAM_ID);
             }
         });
         return sum(ackCounts).longValue();
@@ -213,7 +219,7 @@ public class TopoWrap {
             log.info(getInfo().toString());
             long emitCount = getAllTimeEmittedCount(componentName);
             log.info("Count for component " + componentName + " is " + emitCount);
-            if (emitCount > minEmits) {
+            if (emitCount >= minEmits) {
                 break;
             }
             TimeUtil.sleepSec(10);
@@ -280,8 +286,13 @@ public class TopoWrap {
         }
     }
 
-    public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) throws TException, MalformedURLException {
+    public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) 
+            throws IOException, TException, MalformedURLException {
         final List<LogData> logData = getLogData(componentId);
+        return deserializeLogData(logData, cls);
+    }
+    
+    public <T extends FromJson<T>> List<T> deserializeLogData(final List<LogData> logData, final FromJson<T> cls) {
         final List<T> data = new ArrayList<>(
                 Collections2.transform(logData, new Function<LogData, T>() {
                     @Nullable
@@ -294,7 +305,7 @@ public class TopoWrap {
         return data;
     }
 
-    public List<LogData> getLogData(final String componentId) throws TException, MalformedURLException {
+    public List<LogData> getLogData(final String componentId) throws IOException, TException, MalformedURLException {
         final String logs = getLogs(componentId);
         final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
         Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")");
@@ -318,48 +329,38 @@ public class TopoWrap {
         return sortedLogs;
     }
 
-    public String getLogs(final String componentId) throws TException, MalformedURLException {
+    public String getLogs(final String componentId) throws IOException, TException, MalformedURLException {
         log.info("Fetching logs for componentId = " + componentId);
         List<ExecutorURL> exclaim2Urls = getLogUrls(componentId);
         log.info("Found " + exclaim2Urls.size() + " urls: " + exclaim2Urls.toString());
-        Collection<String> urlOuputs = Collections2.transform(exclaim2Urls, new Function<ExecutorURL, String>() {
-            @Nullable
-            @Override
-            public String apply(@Nullable ExecutorURL executorURL) {
-                if (executorURL == null || executorURL.getDownloadUrl() == null) {
-                    return "";
-                }
-                String warnMessage = "Couldn't fetch executorURL: " + executorURL;
+        List<String> urlContents = new ArrayList<>();
+        for(ExecutorURL executorUrl : exclaim2Urls) {
+            if(executorUrl == null || executorUrl.getDownloadUrl() == null) {
+                continue;
+            }
+            log.info("Fetching: " + executorUrl);
+            URL downloadUrl = executorUrl.downloadUrl;
+            String urlContent = IOUtils.toString(downloadUrl, StandardCharsets.UTF_8);
+            urlContents.add(urlContent);
+            if (urlContent.length() < 500) {
+                log.info("Fetched: " + urlContent);
+            } else {
+                log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes.");
+            }
+            if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) {
+                final String userDir = System.getProperty("user.dir");
+                final File target = new File(userDir, "target");
+                final File logDir = new File(target, "logs");
+                final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]);
                 try {
-                    log.info("Fetching: " + executorURL);
-                    final URL downloadUrl = executorURL.downloadUrl;
-                    final String urlContent = IOUtils.toString(downloadUrl);
-                    if (urlContent.length() < 500) {
-                        log.info("Fetched: " + urlContent);
-                    } else {
-                        log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes.");
-                    }
-                    if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) {
-                        final String userDir = System.getProperty("user.dir");
-                        final File target = new File(userDir, "target");
-                        final File logDir = new File(target, "logs");
-                        final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]);
-                        try {
-                            FileUtils.forceMkdir(logDir);
-                            FileUtils.write(logFile, urlContent);
-                        } catch (Throwable throwable) {
-                            log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable));
-                        }
-                    }
-                    return urlContent;
-                } catch (IOException e) {
-                    log.warn(warnMessage);
+                    FileUtils.forceMkdir(logDir);
+                    FileUtils.write(logFile, urlContent, StandardCharsets.UTF_8);
+                } catch (Throwable throwable) {
+                    log.info("Caught exception: " + ExceptionUtils.getFullStackTrace(throwable));
                 }
-                String stars = StringUtils.repeat("*", 30);
-                return stars + "   " + warnMessage + "   " + stars;
             }
-        });
-        return StringUtils.join(urlOuputs, '\n');
+        }
+        return StringUtils.join(urlContents, '\n');
     }
 
     private Number sum(Collection<? extends Number> nums) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bdcc716..f96cfd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,6 +284,7 @@
         <junit.version>4.11</junit.version>
         <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
+        <hamcrest.version>1.3</hamcrest.version>
 
         <calcite.version>1.11.0</calcite.version>
 
@@ -619,7 +620,7 @@
                 <artifactId>jersey-container-servlet-core</artifactId>
                 <version>${jersey.version}</version>
             </dependency> 
-             <dependency>
+            <dependency>
                 <groupId>org.glassfish.jersey.containers</groupId> 
                 <artifactId>jersey-container-jetty-http</artifactId>
                 <version>${jersey.version}</version>
@@ -959,6 +960,24 @@
             </dependency>
             <dependency>
                 <groupId>org.mockito</groupId>
+                <artifactId>mockito-core</artifactId>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-core</artifactId>
+                <version>${hamcrest.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-library</artifactId>
+                <version>${hamcrest.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
                 <artifactId>mockito-all</artifactId>
                 <version>${mockito.version}</version>
                 <scope>test</scope>
@@ -1097,7 +1116,7 @@
                             <groupId>com.puppycrawl.tools</groupId>
                             <artifactId>checkstyle</artifactId>
                             <!-- If you change this, you should also update the storm_checkstyle.xml file to be
-                                 based on the google_checks.xml from the version of checkstyle you are choosing. -->
+                            based on the google_checks.xml from the version of checkstyle you are choosing. -->
                             <version>7.7</version>
                         </dependency>
                     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 30d7685..538b7b9 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -187,7 +187,12 @@
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 2ada9bf..56c329e 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -199,8 +199,7 @@ public class WindowedBoltExecutor implements IRichBolt {
         // validate
         validate(topoConf, windowLengthCount, windowLengthDuration,
                  slidingIntervalCount, slidingIntervalDuration);
-        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration,
-                                                                 manager);
+        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
         triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                                               manager, evictionPolicy);
         manager.setEvictionPolicy(evictionPolicy);
@@ -251,8 +250,7 @@ public class WindowedBoltExecutor implements IRichBolt {
         }
     }
 
-    private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration,
-                                                    WindowManager<Tuple> manager) {
+    private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
         if (windowLengthCount != null) {
             if (isTupleTs()) {
                 return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
index 37dcfd9..ee5fdb9 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -38,7 +38,7 @@ public interface EvictionContext {
     Long getSlidingCount();
 
     /**
-     * Returns the current count of events in the queue up to the reference tim
+     * Returns the current count of events in the queue up to the reference time
      * based on which count based evictions can be performed.
      *
      * @return the current count

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 774d0a3..fa44444 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -27,7 +27,7 @@ public interface EvictionPolicy<T> {
     /**
      * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
      */
-    enum Action {
+    public enum Action {
         /**
          * expire the event and remove it from the queue
          */

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index 802e6bb..570b057 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -38,7 +38,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> {
      * {@inheritDoc}
      */
     @Override
-    public Action evict(Event<T> event) {
+    public Action evict(Event<T> event) {      
         long now = evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
         long diff = now - event.getTimestamp();
         if (diff >= windowLength) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index b057afb..6b6d9fa 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -114,9 +114,7 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
                      * set the current timestamp as the reference time for the eviction policy
                      * to evict the events
                      */
-                    if (evictionPolicy != null) {
-                        evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
-                    }
+                    evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
                     handler.onTrigger();
                 } catch (Throwable th) {
                     LOG.error("handler.onTrigger failed ", th);

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
index e1df72c..ef81d6e 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
@@ -77,7 +77,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
         try {
             long waterMarkTs = computeWaterMarkTs();
             if (waterMarkTs > lastWaterMarkTs) {
-                this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs));
+                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                 lastWaterMarkTs = waterMarkTs;
             }
         } catch (Throwable th) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index c5d7b49..0fe6f75 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -19,7 +19,7 @@ package org.apache.storm.windowing;
 
 /**
  * An eviction policy that tracks count based on watermark ts and
- * evicts events upto the watermark based on a threshold count.
+ * evicts events up to the watermark based on a threshold count.
  *
  * @param <T> the type of event tracked by this policy.
  */
@@ -32,6 +32,13 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
 
     @Override
     public Action evict(Event<T> event) {
+        if(getContext() == null) {
+            //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow.
+            //In this case we should hold on to all the events. When the first watermark is received, the context will be set,
+            //and the events will be reevaluated for eviction
+            return Action.STOP;
+        }
+        
         Action action;
         if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
             action = super.evict(event);

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
index e5ecba4..fdb3917 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java
@@ -57,7 +57,14 @@ public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
      */
     @Override
     public Action evict(Event<T> event) {
-        long referenceTime = evictionContext.getReferenceTime() != null ? evictionContext.getReferenceTime() : 0L;
+        if(evictionContext == null) {
+            //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow.
+            //In this case we should hold on to all the events. When the first watermark is received, the context will be set,
+            //and the events will be reevaluated for eviction
+            return Action.STOP;
+        }
+        
+        long referenceTime = evictionContext.getReferenceTime();
         long diff =  referenceTime - event.getTimestamp();
         if (diff < -lag) {
             return Action.STOP;

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
index 8021ba8..f6cc521 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
@@ -46,6 +46,9 @@ public class WindowManager<T> implements TriggerHandler {
     /**
      * Expire old events every EXPIRE_EVENTS_THRESHOLD to
      * keep the window size in check.
+     * 
+     * Note that if the eviction policy is based on watermarks, events will not be evicted until a new
+     * watermark would cause them to be considered expired anyway, regardless of this limit
      */
     public static final int EXPIRE_EVENTS_THRESHOLD = 100;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1db86744/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index 6c170c6..178c1bb 100644
--- a/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-client/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -29,9 +29,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
 import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -155,7 +158,48 @@ public class WindowManagerTest {
         // window should be compacted and events should be expired.
         assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents);
     }
-
+    
+    private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy watermarkEvictionPolicy, int windowLength) throws Exception {
+        /**
+         * The watermark eviction policy must not evict tuples until the first watermark has been received.
+         * The policies can't make a meaningful decision prior to the first watermark, so the safe decision
+         * is to postpone eviction.
+         */ 
+        int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        windowManager.setEvictionPolicy(watermarkEvictionPolicy);
+        WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager,
+            watermarkEvictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        for (int i : seq(1, threshold)) {
+            windowManager.add(i, i);
+        }
+        assertThat("The watermark eviction policies should never evict events before the first watermark is received",  listener.onExpiryEvents, is(empty()));
+        windowManager.add(new WaterMarkEvent<>(threshold));
+        // The events should be put in a window when the first watermark is received
+        assertEquals(seq(1, threshold), listener.onActivationEvents);
+        //Now add some more events and a new watermark, and check that the previous events are expired
+        for(int i : seq(threshold+1, threshold*2)) {
+            windowManager.add(i, i);
+        }
+        windowManager.add(new WaterMarkEvent<>(threshold + windowLength+1));
+        //All the events should be expired when the next watermark is received
+        assertThat("All the events should be expired after the second watermark", listener.onExpiryEvents, equalTo(seq(1, threshold)));
+    }
+    
+    @Test
+    public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception {
+        int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        EvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(windowLength);
+        testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkCountEvictionPolicy, windowLength);
+    }
+    
+    @Test
+    public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception {
+        int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        EvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(windowLength);
+        testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkTimeEvictionPolicy, windowLength);
+    }
 
     @Test
     public void testTimeBasedWindow() throws Exception {


[2/3] storm git commit: Merge branch 'STORM-2525' of https://github.com/srdo/storm into STORM-2525

Posted by bo...@apache.org.
Merge branch 'STORM-2525' of https://github.com/srdo/storm into STORM-2525

STORM-2525: Fix flaky integration tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2bd9f09
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2bd9f09
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2bd9f09

Branch: refs/heads/master
Commit: c2bd9f09aadb42c56960aae032de2ea289e83f32
Parents: d67aaf3 1db8674
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 12:57:10 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 12:57:10 2017 -0500

----------------------------------------------------------------------
 integration-test/config/Vagrantfile             |   8 +-
 integration-test/config/cluster.xml             | 101 -------------------
 integration-test/config/install-storm.sh        |   1 -
 integration-test/config/install-zookeeper.sh    |   3 +-
 integration-test/pom.xml                        |   2 +-
 integration-test/run-it.sh                      |  10 +-
 .../topology/window/SlidingTimeCorrectness.java |   4 +-
 .../window/SlidingWindowCorrectness.java        |   2 -
 .../window/TumblingTimeCorrectness.java         |   4 +-
 .../window/TumblingWindowCorrectness.java       |   2 -
 .../apache/storm/st/utils/StringDecorator.java  |   1 +
 .../st/tests/window/SlidingWindowTest.java      |  44 ++++----
 .../org/apache/storm/st/wrapper/TopoWrap.java   |  81 +++++++--------
 pom.xml                                         |  23 ++++-
 storm-client/pom.xml                            |   7 +-
 .../storm/topology/WindowedBoltExecutor.java    |   6 +-
 .../apache/storm/windowing/EvictionContext.java |   2 +-
 .../apache/storm/windowing/EvictionPolicy.java  |   2 +-
 .../storm/windowing/TimeEvictionPolicy.java     |   2 +-
 .../storm/windowing/TimeTriggerPolicy.java      |   4 +-
 .../windowing/WaterMarkEventGenerator.java      |   2 +-
 .../windowing/WatermarkCountEvictionPolicy.java |   9 +-
 .../windowing/WatermarkTimeEvictionPolicy.java  |   9 +-
 .../apache/storm/windowing/WindowManager.java   |   3 +
 .../storm/windowing/WindowManagerTest.java      |  48 ++++++++-
 25 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2525 to Changelog

Posted by bo...@apache.org.
Added STORM-2525 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a380ede
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a380ede
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a380ede

Branch: refs/heads/master
Commit: 1a380ede3fb3e23db95790703c0a5a09c9052222
Parents: c2bd9f0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue May 23 12:57:32 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue May 23 12:57:32 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1a380ede/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea8c54a..ce93911 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2525: Fix flaky integration tests
  * STORM-2526: Revert changes mistakenly made to generated files
  * STORM-2524: Add the option to set client.id to storm-kafka
  * STORM-2448: Add in Storm and JDK versions when submitting a topology.