You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/11 01:47:19 UTC

[1/9] beam git commit: Move GC timer checking to StatefulDoFnRunner.CleanupTimer

Repository: beam
Updated Branches:
  refs/heads/release-0.6.0 dc64c2fc0 -> ebc2ba5bf


Move GC timer checking to StatefulDoFnRunner.CleanupTimer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a18b5b16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a18b5b16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a18b5b16

Branch: refs/heads/release-0.6.0
Commit: a18b5b1648489f14fd7a621f345e4d21c09b437f
Parents: dc64c2f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 08:29:27 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:40 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   | 29 ++++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a18b5b16/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 154d8bc..926345e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -115,15 +115,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   @Override
   public void onTimer(
       String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-    Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-    if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) {
+    if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
     } else {
-      if (isEventTimer || !dropLateData(window)) {
-        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
-      }
+      // a timer can never be late because we don't allow setting timers after GC time
+      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
     }
   }
 
@@ -151,6 +148,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
      * Set the garbage collect time of the window to timer.
      */
     void setForWindow(BoundedWindow window);
+
+    /**
+     * Checks whether the given timer is a cleanup timer for the window.
+     */
+    boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain);
+
   }
 
   /**
@@ -191,6 +198,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
           GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
     }
 
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
   }
 
   /**


[7/9] beam git commit: Generate zip distribution for python

Posted by al...@apache.org.
Generate zip distribution for python


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bce94c4e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bce94c4e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bce94c4e

Branch: refs/heads/release-0.6.0
Commit: bce94c4e06bf2ee2428e7b6fa71d0d9144b7ee61
Parents: ef47c9f
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 16:40:34 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:16:09 2017 -0800

----------------------------------------------------------------------
 sdks/python/pom.xml | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bce94c4e/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 98b7fa3..2fcbaad 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -136,6 +136,8 @@
                 <argument>sdist</argument>
                 <argument>--dist-dir</argument>
                 <argument>${project.build.directory}</argument>
+                <argument>--formats</argument>
+                <argument>zip,gztar</argument>
               </arguments>
               <environmentVariables>
                 <PYTHONUSERBASE>${python.user.base}</PYTHONUSERBASE>


[5/9] beam git commit: Remove duplicated dependency from Dataflow runner pom.xml

Posted by al...@apache.org.
Remove duplicated dependency from Dataflow runner pom.xml


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c7c4da28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c7c4da28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c7c4da28

Branch: refs/heads/release-0.6.0
Commit: c7c4da28b7925de38e7c10fcc4e9ef52a5ea76fc
Parents: 2b92b0d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Mar 10 13:14:58 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:32 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c7c4da28/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index d305e15..a2bca0d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -357,9 +357,5 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-      <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-core-construction-java</artifactId>
-      </dependency>
   </dependencies>
 </project>


[9/9] beam git commit: Revert "[maven-release-plugin] prepare release v0.6.0-RC2"

Posted by al...@apache.org.
Revert "[maven-release-plugin] prepare release v0.6.0-RC2"

This reverts commit 7321c9afc5aeb3b786584bfe4b145cc3bf639830.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebc2ba5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebc2ba5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebc2ba5b

Branch: refs/heads/release-0.6.0
Commit: ebc2ba5bf4cc368b25a9cd6131175bac3afffe13
Parents: 7321c9a
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 17:46:44 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:46:44 2017 -0800

----------------------------------------------------------------------
 examples/java/pom.xml                             | 2 +-
 examples/java8/pom.xml                            | 2 +-
 examples/pom.xml                                  | 2 +-
 pom.xml                                           | 4 ++--
 runners/apex/pom.xml                              | 2 +-
 runners/core-construction-java/pom.xml            | 2 +-
 runners/core-java/pom.xml                         | 2 +-
 runners/direct-java/pom.xml                       | 2 +-
 runners/flink/examples/pom.xml                    | 2 +-
 runners/flink/pom.xml                             | 2 +-
 runners/flink/runner/pom.xml                      | 2 +-
 runners/google-cloud-dataflow-java/pom.xml        | 2 +-
 runners/pom.xml                                   | 2 +-
 runners/spark/pom.xml                             | 2 +-
 sdks/common/fn-api/pom.xml                        | 2 +-
 sdks/common/pom.xml                               | 2 +-
 sdks/common/runner-api/pom.xml                    | 2 +-
 sdks/java/build-tools/pom.xml                     | 2 +-
 sdks/java/core/pom.xml                            | 2 +-
 sdks/java/extensions/jackson/pom.xml              | 2 +-
 sdks/java/extensions/join-library/pom.xml         | 2 +-
 sdks/java/extensions/pom.xml                      | 2 +-
 sdks/java/extensions/sorter/pom.xml               | 2 +-
 sdks/java/harness/pom.xml                         | 2 +-
 sdks/java/io/elasticsearch/pom.xml                | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml        | 2 +-
 sdks/java/io/hadoop-common/pom.xml                | 2 +-
 sdks/java/io/hbase/pom.xml                        | 2 +-
 sdks/java/io/hdfs/pom.xml                         | 2 +-
 sdks/java/io/jdbc/pom.xml                         | 2 +-
 sdks/java/io/jms/pom.xml                          | 2 +-
 sdks/java/io/kafka/pom.xml                        | 2 +-
 sdks/java/io/kinesis/pom.xml                      | 2 +-
 sdks/java/io/mongodb/pom.xml                      | 2 +-
 sdks/java/io/mqtt/pom.xml                         | 2 +-
 sdks/java/io/pom.xml                              | 2 +-
 sdks/java/java8tests/pom.xml                      | 2 +-
 sdks/java/javadoc/pom.xml                         | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml       | 2 +-
 sdks/java/maven-archetypes/pom.xml                | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml        | 2 +-
 sdks/java/pom.xml                                 | 2 +-
 sdks/pom.xml                                      | 2 +-
 sdks/python/pom.xml                               | 2 +-
 45 files changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 89d55b5..60e4fed 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index d00dbc9..580a154 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index f5f7175..550578b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 109faf6..eded684 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
   <url>http://beam.apache.org/</url>
   <inceptionYear>2016</inceptionYear>
 
-  <version>0.6.0</version>
+  <version>0.6.0-SNAPSHOT</version>
 
   <licenses>
     <license>
@@ -48,7 +48,7 @@
     <connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
     <url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
-    <tag>v0.6.0-RC2</tag>
+    <tag>release-0.6.0</tag>
   </scm>
 
   <issueManagement>

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 1401e3d..f5fe4bc 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index b45588f..4c2394c 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>beam-runners-parent</artifactId>
     <groupId>org.apache.beam</groupId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index f59caee..c2c88a0 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index f961681..a7d5409 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index b4d77dd..1d426bd 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-flink-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 7af4842..0030f61 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 852a173..13d5b10 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-flink-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index e44eaf2..a2bca0d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index f6379d0..3f74f7b 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index f58f439..0d4c413 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 6ada6af..5a41d9e 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 0c5e4d2..55db181 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index aadc560..9c6de1e 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index 9a55048..545f394 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 04f07bb..55cbcd7 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index c825e48..bdec58f 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-extensions-parent</artifactId>
-        <version>0.6.0</version>
+        <version>0.6.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index c5b6324..728c6f7 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 721495f..26d92de 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 0e794c1..693359e 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 4a7fa0a..80b01ca 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index c308fac..5ea4452 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 83d551a..66a4207 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 996d4ac..fcd984f 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index efd7a64..85ebc87 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e6d14a8..58dbf45 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 87b0b27..fd5c52b 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index d510583..cc7fb27 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be02dc6..f04361c 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index bef65a4..5138368 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index a469501..7deae14 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 04f5416..8b1d848 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 22432b9..8d5b69b 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index aee81ef..fda1d63 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 91c76fc..b785c98 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index 4603e11..2632d6d 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 674ff60..09e5428 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index 3d1b539..194e5bd 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 29df45e..092995a 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 7d7153a..a09a6be 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index cf21743..f130816 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 56a232c..2fcbaad 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


[3/9] beam git commit: Properly deal with late processing-time timers

Posted by al...@apache.org.
Properly deal with late processing-time timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86522157
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86522157
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86522157

Branch: refs/heads/release-0.6.0
Commit: 86522157a79fd9a753436312ff8b746cb5740135
Parents: 8fa718d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 15:25:26 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:57 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   | 40 ++++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/86522157/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c672902..d27193c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   }
 
   @Override
-  public void processElement(WindowedValue<InputT> compressedElem) {
+  public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
-    for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+    for (WindowedValue<InputT> value : input.explodeWindows()) {
 
       BoundedWindow window = value.getWindows().iterator().next();
 
-      if (!dropLateData(window)) {
+      if (isLate(window)) {
+        // The element is too late for this window.
+        droppedDueToLateness.addValue(1L);
+        WindowTracing.debug(
+            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+                + "since too far behind inputWatermark:{}",
+            input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
+      } else {
         cleanupTimer.setForWindow(window);
         doFnRunner.processElement(value);
       }
     }
   }
 
-  private boolean dropLateData(BoundedWindow window) {
+  private boolean isLate(BoundedWindow window) {
     Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     Instant inputWM = cleanupTimer.currentInputWatermarkTime();
-    if (gcTime.isBefore(inputWM)) {
-      // The element is too late for this window.
-      droppedDueToLateness.addValue(1L);
-      WindowTracing.debug(
-          "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
-              + "since too far behind inputWatermark:{}", window, inputWM);
-      return true;
-    } else {
-      return false;
-    }
+    return gcTime.isBefore(inputWM);
   }
 
   @Override
@@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
     } else {
-      // a timer can never be late because we don't allow setting timers after GC time
-      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      // An event-time timer can never be late because we don't allow setting timers after GC time.
+      // It can happen that a processing-time timer fires for a late window, we need to ignore
+      // this.
+      if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
+        // don't increment the dropped counter, only do that for elements
+        WindowTracing.debug(
+            "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+                + "since window is too far behind inputWatermark:{}",
+            timestamp, window, cleanupTimer.currentInputWatermarkTime());
+      } else {
+        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      }
     }
   }
 


[4/9] beam git commit: Add README to python tarball.

Posted by al...@apache.org.
Add README to python tarball.

Also delete test-created files, so that they are not included in the tarball.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b92b0d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b92b0d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b92b0d8

Branch: refs/heads/release-0.6.0
Commit: 2b92b0d851bcc5aedcc40ebf02ad4f39f3d67514
Parents: 8652215
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 13:42:17 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:03 2017 -0800

----------------------------------------------------------------------
 sdks/python/MANIFEST.in | 2 ++
 sdks/python/tox.ini     | 2 ++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2b92b0d8/sdks/python/MANIFEST.in
----------------------------------------------------------------------
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index baa2fda..57f684e 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -17,3 +17,5 @@
 
 # This file is used from Python to sync versions
 include pom.xml
+
+include README.md

http://git-wip-us.apache.org/repos/asf/beam/blob/2b92b0d8/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 8d8acfa..807fe3f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -54,6 +54,8 @@ commands =
   # Clean up all cython generated files.
   find apache_beam -type f -name '*.c' -delete
   find apache_beam -type f -name '*.so' -delete
+  find target/build -type f -name '*.c' -delete
+  find target/build -type f -name '*.so' -delete
 passenv = TRAVIS*
 
 [testenv:py27gcp]


[8/9] beam git commit: [maven-release-plugin] prepare release v0.6.0-RC2

Posted by al...@apache.org.
[maven-release-plugin] prepare release v0.6.0-RC2


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7321c9af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7321c9af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7321c9af

Branch: refs/heads/release-0.6.0
Commit: 7321c9afc5aeb3b786584bfe4b145cc3bf639830
Parents: bce94c4
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 17:41:21 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:41:21 2017 -0800

----------------------------------------------------------------------
 examples/java/pom.xml                             | 2 +-
 examples/java8/pom.xml                            | 2 +-
 examples/pom.xml                                  | 2 +-
 pom.xml                                           | 4 ++--
 runners/apex/pom.xml                              | 2 +-
 runners/core-construction-java/pom.xml            | 2 +-
 runners/core-java/pom.xml                         | 2 +-
 runners/direct-java/pom.xml                       | 2 +-
 runners/flink/examples/pom.xml                    | 2 +-
 runners/flink/pom.xml                             | 2 +-
 runners/flink/runner/pom.xml                      | 2 +-
 runners/google-cloud-dataflow-java/pom.xml        | 2 +-
 runners/pom.xml                                   | 2 +-
 runners/spark/pom.xml                             | 2 +-
 sdks/common/fn-api/pom.xml                        | 2 +-
 sdks/common/pom.xml                               | 2 +-
 sdks/common/runner-api/pom.xml                    | 2 +-
 sdks/java/build-tools/pom.xml                     | 2 +-
 sdks/java/core/pom.xml                            | 2 +-
 sdks/java/extensions/jackson/pom.xml              | 2 +-
 sdks/java/extensions/join-library/pom.xml         | 2 +-
 sdks/java/extensions/pom.xml                      | 2 +-
 sdks/java/extensions/sorter/pom.xml               | 2 +-
 sdks/java/harness/pom.xml                         | 2 +-
 sdks/java/io/elasticsearch/pom.xml                | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml        | 2 +-
 sdks/java/io/hadoop-common/pom.xml                | 2 +-
 sdks/java/io/hbase/pom.xml                        | 2 +-
 sdks/java/io/hdfs/pom.xml                         | 2 +-
 sdks/java/io/jdbc/pom.xml                         | 2 +-
 sdks/java/io/jms/pom.xml                          | 2 +-
 sdks/java/io/kafka/pom.xml                        | 2 +-
 sdks/java/io/kinesis/pom.xml                      | 2 +-
 sdks/java/io/mongodb/pom.xml                      | 2 +-
 sdks/java/io/mqtt/pom.xml                         | 2 +-
 sdks/java/io/pom.xml                              | 2 +-
 sdks/java/java8tests/pom.xml                      | 2 +-
 sdks/java/javadoc/pom.xml                         | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml       | 2 +-
 sdks/java/maven-archetypes/pom.xml                | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml        | 2 +-
 sdks/java/pom.xml                                 | 2 +-
 sdks/pom.xml                                      | 2 +-
 sdks/python/pom.xml                               | 2 +-
 45 files changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 60e4fed..89d55b5 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 580a154..d00dbc9 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 550578b..f5f7175 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eded684..109faf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
   <url>http://beam.apache.org/</url>
   <inceptionYear>2016</inceptionYear>
 
-  <version>0.6.0-SNAPSHOT</version>
+  <version>0.6.0</version>
 
   <licenses>
     <license>
@@ -48,7 +48,7 @@
     <connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
     <url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
-    <tag>release-0.6.0</tag>
+    <tag>v0.6.0-RC2</tag>
   </scm>
 
   <issueManagement>

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index f5fe4bc..1401e3d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 4c2394c..b45588f 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>beam-runners-parent</artifactId>
     <groupId>org.apache.beam</groupId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index c2c88a0..f59caee 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index a7d5409..f961681 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index 1d426bd..b4d77dd 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-flink-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 0030f61..7af4842 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 13d5b10..852a173 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-flink-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a2bca0d..e44eaf2 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 3f74f7b..f6379d0 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 0d4c413..f58f439 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 5a41d9e..6ada6af 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 55db181..0c5e4d2 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index 9c6de1e..aadc560 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index 545f394..9a55048 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 55cbcd7..04f07bb 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index bdec58f..c825e48 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-extensions-parent</artifactId>
-        <version>0.6.0-SNAPSHOT</version>
+        <version>0.6.0</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 728c6f7..c5b6324 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 26d92de..721495f 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 693359e..0e794c1 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 80b01ca..4a7fa0a 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index 5ea4452..c308fac 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 66a4207..83d551a 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index fcd984f..996d4ac 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 85ebc87..efd7a64 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 58dbf45..e6d14a8 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index fd5c52b..87b0b27 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index cc7fb27..d510583 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index f04361c..be02dc6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index 5138368..bef65a4 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 7deae14..a469501 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 8b1d848..04f5416 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 8d5b69b..22432b9 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index fda1d63..aee81ef 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index b785c98..91c76fc 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index 2632d6d..4603e11 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 09e5428..674ff60 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index 194e5bd..3d1b539 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 092995a..29df45e 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index a09a6be..7d7153a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index f130816..cf21743 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 2fcbaad..56a232c 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.6.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


[6/9] beam git commit: Ignore results from the tox clean up phase

Posted by al...@apache.org.
Ignore results from the tox clean up phase

Some temporary files are generated only under certain conditions, so a
failing clean up command should not fail tox.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef47c9f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef47c9f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef47c9f5

Branch: refs/heads/release-0.6.0
Commit: ef47c9f511b0f5730b0dc417aefa703fd6f974c5
Parents: c7c4da2
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 16:21:17 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:45 2017 -0800

----------------------------------------------------------------------
 sdks/python/tox.ini | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ef47c9f5/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 807fe3f..2ed21c6 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -51,11 +51,11 @@ commands =
   pip install -e .[test]
   python apache_beam/examples/complete/autocomplete_test.py
   python setup.py test
-  # Clean up all cython generated files.
-  find apache_beam -type f -name '*.c' -delete
-  find apache_beam -type f -name '*.so' -delete
-  find target/build -type f -name '*.c' -delete
-  find target/build -type f -name '*.so' -delete
+  # Clean up all cython generated files. Ignore if deletion fails.
+  - find apache_beam -type f -name '*.c' -delete
+  - find apache_beam -type f -name '*.so' -delete
+  - find target/build -type f -name '*.c' -delete
+  - find target/build -type f -name '*.so' -delete
 passenv = TRAVIS*
 
 [testenv:py27gcp]


[2/9] beam git commit: Introduce Flink-specific state GC implementations

Posted by al...@apache.org.
Introduce Flink-specific state GC implementations

We now set the GC timer for window.maxTimestamp() + allowed lateness + 1
to ensure that a user timer set for window.maxTimestamp() still has all state.

This also adds tests for late data dropping and state GC specifically
for the Flink DoFnOperator.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fa718db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fa718db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fa718db

Branch: refs/heads/release-0.6.0
Commit: 8fa718db5bc14efd1beefc2c757c331a5bdbf927
Parents: a18b5b1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 11:07:00 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:49 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |  15 +-
 .../beam/runners/core/StatefulDoFnRunner.java   |  87 -------
 .../runners/core/StatefulDoFnRunnerTest.java    | 110 ++++++++-
 .../wrappers/streaming/DoFnOperator.java        | 111 ++++++++-
 .../flink/streaming/DoFnOperatorTest.java       | 225 +++++++++++++++++++
 5 files changed, 439 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 9455eea..a1b7c8b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -21,9 +21,6 @@ import java.util.List;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -135,18 +132,13 @@ public class DoFnRunners {
           DoFnRunner<InputT, OutputT> doFnRunner,
           StepContext stepContext,
           AggregatorFactory aggregatorFactory,
-          WindowingStrategy<?, ?> windowingStrategy) {
+          WindowingStrategy<?, ?> windowingStrategy,
+          CleanupTimer cleanupTimer,
+          StateCleaner<W> stateCleaner) {
     Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
         fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
         Sum.ofLongs());
 
-    CleanupTimer cleanupTimer =
-        new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
-
-    Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder();
-    StateCleaner<W> stateCleaner =
-        new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder);
-
     return new StatefulDoFnRunner<>(
         doFnRunner,
         windowingStrategy,
@@ -154,5 +146,4 @@ public class DoFnRunners {
         stateCleaner,
         droppedDueToLateness);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 926345e..c672902 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -17,12 +17,8 @@
  */
 package org.apache.beam.runners.core;
 
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -30,8 +26,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
 import org.joda.time.Instant;
 
 /**
@@ -45,7 +39,6 @@ import org.joda.time.Instant;
 public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
     implements DoFnRunner<InputT, OutputT> {
 
-  public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
 
   private final DoFnRunner<InputT, OutputT> doFnRunner;
@@ -167,84 +160,4 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
     void clearForWindow(W window);
   }
-
-  /**
-   * A {@link CleanupTimer} implemented by TimerInternals.
-   */
-  public static class TimeInternalsCleanupTimer implements CleanupTimer {
-
-    private final TimerInternals timerInternals;
-    private final WindowingStrategy<?, ?> windowingStrategy;
-    private final Coder<BoundedWindow> windowCoder;
-
-    public TimeInternalsCleanupTimer(
-        TimerInternals timerInternals,
-        WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
-      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
-      this.timerInternals = timerInternals;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-
-    @Override
-    public void setForWindow(BoundedWindow window) {
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
-          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public boolean isForWindow(
-        String timerId,
-        BoundedWindow window,
-        Instant timestamp,
-        TimeDomain timeDomain) {
-      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
-    }
-  }
-
-  /**
-   * A {@link StateCleaner} implemented by StateInternals.
-   */
-  public static class StateInternalsStateCleaner<W extends BoundedWindow>
-      implements StateCleaner<W> {
-
-    private final DoFn<?, ?> fn;
-    private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
-    private final Coder<W> windowCoder;
-
-    public StateInternalsStateCleaner(
-        DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
-        Coder<W> windowCoder) {
-      this.fn = fn;
-      this.signature = DoFnSignatures.getSignature(fn.getClass());
-      this.stateInternals = stateInternals;
-      this.windowCoder = windowCoder;
-    }
-
-    @Override
-    public void clearForWindow(W window) {
-      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
-          signature.stateDeclarations().entrySet()) {
-        try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
-          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
-              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
-          state.clear();
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 54ac77e..fd6a73c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
+import java.util.Map;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -31,14 +32,18 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -114,7 +119,14 @@ public class StatefulDoFnRunnerTest {
     DoFn<KV<String, Integer>, Integer> fn = new MyDoFn();
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
-        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+        fn,
+        getDoFnRunner(fn),
+        mockStepContext,
+        aggregatorFactory,
+        WINDOWING_STRATEGY,
+        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StateInternalsStateCleaner<>(
+            fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     runner.startBundle();
 
@@ -125,13 +137,6 @@ public class StatefulDoFnRunnerTest {
         WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
     assertEquals(1L, droppedDueToLateness.sum);
 
-    runner.onTimer("processTimer", window, timestamp, TimeDomain.PROCESSING_TIME);
-    assertEquals(2L, droppedDueToLateness.sum);
-
-    runner.onTimer("synchronizedProcessTimer", window, timestamp,
-        TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    assertEquals(3L, droppedDueToLateness.sum);
-
     runner.finishBundle();
   }
 
@@ -143,7 +148,14 @@ public class StatefulDoFnRunnerTest {
     StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
-        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+        fn,
+        getDoFnRunner(fn),
+        mockStepContext,
+        aggregatorFactory,
+        WINDOWING_STRATEGY,
+        new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+        new StateInternalsStateCleaner<>(
+            fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
 
     Instant elementTime = new Instant(1);
 
@@ -252,4 +264,84 @@ public class StatefulDoFnRunnerTest {
     }
   }
 
+  /**
+   * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
+   */
+  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
+  }
+
+  /**
+   * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StatefulDoFnRunner.StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass());
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear();
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c4622ba..a8ce680 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
@@ -61,13 +62,18 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -286,6 +292,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       //
       // for some K, V
 
+
       doFnRunner = DoFnRunners.lateDataDroppingRunner(
           (DoFnRunner) doFnRunner,
           stepContext,
@@ -293,8 +300,27 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
           ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
     } else if (keyCoder != null) {
       // It is a stateful DoFn
+
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
+
+      // we don't know the window type
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StateInternalsStateCleaner<>(
+              doFn, stepContext.stateInternals(), windowCoder);
+
       doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
-          doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy);
+          doFn,
+          doFnRunner,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy,
+          cleanupTimer,
+          stateCleaner);
     }
 
     pushbackDoFnRunner =
@@ -746,7 +772,90 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     public Instant currentOutputWatermarkTime() {
       return new Instant(currentOutputWatermark);
     }
+  }
+
+
+  /**
+   * A {@link StatefulDoFnRunner.CleanupTimer} backed by {@link TimerInternals} event-time timers.
+   */
+  public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+    public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; // id of the GC timer
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); // unchecked cast
+      this.timerInternals = timerInternals;
+    }
 
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      // add 1 so this fires after any user timers set for window.maxTimestamp()
+      gcTime = gcTime.plus(1L);
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      gcTime = gcTime.plus(1L); // same offset as in setForWindow, so the timestamps match
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
   }
 
+  /**
+   * A {@link StatefulDoFnRunner.StateCleaner} that clears declared state via StateInternals.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StatefulDoFnRunner.StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass()); // yields stateDeclarations()
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear(); // clears this declared state in the given window's namespace
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e); // reading the spec field failed; keep the cause
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 7d14a87..bbd3428 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
@@ -29,9 +31,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -40,14 +45,23 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
@@ -169,6 +183,217 @@ public class DoFnOperatorTest {
     testHarness.close();
   }
 
+  @Test
+  public void testLateDroppingForStatefulFn() throws Exception {
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+
+    DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+
+      @StateId("state")
+      private final StateSpec<Object, ValueState<String>> stateSpec =
+          StateSpecs.value(StringUtf8Coder.of());
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        context.output(context.element().toString());
+      }
+    };
+
+    WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            VarIntCoder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
+        fn,
+        windowedValueCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
+        windowingStrategy,
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarIntCoder.of() /* key coder */);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<Integer>, Integer>() {
+              @Override
+              public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
+                return integerWindowedValue.getValue();
+              }
+            },
+            new CoderTypeInformation<>(VarIntCoder.of()));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+    // with the watermark at 0, this should not be late
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.getOutput().clear();
+
+    testHarness.processWatermark(9);
+
+    // with the watermark at 9, this should still not be considered late
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.getOutput().clear();
+
+    testHarness.processWatermark(10);
+
+    // with the watermark at 10, this should now be considered late and produce no output
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testStateGCForStatefulFn() throws Exception {
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+
+    final String timerId = "boo";
+    final String stateId = "dazzle";
+
+    final int offset = 5000;
+    final int timerOutput = 4093;
+
+    DoFn<KV<String, Integer>, KV<String, Integer>> fn =
+        new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<String>> stateSpec =
+              StateSpecs.value(StringUtf8Coder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context,
+              @TimerId(timerId) Timer timer,
+              @StateId(stateId) ValueState<String> state,
+              BoundedWindow window) {
+            timer.set(window.maxTimestamp());
+            state.write(context.element().getKey());
+            context.output(
+                KV.of(context.element().getKey(), context.element().getValue() + offset));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
+            context.output(KV.of(state.read(), timerOutput));
+          }
+        };
+
+    WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<
+        KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
+        new DoFnOperator<>(
+            fn,
+            windowedValueCoder,
+            outputTag,
+            Collections.<TupleTag<?>>emptyList(),
+            new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
+            windowingStrategy,
+            new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+            Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            StringUtf8Coder.of() /* key coder */);
+
+    KeyedOneInputStreamOperatorTestHarness<
+        String,
+        WindowedValue<KV<String, Integer>>,
+        WindowedValue<KV<String, Integer>>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<KV<String, Integer>>, String>() {
+              @Override
+              public String getKey(
+                  WindowedValue<KV<String, Integer>> kvWindowedValue) throws Exception {
+                return kvWindowedValue.getValue().getKey();
+              }
+            },
+            new CoderTypeInformation<>(StringUtf8Coder.of()));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    assertEquals(0, testHarness.numKeyedStateEntries()); // nothing processed yet
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.of(KV.of("key1", 5), new Instant(1), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING),
+            WindowedValue.of(
+                KV.of("key2", 7 + offset), new Instant(3), window1, PaneInfo.NO_FIRING)));
+
+    assertEquals(2, testHarness.numKeyedStateEntries());
+
+    testHarness.getOutput().clear();
+
+    // advancing past the window fires both the user timer (maxTimestamp) and the GC timer;
+    // the user timer must run first, since it reads the state that the GC timer clears
+    testHarness.processWatermark(15);
+
+    assertThat(
+        this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING),
+            WindowedValue.of(
+                KV.of("key2", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING)));
+
+    // ensure the state was garbage collected once the GC timer has fired
+    assertEquals(0, testHarness.numKeyedStateEntries());
+
+    testHarness.close();
+  }
+
   public void testSideInputs(boolean keyed) throws Exception {
 
     WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =