You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/11 01:47:19 UTC
[1/9] beam git commit: Move GC timer checking to
StatefulDoFnRunner.CleanupTimer
Repository: beam
Updated Branches:
refs/heads/release-0.6.0 dc64c2fc0 -> ebc2ba5bf
Move GC timer checking to StatefulDoFnRunner.CleanupTimer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a18b5b16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a18b5b16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a18b5b16
Branch: refs/heads/release-0.6.0
Commit: a18b5b1648489f14fd7a621f345e4d21c09b437f
Parents: dc64c2f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 08:29:27 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:40 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/StatefulDoFnRunner.java | 29 ++++++++++++++++----
1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a18b5b16/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 154d8bc..926345e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -115,15 +115,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
- Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
- if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) {
+ if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
- if (isEventTimer || !dropLateData(window)) {
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
- }
+ // a timer can never be late because we don't allow setting timers after GC time
+ doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
}
}
@@ -151,6 +148,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
* Set the garbage collect time of the window to timer.
*/
void setForWindow(BoundedWindow window);
+
+ /**
+ * Checks whether the given timer is a cleanup timer for the window.
+ */
+ boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain);
+
}
/**
@@ -191,6 +198,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
}
+ @Override
+ public boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain) {
+ boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+ }
}
/**
[7/9] beam git commit: Generate zip distribution for python
Posted by al...@apache.org.
Generate zip distribution for python
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bce94c4e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bce94c4e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bce94c4e
Branch: refs/heads/release-0.6.0
Commit: bce94c4e06bf2ee2428e7b6fa71d0d9144b7ee61
Parents: ef47c9f
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 16:40:34 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:16:09 2017 -0800
----------------------------------------------------------------------
sdks/python/pom.xml | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bce94c4e/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 98b7fa3..2fcbaad 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -136,6 +136,8 @@
<argument>sdist</argument>
<argument>--dist-dir</argument>
<argument>${project.build.directory}</argument>
+ <argument>--formats</argument>
+ <argument>zip,gztar</argument>
</arguments>
<environmentVariables>
<PYTHONUSERBASE>${python.user.base}</PYTHONUSERBASE>
[5/9] beam git commit: Remove duplicated dependency from Dataflow
runner pom.xml
Posted by al...@apache.org.
Remove duplicated dependency from Dataflow runner pom.xml
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c7c4da28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c7c4da28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c7c4da28
Branch: refs/heads/release-0.6.0
Commit: c7c4da28b7925de38e7c10fcc4e9ef52a5ea76fc
Parents: 2b92b0d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Mar 10 13:14:58 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:32 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c7c4da28/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index d305e15..a2bca0d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -357,9 +357,5 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-construction-java</artifactId>
- </dependency>
</dependencies>
</project>
[9/9] beam git commit: Revert "[maven-release-plugin] prepare release
v0.6.0-RC2"
Posted by al...@apache.org.
Revert "[maven-release-plugin] prepare release v0.6.0-RC2"
This reverts commit 7321c9afc5aeb3b786584bfe4b145cc3bf639830.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebc2ba5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebc2ba5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebc2ba5b
Branch: refs/heads/release-0.6.0
Commit: ebc2ba5bf4cc368b25a9cd6131175bac3afffe13
Parents: 7321c9a
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 17:46:44 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:46:44 2017 -0800
----------------------------------------------------------------------
examples/java/pom.xml | 2 +-
examples/java8/pom.xml | 2 +-
examples/pom.xml | 2 +-
pom.xml | 4 ++--
runners/apex/pom.xml | 2 +-
runners/core-construction-java/pom.xml | 2 +-
runners/core-java/pom.xml | 2 +-
runners/direct-java/pom.xml | 2 +-
runners/flink/examples/pom.xml | 2 +-
runners/flink/pom.xml | 2 +-
runners/flink/runner/pom.xml | 2 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
runners/pom.xml | 2 +-
runners/spark/pom.xml | 2 +-
sdks/common/fn-api/pom.xml | 2 +-
sdks/common/pom.xml | 2 +-
sdks/common/runner-api/pom.xml | 2 +-
sdks/java/build-tools/pom.xml | 2 +-
sdks/java/core/pom.xml | 2 +-
sdks/java/extensions/jackson/pom.xml | 2 +-
sdks/java/extensions/join-library/pom.xml | 2 +-
sdks/java/extensions/pom.xml | 2 +-
sdks/java/extensions/sorter/pom.xml | 2 +-
sdks/java/harness/pom.xml | 2 +-
sdks/java/io/elasticsearch/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
sdks/java/io/hadoop-common/pom.xml | 2 +-
sdks/java/io/hbase/pom.xml | 2 +-
sdks/java/io/hdfs/pom.xml | 2 +-
sdks/java/io/jdbc/pom.xml | 2 +-
sdks/java/io/jms/pom.xml | 2 +-
sdks/java/io/kafka/pom.xml | 2 +-
sdks/java/io/kinesis/pom.xml | 2 +-
sdks/java/io/mongodb/pom.xml | 2 +-
sdks/java/io/mqtt/pom.xml | 2 +-
sdks/java/io/pom.xml | 2 +-
sdks/java/java8tests/pom.xml | 2 +-
sdks/java/javadoc/pom.xml | 2 +-
sdks/java/maven-archetypes/examples-java8/pom.xml | 2 +-
sdks/java/maven-archetypes/examples/pom.xml | 2 +-
sdks/java/maven-archetypes/pom.xml | 2 +-
sdks/java/maven-archetypes/starter/pom.xml | 2 +-
sdks/java/pom.xml | 2 +-
sdks/pom.xml | 2 +-
sdks/python/pom.xml | 2 +-
45 files changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 89d55b5..60e4fed 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index d00dbc9..580a154 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index f5f7175..550578b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 109faf6..eded684 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<url>http://beam.apache.org/</url>
<inceptionYear>2016</inceptionYear>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<licenses>
<license>
@@ -48,7 +48,7 @@
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
<url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
- <tag>v0.6.0-RC2</tag>
+ <tag>release-0.6.0</tag>
</scm>
<issueManagement>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 1401e3d..f5fe4bc 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index b45588f..4c2394c 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>beam-runners-parent</artifactId>
<groupId>org.apache.beam</groupId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index f59caee..c2c88a0 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index f961681..a7d5409 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index b4d77dd..1d426bd 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 7af4842..0030f61 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 852a173..13d5b10 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index e44eaf2..a2bca0d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index f6379d0..3f74f7b 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index f58f439..0d4c413 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 6ada6af..5a41d9e 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 0c5e4d2..55db181 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index aadc560..9c6de1e 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index 9a55048..545f394 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 04f07bb..55cbcd7 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index c825e48..bdec58f 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index c5b6324..728c6f7 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 721495f..26d92de 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 0e794c1..693359e 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 4a7fa0a..80b01ca 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index c308fac..5ea4452 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 83d551a..66a4207 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 996d4ac..fcd984f 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index efd7a64..85ebc87 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e6d14a8..58dbf45 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 87b0b27..fd5c52b 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index d510583..cc7fb27 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be02dc6..f04361c 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index bef65a4..5138368 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index a469501..7deae14 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 04f5416..8b1d848 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 22432b9..8d5b69b 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index aee81ef..fda1d63 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 91c76fc..b785c98 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index 4603e11..2632d6d 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 674ff60..09e5428 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index 3d1b539..194e5bd 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 29df45e..092995a 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 7d7153a..a09a6be 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index cf21743..f130816 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/ebc2ba5b/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 56a232c..2fcbaad 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0</version>
+ <version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
[3/9] beam git commit: Properly deal with late processing-time timers
Posted by al...@apache.org.
Properly deal with late processing-time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86522157
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86522157
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86522157
Branch: refs/heads/release-0.6.0
Commit: 86522157a79fd9a753436312ff8b746cb5740135
Parents: 8fa718d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 15:25:26 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:57 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/StatefulDoFnRunner.java | 40 ++++++++++++--------
1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/86522157/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index c672902..d27193c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
}
@Override
- public void processElement(WindowedValue<InputT> compressedElem) {
+ public void processElement(WindowedValue<InputT> input) {
// StatefulDoFnRunner always observes windows, so we need to explode
- for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+ for (WindowedValue<InputT> value : input.explodeWindows()) {
BoundedWindow window = value.getWindows().iterator().next();
- if (!dropLateData(window)) {
+ if (isLate(window)) {
+ // The element is too late for this window.
+ droppedDueToLateness.addValue(1L);
+ WindowTracing.debug(
+ "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ + "since too far behind inputWatermark:{}",
+ input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
+ } else {
cleanupTimer.setForWindow(window);
doFnRunner.processElement(value);
}
}
}
- private boolean dropLateData(BoundedWindow window) {
+ private boolean isLate(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant inputWM = cleanupTimer.currentInputWatermarkTime();
- if (gcTime.isBefore(inputWM)) {
- // The element is too late for this window.
- droppedDueToLateness.addValue(1L);
- WindowTracing.debug(
- "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
- + "since too far behind inputWatermark:{}", window, inputWM);
- return true;
- } else {
- return false;
- }
+ return gcTime.isBefore(inputWM);
}
@Override
@@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
stateCleaner.clearForWindow(window);
// This should invoke the onWindowExpiration of the DoFn
} else {
- // a timer can never be late because we don't allow setting timers after GC time
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ // An event-time timer can never be late because we don't allow setting timers after GC time.
+ // It can happen that a processing-time timer fires for a late window; we need to ignore
+ // this.
+ if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
+ // don't increment the dropped counter, only do that for elements
+ WindowTracing.debug(
+ "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+ + "since window is too far behind inputWatermark:{}",
+ timestamp, window, cleanupTimer.currentInputWatermarkTime());
+ } else {
+ doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ }
}
}
[4/9] beam git commit: Add README to python tarball.
Posted by al...@apache.org.
Add README to python tarball.
And, delete test created files, to avoid them being included in the tarball.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b92b0d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b92b0d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b92b0d8
Branch: refs/heads/release-0.6.0
Commit: 2b92b0d851bcc5aedcc40ebf02ad4f39f3d67514
Parents: 8652215
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 13:42:17 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:03 2017 -0800
----------------------------------------------------------------------
sdks/python/MANIFEST.in | 2 ++
sdks/python/tox.ini | 2 ++
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2b92b0d8/sdks/python/MANIFEST.in
----------------------------------------------------------------------
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index baa2fda..57f684e 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -17,3 +17,5 @@
# This file is used from Python to sync versions
include pom.xml
+
+include README.md
http://git-wip-us.apache.org/repos/asf/beam/blob/2b92b0d8/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 8d8acfa..807fe3f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -54,6 +54,8 @@ commands =
# Clean up all cython generated files.
find apache_beam -type f -name '*.c' -delete
find apache_beam -type f -name '*.so' -delete
+ find target/build -type f -name '*.c' -delete
+ find target/build -type f -name '*.so' -delete
passenv = TRAVIS*
[testenv:py27gcp]
[8/9] beam git commit: [maven-release-plugin] prepare release
v0.6.0-RC2
Posted by al...@apache.org.
[maven-release-plugin] prepare release v0.6.0-RC2
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7321c9af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7321c9af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7321c9af
Branch: refs/heads/release-0.6.0
Commit: 7321c9afc5aeb3b786584bfe4b145cc3bf639830
Parents: bce94c4
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 17:41:21 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:41:21 2017 -0800
----------------------------------------------------------------------
examples/java/pom.xml | 2 +-
examples/java8/pom.xml | 2 +-
examples/pom.xml | 2 +-
pom.xml | 4 ++--
runners/apex/pom.xml | 2 +-
runners/core-construction-java/pom.xml | 2 +-
runners/core-java/pom.xml | 2 +-
runners/direct-java/pom.xml | 2 +-
runners/flink/examples/pom.xml | 2 +-
runners/flink/pom.xml | 2 +-
runners/flink/runner/pom.xml | 2 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
runners/pom.xml | 2 +-
runners/spark/pom.xml | 2 +-
sdks/common/fn-api/pom.xml | 2 +-
sdks/common/pom.xml | 2 +-
sdks/common/runner-api/pom.xml | 2 +-
sdks/java/build-tools/pom.xml | 2 +-
sdks/java/core/pom.xml | 2 +-
sdks/java/extensions/jackson/pom.xml | 2 +-
sdks/java/extensions/join-library/pom.xml | 2 +-
sdks/java/extensions/pom.xml | 2 +-
sdks/java/extensions/sorter/pom.xml | 2 +-
sdks/java/harness/pom.xml | 2 +-
sdks/java/io/elasticsearch/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
sdks/java/io/hadoop-common/pom.xml | 2 +-
sdks/java/io/hbase/pom.xml | 2 +-
sdks/java/io/hdfs/pom.xml | 2 +-
sdks/java/io/jdbc/pom.xml | 2 +-
sdks/java/io/jms/pom.xml | 2 +-
sdks/java/io/kafka/pom.xml | 2 +-
sdks/java/io/kinesis/pom.xml | 2 +-
sdks/java/io/mongodb/pom.xml | 2 +-
sdks/java/io/mqtt/pom.xml | 2 +-
sdks/java/io/pom.xml | 2 +-
sdks/java/java8tests/pom.xml | 2 +-
sdks/java/javadoc/pom.xml | 2 +-
sdks/java/maven-archetypes/examples-java8/pom.xml | 2 +-
sdks/java/maven-archetypes/examples/pom.xml | 2 +-
sdks/java/maven-archetypes/pom.xml | 2 +-
sdks/java/maven-archetypes/starter/pom.xml | 2 +-
sdks/java/pom.xml | 2 +-
sdks/pom.xml | 2 +-
sdks/python/pom.xml | 2 +-
45 files changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 60e4fed..89d55b5 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 580a154..d00dbc9 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 550578b..f5f7175 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eded684..109faf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<url>http://beam.apache.org/</url>
<inceptionYear>2016</inceptionYear>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<licenses>
<license>
@@ -48,7 +48,7 @@
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
<url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
- <tag>release-0.6.0</tag>
+ <tag>v0.6.0-RC2</tag>
</scm>
<issueManagement>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index f5fe4bc..1401e3d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 4c2394c..b45588f 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>beam-runners-parent</artifactId>
<groupId>org.apache.beam</groupId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index c2c88a0..f59caee 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index a7d5409..f961681 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index 1d426bd..b4d77dd 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 0030f61..7af4842 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 13d5b10..852a173 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a2bca0d..e44eaf2 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 3f74f7b..f6379d0 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 0d4c413..f58f439 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 5a41d9e..6ada6af 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 55db181..0c5e4d2 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index 9c6de1e..aadc560 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index 545f394..9a55048 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 55cbcd7..04f07bb 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index bdec58f..c825e48 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 728c6f7..c5b6324 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 26d92de..721495f 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 693359e..0e794c1 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 80b01ca..4a7fa0a 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index 5ea4452..c308fac 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 66a4207..83d551a 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index fcd984f..996d4ac 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 85ebc87..efd7a64 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 58dbf45..e6d14a8 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index fd5c52b..87b0b27 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index cc7fb27..d510583 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index f04361c..be02dc6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index 5138368..bef65a4 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 7deae14..a469501 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 8b1d848..04f5416 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 8d5b69b..22432b9 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index fda1d63..aee81ef 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index b785c98..91c76fc 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index 2632d6d..4603e11 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 09e5428..674ff60 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index 194e5bd..3d1b539 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 092995a..29df45e 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index a09a6be..7d7153a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index f130816..cf21743 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/7321c9af/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 2fcbaad..56a232c 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.6.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
[6/9] beam git commit: Ignore results from the tox clean up phase
Posted by al...@apache.org.
Ignore results from the tox clean up phase
Some temporary files are generated only under certain conditions, so a
failure to delete them during clean up should not fail the tox run.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef47c9f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef47c9f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef47c9f5
Branch: refs/heads/release-0.6.0
Commit: ef47c9f511b0f5730b0dc417aefa703fd6f974c5
Parents: c7c4da2
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 10 16:21:17 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:15:45 2017 -0800
----------------------------------------------------------------------
sdks/python/tox.ini | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ef47c9f5/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 807fe3f..2ed21c6 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -51,11 +51,11 @@ commands =
pip install -e .[test]
python apache_beam/examples/complete/autocomplete_test.py
python setup.py test
- # Clean up all cython generated files.
- find apache_beam -type f -name '*.c' -delete
- find apache_beam -type f -name '*.so' -delete
- find target/build -type f -name '*.c' -delete
- find target/build -type f -name '*.so' -delete
+ # Clean up all cython generated files. Ignore if deletion fails.
+ - find apache_beam -type f -name '*.c' -delete
+ - find apache_beam -type f -name '*.so' -delete
+ - find target/build -type f -name '*.c' -delete
+ - find target/build -type f -name '*.so' -delete
passenv = TRAVIS*
[testenv:py27gcp]
[2/9] beam git commit: Introduce Flink-specific state GC implementations
Posted by al...@apache.org.
Introduce Flink-specific state GC implementations
We now set the GC timer for window.maxTimestamp() + 1 to ensure that a
user timer set for window.maxTimestamp() still has all state.
This also adds tests for late data dropping and state GC specifically
for the Flink DoFnOperator.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fa718db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fa718db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fa718db
Branch: refs/heads/release-0.6.0
Commit: 8fa718db5bc14efd1beefc2c757c331a5bdbf927
Parents: a18b5b1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 11:07:00 2017 +0100
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 10 17:13:49 2017 -0800
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunners.java | 15 +-
.../beam/runners/core/StatefulDoFnRunner.java | 87 -------
.../runners/core/StatefulDoFnRunnerTest.java | 110 ++++++++-
.../wrappers/streaming/DoFnOperator.java | 111 ++++++++-
.../flink/streaming/DoFnOperatorTest.java | 225 +++++++++++++++++++
5 files changed, 439 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 9455eea..a1b7c8b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -21,9 +21,6 @@ import java.util.List;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
-import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
@@ -135,18 +132,13 @@ public class DoFnRunners {
DoFnRunner<InputT, OutputT> doFnRunner,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
- WindowingStrategy<?, ?> windowingStrategy) {
+ WindowingStrategy<?, ?> windowingStrategy,
+ CleanupTimer cleanupTimer,
+ StateCleaner<W> stateCleaner) {
Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
Sum.ofLongs());
- CleanupTimer cleanupTimer =
- new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
-
- Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder();
- StateCleaner<W> stateCleaner =
- new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder);
-
return new StatefulDoFnRunner<>(
doFnRunner,
windowingStrategy,
@@ -154,5 +146,4 @@ public class DoFnRunners {
stateCleaner,
droppedDueToLateness);
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 926345e..c672902 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -17,12 +17,8 @@
*/
package org.apache.beam.runners.core;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -30,8 +26,6 @@ import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateSpec;
import org.joda.time.Instant;
/**
@@ -45,7 +39,6 @@ import org.joda.time.Instant;
public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<InputT, OutputT> {
- public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
private final DoFnRunner<InputT, OutputT> doFnRunner;
@@ -167,84 +160,4 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
void clearForWindow(W window);
}
-
- /**
- * A {@link CleanupTimer} implemented by TimerInternals.
- */
- public static class TimeInternalsCleanupTimer implements CleanupTimer {
-
- private final TimerInternals timerInternals;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final Coder<BoundedWindow> windowCoder;
-
- public TimeInternalsCleanupTimer(
- TimerInternals timerInternals,
- WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
- WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
- windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
- this.timerInternals = timerInternals;
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return timerInternals.currentInputWatermarkTime();
- }
-
- @Override
- public void setForWindow(BoundedWindow window) {
- Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
- timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
- GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
- }
-
- @Override
- public boolean isForWindow(
- String timerId,
- BoundedWindow window,
- Instant timestamp,
- TimeDomain timeDomain) {
- boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
- Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
- return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
- }
- }
-
- /**
- * A {@link StateCleaner} implemented by StateInternals.
- */
- public static class StateInternalsStateCleaner<W extends BoundedWindow>
- implements StateCleaner<W> {
-
- private final DoFn<?, ?> fn;
- private final DoFnSignature signature;
- private final StateInternals<?> stateInternals;
- private final Coder<W> windowCoder;
-
- public StateInternalsStateCleaner(
- DoFn<?, ?> fn,
- StateInternals<?> stateInternals,
- Coder<W> windowCoder) {
- this.fn = fn;
- this.signature = DoFnSignatures.getSignature(fn.getClass());
- this.stateInternals = stateInternals;
- this.windowCoder = windowCoder;
- }
-
- @Override
- public void clearForWindow(W window) {
- for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
- signature.stateDeclarations().entrySet()) {
- try {
- StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
- State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
- StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
- state.clear();
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 54ac77e..fd6a73c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import com.google.common.base.MoreObjects;
import java.util.Collections;
+import java.util.Map;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -31,14 +32,18 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
@@ -114,7 +119,14 @@ public class StatefulDoFnRunnerTest {
DoFn<KV<String, Integer>, Integer> fn = new MyDoFn();
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
- fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+ fn,
+ getDoFnRunner(fn),
+ mockStepContext,
+ aggregatorFactory,
+ WINDOWING_STRATEGY,
+ new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+ new StateInternalsStateCleaner<>(
+ fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
runner.startBundle();
@@ -125,13 +137,6 @@ public class StatefulDoFnRunnerTest {
WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
assertEquals(1L, droppedDueToLateness.sum);
- runner.onTimer("processTimer", window, timestamp, TimeDomain.PROCESSING_TIME);
- assertEquals(2L, droppedDueToLateness.sum);
-
- runner.onTimer("synchronizedProcessTimer", window, timestamp,
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
- assertEquals(3L, droppedDueToLateness.sum);
-
runner.finishBundle();
}
@@ -143,7 +148,14 @@ public class StatefulDoFnRunnerTest {
StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
- fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY);
+ fn,
+ getDoFnRunner(fn),
+ mockStepContext,
+ aggregatorFactory,
+ WINDOWING_STRATEGY,
+ new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+ new StateInternalsStateCleaner<>(
+ fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
Instant elementTime = new Instant(1);
@@ -252,4 +264,84 @@ public class StatefulDoFnRunnerTest {
}
}
+ /**
+ * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
+ */
+ public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+ public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+ private final TimerInternals timerInternals;
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final Coder<BoundedWindow> windowCoder;
+
+ public TimeInternalsCleanupTimer(
+ TimerInternals timerInternals,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+ windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+ this.timerInternals = timerInternals;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return timerInternals.currentInputWatermarkTime();
+ }
+
+ @Override
+ public void setForWindow(BoundedWindow window) {
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+ GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+ }
+
+ @Override
+ public boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain) {
+ boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+ }
+ }
+
+ /**
+ * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
+ */
+ public static class StateInternalsStateCleaner<W extends BoundedWindow>
+ implements StatefulDoFnRunner.StateCleaner<W> {
+
+ private final DoFn<?, ?> fn;
+ private final DoFnSignature signature;
+ private final StateInternals<?> stateInternals;
+ private final Coder<W> windowCoder;
+
+ public StateInternalsStateCleaner(
+ DoFn<?, ?> fn,
+ StateInternals<?> stateInternals,
+ Coder<W> windowCoder) {
+ this.fn = fn;
+ this.signature = DoFnSignatures.getSignature(fn.getClass());
+ this.stateInternals = stateInternals;
+ this.windowCoder = windowCoder;
+ }
+
+ @Override
+ public void clearForWindow(W window) {
+ for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+ signature.stateDeclarations().entrySet()) {
+ try {
+ StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+ State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+ StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+ state.clear();
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c4622ba..a8ce680 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
@@ -61,13 +62,18 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -286,6 +292,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
//
// for some K, V
+
doFnRunner = DoFnRunners.lateDataDroppingRunner(
(DoFnRunner) doFnRunner,
stepContext,
@@ -293,8 +300,27 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
} else if (keyCoder != null) {
// It is a stateful DoFn
+
+ StatefulDoFnRunner.CleanupTimer cleanupTimer =
+ new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);
+
+ // we don't know the window type
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+ new StateInternalsStateCleaner<>(
+ doFn, stepContext.stateInternals(), windowCoder);
+
doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
- doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy);
+ doFn,
+ doFnRunner,
+ stepContext,
+ aggregatorFactory,
+ windowingStrategy,
+ cleanupTimer,
+ stateCleaner);
}
pushbackDoFnRunner =
@@ -746,7 +772,90 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
public Instant currentOutputWatermarkTime() {
return new Instant(currentOutputWatermark);
}
+ }
+
+
+ /**
+ * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals.
+ */
+ public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
+
+ public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+
+ private final TimerInternals timerInternals;
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final Coder<BoundedWindow> windowCoder;
+
+ public TimeInternalsCleanupTimer(
+ TimerInternals timerInternals,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+ windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+ this.timerInternals = timerInternals;
+ }
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return timerInternals.currentInputWatermarkTime();
+ }
+
+ @Override
+ public void setForWindow(BoundedWindow window) {
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ // make sure this fires after any window.maxTimestamp() timers
+ gcTime = gcTime.plus(1L);
+ timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+ GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+ }
+
+ @Override
+ public boolean isForWindow(
+ String timerId,
+ BoundedWindow window,
+ Instant timestamp,
+ TimeDomain timeDomain) {
+ boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+ Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+ gcTime = gcTime.plus(1L);
+ return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+ }
}
+ /**
+ * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals.
+ */
+ public static class StateInternalsStateCleaner<W extends BoundedWindow>
+ implements StatefulDoFnRunner.StateCleaner<W> {
+
+ private final DoFn<?, ?> fn;
+ private final DoFnSignature signature;
+ private final StateInternals<?> stateInternals;
+ private final Coder<W> windowCoder;
+
+ public StateInternalsStateCleaner(
+ DoFn<?, ?> fn,
+ StateInternals<?> stateInternals,
+ Coder<W> windowCoder) {
+ this.fn = fn;
+ this.signature = DoFnSignatures.getSignature(fn.getClass());
+ this.stateInternals = stateInternals;
+ this.windowCoder = windowCoder;
+ }
+
+ @Override
+ public void clearForWindow(W window) {
+ for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+ signature.stateDeclarations().entrySet()) {
+ try {
+ StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+ State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
+ StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+ state.clear();
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 7d14a87..bbd3428 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.runners.flink.streaming;
+import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.base.Function;
@@ -29,9 +31,12 @@ import java.util.Collections;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.DoFn;
@@ -40,14 +45,23 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
@@ -169,6 +183,217 @@ public class DoFnOperatorTest {
testHarness.close();
}
+ @Test
+ public void testLateDroppingForStatefulFn() throws Exception {
+ // Sends elements of one window to a keyed, stateful DoFnOperator while the watermark advances.
+ WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+ // This DoFn declares a state cell but never accesses it; it outputs element.toString().
+ DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+
+ @StateId("state")
+ private final StateSpec<Object, ValueState<String>> stateSpec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ context.output(context.element().toString());
+ }
+ };
+
+ WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ VarIntCoder.of(),
+ windowingStrategy.getWindowFn().windowCoder());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
+ fn,
+ windowedValueCoder,
+ outputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ VarIntCoder.of() /* key coder */);
+ // The key selector below uses each element's own integer value as its key.
+ OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new KeySelector<WindowedValue<Integer>, Integer>() {
+ @Override
+ public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
+ return integerWindowedValue.getValue();
+ }
+ },
+ new CoderTypeInformation<>(VarIntCoder.of()));
+
+ testHarness.open();
+
+ testHarness.processWatermark(0);
+ // window1 starts at instant 0 and has a size of 10 ms
+ IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+ // watermark is 0: this should not be late
+ testHarness.processElement(
+ new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+ assertThat(
+ this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+ testHarness.getOutput().clear();
+
+ testHarness.processWatermark(9);
+
+ // watermark is 9: this should still not be considered late
+ testHarness.processElement(
+ new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+ assertThat(
+ this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+ testHarness.getOutput().clear();
+
+ testHarness.processWatermark(10);
+
+ // watermark is 10: this should now be considered late, so no output is expected
+ testHarness.processElement(
+ new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+ assertThat(
+ this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ emptyIterable());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testStateGCForStatefulFn() throws Exception {
+ // Verifies the user timer's output and that the keyed state entries are gone afterwards.
+ WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(new Duration(10)));
+
+ final String timerId = "boo";
+ final String stateId = "dazzle";
+ // offset is added to values output by processElement; timerOutput is the value output by onTimer
+ final int offset = 5000;
+ final int timerOutput = 4093;
+
+ DoFn<KV<String, Integer>, KV<String, Integer>> fn =
+ new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId(stateId)
+ private final StateSpec<Object, ValueState<String>> stateSpec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context,
+ @TimerId(timerId) Timer timer,
+ @StateId(stateId) ValueState<String> state,
+ BoundedWindow window) {
+ timer.set(window.maxTimestamp());
+ state.write(context.element().getKey());
+ context.output(
+ KV.of(context.element().getKey(), context.element().getValue() + offset));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
+ context.output(KV.of(state.read(), timerOutput));
+ }
+ };
+
+ WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+ windowingStrategy.getWindowFn().windowCoder());
+
+ TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
+
+ DoFnOperator<
+ KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
+ new DoFnOperator<>(
+ fn,
+ windowedValueCoder,
+ outputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ StringUtf8Coder.of() /* key coder */);
+
+ KeyedOneInputStreamOperatorTestHarness<
+ String,
+ WindowedValue<KV<String, Integer>>,
+ WindowedValue<KV<String, Integer>>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new KeySelector<WindowedValue<KV<String, Integer>>, String>() {
+ @Override
+ public String getKey(
+ WindowedValue<KV<String, Integer>> kvWindowedValue) throws Exception {
+ return kvWindowedValue.getValue().getKey();
+ }
+ },
+ new CoderTypeInformation<>(StringUtf8Coder.of()));
+
+ testHarness.open();
+
+ testHarness.processWatermark(0);
+ // no element has been processed yet, so no keyed state is expected
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10));
+
+ testHarness.processElement(
+ new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 5), new Instant(1), window1, PaneInfo.NO_FIRING)));
+
+ testHarness.processElement(
+ new StreamRecord<>(
+ WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING)));
+
+ assertThat(
+ this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(
+ WindowedValue.of(
+ KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ KV.of("key2", 7 + offset), new Instant(3), window1, PaneInfo.NO_FIRING)));
+ // after one element for each of two keys, two keyed state entries are expected
+ assertEquals(2, testHarness.numKeyedStateEntries());
+
+ testHarness.getOutput().clear();
+
+ // this should trigger both the window.maxTimestamp() timer and the GC timer
+ // onTimer reads the key from state, so this tests that the GC timer fires after the user timer
+ testHarness.processWatermark(15);
+
+ assertThat(
+ this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(
+ WindowedValue.of(
+ KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ KV.of("key2", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING)));
+
+ // ensure the state was garbage collected
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.close();
+ }
+
public void testSideInputs(boolean keyed) throws Exception {
WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =