You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 04:49:31 UTC

[01/43] beam git commit: Properly convert milliseconds whether there's less than 3/more than 9 digits. TimeUtil did not properly convert (and returned null) when the number of digits for fractions of seconds was less than 3 digits or more than 9 digits.

Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 627ae0bc3 -> 1ce60b488


Properly convert milliseconds whether there's less than 3/more than 9 digits.
TimeUtil did not properly convert (and returned null) when the number of
digits for fractions of seconds was less than 3 digits or more than 9 digits.
The solution is to pad with zeros when there is less than 3 digits and to
truncate when there is more than 3.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b8cd640
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b8cd640
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b8cd640

Branch: refs/heads/gearpump-runner
Commit: 7b8cd6401cb5ed6e184ed36571a89d3ae324dd5f
Parents: 893bf42
Author: Jeremie Lenfant-Engelmann <je...@google.com>
Authored: Wed Jun 28 18:32:56 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Jun 30 08:59:19 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/TimeUtil.java    | 24 ++++++++------------
 .../runners/dataflow/util/TimeUtilTest.java     |  6 +++++
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b8cd640/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
index bff379f..172dc6e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import com.google.common.base.Strings;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -98,26 +99,19 @@ public final class TimeUtil {
     int hour = Integer.valueOf(matcher.group(4));
     int minute = Integer.valueOf(matcher.group(5));
     int second = Integer.valueOf(matcher.group(6));
-    int millis = 0;
-
-    String frac = matcher.group(7);
-    if (frac != null) {
-      int fracs = Integer.valueOf(frac);
-      if (frac.length() == 3) {  // millisecond resolution
-        millis = fracs;
-      } else if (frac.length() == 6) {  // microsecond resolution
-        millis = fracs / 1000;
-      } else if (frac.length() == 9) {  // nanosecond resolution
-        millis = fracs / 1000000;
-      } else {
-        return null;
-      }
-    }
+    int millis = computeMillis(matcher.group(7));
 
     return new DateTime(year, month, day, hour, minute, second, millis,
         ISOChronology.getInstanceUTC()).toInstant();
   }
 
+  private static int computeMillis(String frac) {
+    if (frac == null) {
+      return 0;
+    }
+    return Integer.valueOf(frac.length() > 3 ? frac.substring(0, 3) : Strings.padEnd(frac, 3, '0'));
+  }
+
   /**
    * Converts a {@link ReadableDuration} into a Dataflow API duration string.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/7b8cd640/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
index e0785d4..1ac9fab 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
@@ -47,8 +47,14 @@ public final class TimeUtilTest {
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
+    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.0Z"));
+    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.00Z"));
+    assertEquals(new Instant(420), fromCloudTime("1970-01-01T00:00:00.42Z"));
+    assertEquals(new Instant(300), fromCloudTime("1970-01-01T00:00:00.3Z"));
+    assertEquals(new Instant(20), fromCloudTime("1970-01-01T00:00:00.02Z"));
     assertNull(fromCloudTime(""));
     assertNull(fromCloudTime("1970-01-01T00:00:00"));
+    assertNull(fromCloudTime("1970-01-01T00:00:00.1e3Z"));
   }
 
   @Test


[15/43] beam git commit: This closes #3485

Posted by ke...@apache.org.
This closes #3485


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0549b46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0549b46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0549b46

Branch: refs/heads/gearpump-runner
Commit: f0549b464737d0d11ae1da8e0534622b10247208
Parents: 14fa7f7 06897b1
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jul 5 09:22:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jul 5 09:22:20 2017 -0700

----------------------------------------------------------------------
 .../jenkins/common_job_properties.groovy        |  5 +-
 .../job_beam_PreCommit_Website_Merge.groovy     | 59 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[20/43] beam git commit: This closes #3489: Fix DoFn javadoc: StateSpec does not require a key

Posted by ke...@apache.org.
This closes #3489: Fix DoFn javadoc: StateSpec does not require a key


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63f66d6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63f66d6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63f66d6d

Branch: refs/heads/gearpump-runner
Commit: 63f66d6d81652dba5a7804648cde0e7debffd0f6
Parents: 39c80f6 f99ab1a
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 5 15:00:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 5 15:00:09 2017 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[09/43] beam git commit: [BEAM-2530] This closes #3460

Posted by ke...@apache.org.
[BEAM-2530] This closes #3460


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0bd47c07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0bd47c07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0bd47c07

Branch: refs/heads/gearpump-runner
Commit: 0bd47c07659909c6b3c2d04fbdb96cc30a6e0243
Parents: 11010cf 75475ef
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jul 3 14:23:18 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jul 3 14:23:18 2017 +0200

----------------------------------------------------------------------
 runners/apex/pom.xml          |  7 +++++++
 runners/spark/pom.xml         |  7 +++++++
 sdks/java/io/hbase/pom.xml    |  7 +++++++
 sdks/java/io/hcatalog/pom.xml | 12 ++++++++++++
 4 files changed, 33 insertions(+)
----------------------------------------------------------------------



[21/43] beam git commit: Update SDK dependencies

Posted by ke...@apache.org.
Update SDK dependencies


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23e385fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23e385fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23e385fa

Branch: refs/heads/gearpump-runner
Commit: 23e385faa193a00f9b10e3f8f0afe832087bff06
Parents: 63f66d6
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 5 14:34:07 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jul 5 16:11:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/setup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/23e385fa/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 6646a58..8a0c9ae 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -120,9 +120,9 @@ GCP_REQUIREMENTS = [
   'google-apitools>=0.5.10,<=0.5.11',
   'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
   'googledatastore==7.0.1',
-  'google-cloud-pubsub==0.25.0',
+  'google-cloud-pubsub==0.26.0',
   # GCP packages required by tests
-  'google-cloud-bigquery>=0.23.0,<0.25.0',
+  'google-cloud-bigquery>=0.23.0,<0.26.0',
 ]
 
 


[05/43] beam git commit: Fix DoFn javadoc: StateSpec does not require a key

Posted by ke...@apache.org.
Fix DoFn javadoc: StateSpec does not require a key


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f99ab1a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f99ab1a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f99ab1a4

Branch: refs/heads/gearpump-runner
Commit: f99ab1a472868e4ce175a86e5d76823b1c09c10b
Parents: 0e429b3
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 30 21:42:17 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 30 21:42:17 2017 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f99ab1a4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fb6d0ee..a2e5c16 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -385,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
    *
    *  {@literal @StateId("my-state-id")}
-   *  {@literal private final StateSpec<K, ValueState<MyState>>} myStateSpec =
+   *  {@literal private final StateSpec<ValueState<MyState>>} myStateSpec =
    *       StateSpecs.value(new MyStateCoder());
    *
    *  {@literal @ProcessElement}


[32/43] beam git commit: This closes #3429: [BEAM-2502, BEAM-2505] More fixes to ReduceFnRunner.onTimers

Posted by ke...@apache.org.
This closes #3429: [BEAM-2502, BEAM-2505] More fixes to ReduceFnRunner.onTimers

  Process timer firings for a window together
  Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/628dace9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/628dace9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/628dace9

Branch: refs/heads/gearpump-runner
Commit: 628dace9c173182299f322fa6ed974c75dbff4b5
Parents: b8f8d18 935c077
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 14:13:56 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 14:13:56 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoardTest.java |   2 +
 .../beam/runners/core/ReduceFnRunner.java       | 106 +++++++++++++------
 .../beam/runners/core/ReduceFnRunnerTest.java   |  81 +++++++++++++-
 3 files changed, 156 insertions(+), 33 deletions(-)
----------------------------------------------------------------------



[41/43] beam git commit: Upgrade to gearpump 0.8.4

Posted by ke...@apache.org.
Upgrade to gearpump 0.8.4


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22068274
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22068274
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22068274

Branch: refs/heads/gearpump-runner
Commit: 22068274428666da93c793a170810ddb42755704
Parents: c2d3fbc
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jul 7 16:41:13 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jul 7 22:08:37 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22068274/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 3c98d5e..54c8d5c 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,7 +43,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <gearpump.version>0.8.4-SNAPSHOT</gearpump.version>
+    <gearpump.version>0.8.4</gearpump.version>
   </properties>
 
   <profiles>
@@ -58,7 +58,7 @@
             <artifactId>maven-surefire-plugin</artifactId>
             <executions>
               <execution>
-                <id>runnable-on-service-tests</id>
+                <id>validates-runner-tests</id>
                 <phase>integration-test</phase>
                 <goals>
                   <goal>test</goal>


[34/43] beam git commit: This closes #3467: Made DataflowRunner TransformTranslator public

Posted by ke...@apache.org.
This closes #3467: Made DataflowRunner TransformTranslator public


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17d7e598
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17d7e598
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17d7e598

Branch: refs/heads/gearpump-runner
Commit: 17d7e5980e58a6ee668f41f1518a09bed274b5b2
Parents: 628dace da92256
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 21:18:05 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:18:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/TransformTranslator.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[24/43] beam git commit: Fix PValue input in _PubSubReadEvaluator

Posted by ke...@apache.org.
Fix PValue input in _PubSubReadEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a75202f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a75202f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a75202f3

Branch: refs/heads/gearpump-runner
Commit: a75202f344f22be5c5fdf62b3eb54a151ad29af6
Parents: e8fdd37
Author: Charles Chen <cc...@google.com>
Authored: Wed Jul 5 16:18:51 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jul 5 18:08:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a75202f3/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 641291d..cb2ace2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -436,8 +436,9 @@ class _PubSubReadEvaluator(_TransformEvaluator):
       bundles = [bundle]
     else:
       bundles = []
-    input_pvalue = self._applied_ptransform.inputs
-    if not input_pvalue:
+    if self._applied_ptransform.inputs:
+      input_pvalue = self._applied_ptransform.inputs[0]
+    else:
       input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
     unprocessed_bundle = self._evaluation_context.create_bundle(
         input_pvalue)


[42/43] beam git commit: Fix ParDoTest#testPipelineOptionsParameter

Posted by ke...@apache.org.
Fix ParDoTest#testPipelineOptionsParameter


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/725f547f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/725f547f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/725f547f

Branch: refs/heads/gearpump-runner
Commit: 725f547f5e487dd3e84d5d0f95c0fa3efa853279
Parents: 2206827
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jul 8 00:13:19 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jul 8 00:13:19 2017 +0800

----------------------------------------------------------------------
 .../gearpump/translators/io/GearpumpSource.java  | 12 ++----------
 .../translators/utils/DoFnRunnerFactory.java     |  5 +++--
 .../translators/utils/TranslatorUtils.java       | 19 +++++++++++++++++++
 3 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index daa8c81..2f53139 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import java.io.IOException;
 import java.time.Instant;
 
@@ -48,11 +45,7 @@ public abstract class GearpumpSource<T> implements DataSource {
   private boolean available = false;
 
   GearpumpSource(PipelineOptions options) {
-    try {
-      this.serializedOptions = new ObjectMapper().writeValueAsBytes(options);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
+    this.serializedOptions = TranslatorUtils.serializePipelineOptions(options);
   }
 
   protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException;
@@ -60,8 +53,7 @@ public abstract class GearpumpSource<T> implements DataSource {
   @Override
   public void open(TaskContext context, Instant startTime) {
     try {
-      PipelineOptions options = new ObjectMapper()
-          .readValue(serializedOptions, PipelineOptions.class);
+      PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
       this.reader = createReader(options);
       this.available = reader.start();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 35cf2b5..375b696 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -43,7 +43,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
   private static final long serialVersionUID = -4109539010014189725L;
   private final DoFn<InputT, OutputT> fn;
-  private final transient PipelineOptions options;
+  private final byte[] serializedOptions;
   private final Collection<PCollectionView<?>> sideInputs;
   private final DoFnRunners.OutputManager outputManager;
   private final TupleTag<OutputT> mainOutputTag;
@@ -61,7 +61,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       StepContext stepContext,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
-    this.options = pipelineOptions;
+    this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions);
     this.sideInputs = sideInputs;
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
@@ -72,6 +72,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
   public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
       ReadyCheckingSideInputReader sideInputReader) {
+    PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
     DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, windowingStrategy);

http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b1cd61c..c14298f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,8 +18,11 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -141,6 +145,21 @@ public class TranslatorUtils {
     }
   }
 
+  public static byte[] serializePipelineOptions(PipelineOptions options) {
+    try {
+      return new ObjectMapper().writeValueAsBytes(options);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) {
+    try {
+      return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
   /**
    * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.


[26/43] beam git commit: [BEAM-2534] Handle offset gaps in Kafka messages.

Posted by ke...@apache.org.
[BEAM-2534] Handle offset gaps in Kafka messages.

KafkaIO logged a warning when there is a gap in offstes for messages.
Kafka also support 'KV' store style topics where some of the messages
are deleted leading gaps in offsets. This PR removes the log and
accounts for offset gaps in backlog estimate.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2259c309
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2259c309
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2259c309

Branch: refs/heads/gearpump-runner
Commit: 2259c309c5b81a5d1e32732dd35e1102766401fa
Parents: bf6dda3
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Jun 28 12:07:06 2017 -0700
Committer: JingsongLi <lz...@aliyun.com>
Committed: Fri Jul 7 00:44:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2259c309/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..e520367 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -904,6 +904,22 @@ public class KafkaIO {
       return name;
     }
 
+    // Maintains approximate average over last 1000 elements
+    private static class MovingAvg {
+      private static final int MOVING_AVG_WINDOW = 1000;
+      private double avg = 0;
+      private long numUpdates = 0;
+
+      void update(double quantity) {
+        numUpdates++;
+        avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+      }
+
+      double get() {
+        return avg;
+      }
+    }
+
     // maintains state of each assigned partition (buffered records, consumed offset, etc)
     private static class PartitionState {
       private final TopicPartition topicPartition;
@@ -911,9 +927,8 @@ public class KafkaIO {
       private long latestOffset;
       private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-      // simple moving average for size of each record in bytes
-      private double avgRecordSize = 0;
-      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+      private MovingAvg avgRecordSize = new MovingAvg();
+      private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.
 
       PartitionState(TopicPartition partition, long nextOffset) {
         this.topicPartition = partition;
@@ -921,17 +936,13 @@ public class KafkaIO {
         this.latestOffset = UNINITIALIZED_OFFSET;
       }
 
-      // update consumedOffset and avgRecordSize
-      void recordConsumed(long offset, int size) {
+      // Update consumedOffset, avgRecordSize, and avgOffsetGap
+      void recordConsumed(long offset, int size, long offsetGap) {
         nextOffset = offset + 1;
 
-        // this is always updated from single thread. probably not worth making it an AtomicDouble
-        if (avgRecordSize <= 0) {
-          avgRecordSize = size;
-        } else {
-          // initially, first record heavily contributes to average.
-          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
-        }
+        // This is always updated from single thread. Probably not worth making atomic.
+        avgRecordSize.update(size);
+        avgOffsetGap.update(offsetGap);
       }
 
       synchronized void setLatestOffset(long latestOffset) {
@@ -944,14 +955,15 @@ public class KafkaIO {
         if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return (long) (backlogMessageCount * avgRecordSize);
+        return (long) (backlogMessageCount * avgRecordSize.get());
       }
 
       synchronized long backlogMessageCount() {
         if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return Math.max(0, (latestOffset - nextOffset));
+        double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
+        return Math.max(0, (long) Math.ceil(remaining));
       }
     }
 
@@ -1154,14 +1166,11 @@ public class KafkaIO {
             continue;
           }
 
-          // sanity check
-          if (offset != expected) {
-            LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
-                this, pState.topicPartition, expected, offset - expected);
-          }
+          long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
 
           if (curRecord == null) {
             LOG.info("{}: first record offset {}", name, offset);
+            offsetGap = 0;
           }
 
           curRecord = null; // user coders below might throw.
@@ -1182,7 +1191,7 @@ public class KafkaIO {
 
           int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
               + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          pState.recordConsumed(offset, recordSize);
+          pState.recordConsumed(offset, recordSize, offsetGap);
           bytesRead.inc(recordSize);
           bytesReadBySplit.inc(recordSize);
           return true;


[31/43] beam git commit: Process timer firings for a window together

Posted by ke...@apache.org.
Process timer firings for a window together


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/935c0773
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/935c0773
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/935c0773

Branch: refs/heads/gearpump-runner
Commit: 935c077341de580dddd4b29ffee3926795acf403
Parents: bd631b8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:43:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 14:12:39 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoardTest.java |  2 +
 .../beam/runners/core/ReduceFnRunner.java       | 98 +++++++++++++-------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 49 +++++++++-
 3 files changed, 115 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 745c210..611e2b3 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable {
         .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
             event(TestUser.BLUE_TWO, 3, Duration.ZERO),
             event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+        // Move the watermark to the end of the window to output on time
+        .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
         // Move the watermark past the end of the allowed lateness plus the end of the window
         .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
             .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))

http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0632c05..634a2d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -638,11 +637,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Enriches TimerData with state necessary for processing a timer as well as
-   * common queries about a timer.
+   * A descriptor of the activation for a window based on a timer.
    */
-  private class EnrichedTimerData {
-    public final Instant timestamp;
+  private class WindowActivation {
     public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
     public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
     // If this is an end-of-window timer then we may need to set a garbage collection timer
@@ -653,19 +650,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // end-of-window time to be a signal to garbage collect.
     public final boolean isGarbageCollection;
 
-    EnrichedTimerData(
-        TimerData timer,
+    WindowActivation(
         ReduceFn<K, InputT, OutputT, W>.Context directContext,
         ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
-      this.timestamp = timer.getTimestamp();
       this.directContext = directContext;
       this.renamedContext = renamedContext;
       W window = directContext.window();
-      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-          && timer.getTimestamp().equals(window.maxTimestamp());
-      Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
+
+      // The output watermark is before the end of the window if it is either unknown
+      // or it is known to be before it. If it is unknown, that means that there hasn't been
+      // enough data to advance it.
+      boolean outputWatermarkBeforeEOW =
+              timerInternals.currentOutputWatermarkTime() == null
+          || !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
+
+      // The "end of the window" is reached when the local input watermark (for this key) surpasses
+      // it but the local output watermark (also for this key) has not. After data is emitted and
+      // the output watermark hold is released, the output watermark on this key will immediately
+      // exceed the end of the window (otherwise we could see multiple ON_TIME outputs)
+      this.isEndOfWindow =
+          timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp())
+              && outputWatermarkBeforeEOW;
+
+      // The "GC time" is reached when the input watermark surpasses the end of the window
+      // plus allowed lateness. After this, the window is expired and expunged.
       this.isGarbageCollection =
-          TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime);
+          timerInternals
+              .currentInputWatermarkTime()
+              .isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy));
     }
 
     // Has this window had its trigger finish?
@@ -684,9 +696,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       return;
     }
 
-    // Create a reusable context for each timer and begin prefetching necessary
+    // Create a reusable context for each window and begin prefetching necessary
     // state.
-    List<EnrichedTimerData> enrichedTimers = new LinkedList();
+    Map<BoundedWindow, WindowActivation> windowActivations = new HashMap();
+
     for (TimerData timer : timers) {
       checkArgument(timer.getNamespace() instanceof WindowNamespace,
           "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
@@ -694,7 +707,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
       W window = windowNamespace.getWindow();
 
-      if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+      WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with "
+              + "inputWatermark:{}; outputWatermark:{}",
+          ReduceFnRunner.class.getSimpleName(),
+          key, window, timer,
+          timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+
+      // Processing time timers for an expired window are ignored, just like elements
+      // that show up too late. Window GC is management by an event time timer
+      if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) {
+        continue;
+      }
+
+      // How a window is processed is a function only of the current state, not the details
+      // of the timer. This makes us robust to large leaps in processing time and watermark
+      // time, where both EOW and GC timers come in together and we need to GC and emit
+      // the final pane.
+      if (windowActivations.containsKey(window)) {
         continue;
       }
 
@@ -702,11 +732,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
           contextFactory.base(window, StateStyle.RENAMED);
-      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
-      enrichedTimers.add(enrichedTimer);
+      WindowActivation windowActivation = new WindowActivation(directContext, renamedContext);
+      windowActivations.put(window, windowActivation);
 
       // Perform prefetching of state to determine if the trigger should fire.
-      if (enrichedTimer.isGarbageCollection) {
+      if (windowActivation.isGarbageCollection) {
         triggerRunner.prefetchIsClosed(directContext.state());
       } else {
         triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
@@ -714,7 +744,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     // For those windows that are active and open, prefetch the triggering or emitting state.
-    for (EnrichedTimerData timer : enrichedTimers) {
+    for (WindowActivation timer : windowActivations.values()) {
       if (timer.windowIsActiveAndOpen()) {
         ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
         if (timer.isGarbageCollection) {
@@ -727,25 +757,27 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     // Perform processing now that everything is prefetched.
-    for (EnrichedTimerData timer : enrichedTimers) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+    for (WindowActivation windowActivation : windowActivations.values()) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext;
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext;
 
-      if (timer.isGarbageCollection) {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
-                + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
+      if (windowActivation.isGarbageCollection) {
+        WindowTracing.debug(
+            "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}",
+            ReduceFnRunner.class.getSimpleName(),
+            key,
+            directContext.window(),
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
 
-        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+        boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
         if (windowIsActiveAndOpen) {
           // We need to call onTrigger to emit the final pane if required.
           // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
           // and the watermark has passed the end of the window.
           @Nullable
           Instant newHold = onTrigger(
-              directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+              directContext, renamedContext, true /* isFinished */, windowActivation.isEndOfWindow);
           checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
         }
 
@@ -753,18 +785,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
         // see elements for it again.
         clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
       } else {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+        WindowTracing.debug(
+            "{}.onTimers: Triggering for key:{}; window:{} at {} with "
                 + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
+            key,
+            directContext.window(),
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        if (timer.windowIsActiveAndOpen()
+        if (windowActivation.windowIsActiveAndOpen()
             && triggerRunner.shouldFire(
                    directContext.window(), directContext.timers(), directContext.state())) {
           emit(directContext, renamedContext);
         }
 
-        if (timer.isEndOfWindow) {
+        if (windowActivation.isEndOfWindow) {
           // If the window strategy trigger includes a watermark trigger then at this point
           // there should be no data holds, either because we'd already cleared them on an
           // earlier onTrigger, or because we just cleared them on the above emit.

http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 79ee91b..4f13af1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -79,7 +80,6 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -246,6 +246,52 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
   }
 
+  /**
+   * Tests that with the default trigger we will not produce two ON_TIME panes, even
+   * if there are two outputs that are both candidates.
+   */
+  @Test
+  public void testOnlyOneOnTimePane() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withTrigger(DefaultTrigger.of())
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceInputWatermark(new Instant(0));
+
+    int value1 = 1;
+    int value2 = 3;
+
+    // A single element that should be in the ON_TIME output
+    tester.injectElements(
+        TimestampedValue.of(value1, new Instant(1)));
+
+    // Should fire ON_TIME
+    tester.advanceInputWatermark(new Instant(10));
+
+    // The DefaultTrigger should cause output labeled LATE, even though it does not have to be
+    // labeled as such.
+    tester.injectElements(
+        TimestampedValue.of(value2, new Instant(3)));
+
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1)));
+    assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2)));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1)));
+  }
+
   @Test
   public void testOnElementCombiningDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
@@ -458,7 +504,6 @@ public class ReduceFnRunnerTest {
    * marked as final.
    */
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-2505")
   public void testCombiningAccumulatingEventTime() throws Exception {
     WindowingStrategy<?, IntervalWindow> strategy =
         WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))


[08/43] beam git commit: [BEAM-2530] Fix compilation of modules with Java 9 that depend on jdk.tools

Posted by ke...@apache.org.
[BEAM-2530] Fix compilation of modules with Java 9 that depend on jdk.tools


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75475ef3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75475ef3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75475ef3

Branch: refs/heads/gearpump-runner
Commit: 75475ef3dc23a09fa9bbba478d6fdbc468f7dd2e
Parents: 11010cf
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Wed Jun 28 16:58:55 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jul 3 10:10:04 2017 +0200

----------------------------------------------------------------------
 runners/apex/pom.xml          |  7 +++++++
 runners/spark/pom.xml         |  7 +++++++
 sdks/java/io/hbase/pom.xml    |  7 +++++++
 sdks/java/io/hcatalog/pom.xml | 12 ++++++++++++
 4 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/75475ef3/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 88ff0f2..20f2d28 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -75,6 +75,13 @@
       <artifactId>apex-engine</artifactId>
       <version>${apex.core.version}</version>
       <scope>runtime</scope>
+      <exclusions>
+        <!-- Fix build on JDK-9 -->
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Beam -->

http://git-wip-us.apache.org/repos/asf/beam/blob/75475ef3/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1d93427..8a69496 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -149,6 +149,13 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <!-- Fix build on JDK-9 -->
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/75475ef3/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 9d5e2aa..40ac8df 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -121,6 +121,13 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>test</scope>
+      <exclusions>
+        <!-- Fix build on JDK-9 -->
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/75475ef3/sdks/java/io/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml
index 8af740d..a31ff86 100644
--- a/sdks/java/io/hcatalog/pom.xml
+++ b/sdks/java/io/hcatalog/pom.xml
@@ -61,6 +61,13 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <!-- Fix build on JDK-9 -->
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -109,6 +116,11 @@
           <groupId>com.google.protobuf</groupId>
           <artifactId>protobuf-java</artifactId>
         </exclusion>
+        <!-- Fix build on JDK-9 -->
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 


[39/43] beam git commit: This closes #3508

Posted by ke...@apache.org.
This closes #3508


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e565f20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e565f20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e565f20

Branch: refs/heads/gearpump-runner
Commit: 9e565f20992eba8f52d16067c8c3be7f474c6cdf
Parents: 440c7d4 35061e8
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Jul 7 10:31:53 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Jul 7 10:31:53 2017 +0200

----------------------------------------------------------------------
 pom.xml                   | 18 ++++++++++++++++++
 sdks/java/javadoc/pom.xml | 15 +++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------



[25/43] beam git commit: This closes #3499

Posted by ke...@apache.org.
This closes #3499


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf6dda32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf6dda32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf6dda32

Branch: refs/heads/gearpump-runner
Commit: bf6dda32047e0bfba18de5aa27b1b6a593f86fc9
Parents: e8fdd37 a75202f
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 5 18:09:05 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jul 5 18:09:05 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[38/43] beam git commit: Fix javadoc generation for AmqpIO, CassandraIO and HCatalogIO

Posted by ke...@apache.org.
Fix javadoc generation for AmqpIO, CassandraIO and HCatalogIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35061e88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35061e88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35061e88

Branch: refs/heads/gearpump-runner
Commit: 35061e88066589d1dbfa81aa37fbb270274d70c5
Parents: 440c7d4
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Thu Jul 6 17:37:33 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Jul 7 10:31:29 2017 +0200

----------------------------------------------------------------------
 pom.xml                   | 18 ++++++++++++++++++
 sdks/java/javadoc/pom.xml | 15 +++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/35061e88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01474c1..d9ab9ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,6 +428,18 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-amqp</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-cassandra</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -465,6 +477,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-hcatalog</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-jdbc</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/35061e88/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index ddb92cf..51109fb 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -99,6 +99,16 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-amqp</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-cassandra</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
     </dependency>
 
@@ -124,6 +134,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hcatalog</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-jdbc</artifactId>
     </dependency>
 


[16/43] beam git commit: Website Mergebot Job

Posted by ke...@apache.org.
Website Mergebot Job

Signed-off-by: Jason Kuster <ja...@google.com>


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06897b1c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06897b1c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06897b1c

Branch: refs/heads/gearpump-runner
Commit: 06897b1cc142f658437ac7779c849e5182e331f1
Parents: 14fa7f7
Author: Jason Kuster <ja...@google.com>
Authored: Fri Jun 9 01:39:15 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jul 5 09:22:20 2017 -0700

----------------------------------------------------------------------
 .../jenkins/common_job_properties.groovy        |  5 +-
 .../job_beam_PreCommit_Website_Merge.groovy     | 59 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/06897b1c/.test-infra/jenkins/common_job_properties.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 0e047ea..70534c6 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -23,11 +23,12 @@
 class common_job_properties {
 
   // Sets common top-level job properties for website repository jobs.
-  static void setTopLevelWebsiteJobProperties(context) {
+  static void setTopLevelWebsiteJobProperties(context,
+                                              String branch = 'asf-site') {
     setTopLevelJobProperties(
             context,
             'beam-site',
-            'asf-site',
+            branch,
             'beam',
             30)
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/06897b1c/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
new file mode 100644
index 0000000..0e2ae3f
--- /dev/null
+++ b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+// Defines a job.
+job('beam_PreCommit_Website_Merge') {
+  description('Runs website tests for mergebot.')
+
+  // Set common parameters.
+  common_job_properties.setTopLevelWebsiteJobProperties(delegate, 'mergebot')
+
+  triggers {
+    githubPush()
+  }
+
+  steps {
+    // Run the following shell script as a build step.
+    shell '''
+        # Install RVM per instructions at https://rvm.io/rvm/install.
+        RVM_GPG_KEY=409B6B1796C275462A1703113804BB82D39DC0E3
+        gpg --keyserver hkp://keys.gnupg.net --recv-keys $RVM_GPG_KEY
+            
+        \\curl -sSL https://get.rvm.io | bash
+        source /home/jenkins/.rvm/scripts/rvm
+
+        # Install Ruby.
+        RUBY_VERSION_NUM=2.3.0
+        rvm install ruby $RUBY_VERSION_NUM --autolibs=read-only
+
+        # Install Bundler gem
+        PATH=~/.gem/ruby/$RUBY_VERSION_NUM/bin:$PATH
+        GEM_PATH=~/.gem/ruby/$RUBY_VERSION_NUM/:$GEM_PATH
+        gem install bundler --user-install
+
+        # Install all needed gems.
+        bundle install --path ~/.gem/
+
+        # Build the new site and test it.
+        rm -fr ./content/
+        bundle exec rake test
+    '''.stripIndent().trim()
+  }
+}


[29/43] beam git commit: This closes #3492: [BEAM-2551] KafkaIO reader blocks indefinitely in case of network issues

Posted by ke...@apache.org.
This closes #3492: [BEAM-2551] KafkaIO reader blocks indefinitely in case of network issues

  Add timeout to initialization of partition in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8f8d18a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8f8d18a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8f8d18a

Branch: refs/heads/gearpump-runner
Commit: b8f8d18ae2cdbb4874d9a0f45038037ecc2381d1
Parents: 85a99e2 526037b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 11:59:18 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 11:59:18 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++++++++++++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 ++++++++
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------



[02/43] beam git commit: Properly convert milliseconds whether there's less than 3/more than 9 digits.

Posted by ke...@apache.org.
Properly convert milliseconds whether there's less than 3/more than 9 digits.

This closes #3469


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f75dfe71
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f75dfe71
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f75dfe71

Branch: refs/heads/gearpump-runner
Commit: f75dfe711b357800762a93dc5059d9a19da9ee61
Parents: 893bf42 7b8cd64
Author: Luke Cwik <lc...@google.com>
Authored: Fri Jun 30 09:00:13 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Jun 30 09:00:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/TimeUtil.java    | 24 ++++++++------------
 .../runners/dataflow/util/TimeUtilTest.java     |  6 +++++
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------



[13/43] beam git commit: [maven-release-plugin] prepare for next development iteration

Posted by ke...@apache.org.
[maven-release-plugin] prepare for next development iteration


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f0723cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f0723cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f0723cf

Branch: refs/heads/gearpump-runner
Commit: 7f0723cf7fd587c4f8dbe8a8b1c9d298a7b1e5e3
Parents: 51877a3
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jul 5 16:47:38 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jul 5 16:47:38 2017 +0200

----------------------------------------------------------------------
 examples/java/pom.xml                                   | 2 +-
 examples/java8/pom.xml                                  | 2 +-
 examples/pom.xml                                        | 2 +-
 pom.xml                                                 | 4 ++--
 runners/apex/pom.xml                                    | 2 +-
 runners/core-construction-java/pom.xml                  | 2 +-
 runners/core-java/pom.xml                               | 2 +-
 runners/direct-java/pom.xml                             | 2 +-
 runners/flink/pom.xml                                   | 2 +-
 runners/google-cloud-dataflow-java/pom.xml              | 2 +-
 runners/pom.xml                                         | 2 +-
 runners/spark/pom.xml                                   | 2 +-
 sdks/common/fn-api/pom.xml                              | 2 +-
 sdks/common/pom.xml                                     | 2 +-
 sdks/common/runner-api/pom.xml                          | 2 +-
 sdks/java/build-tools/pom.xml                           | 2 +-
 sdks/java/core/pom.xml                                  | 2 +-
 sdks/java/extensions/google-cloud-platform-core/pom.xml | 2 +-
 sdks/java/extensions/jackson/pom.xml                    | 2 +-
 sdks/java/extensions/join-library/pom.xml               | 2 +-
 sdks/java/extensions/pom.xml                            | 2 +-
 sdks/java/extensions/protobuf/pom.xml                   | 2 +-
 sdks/java/extensions/sorter/pom.xml                     | 2 +-
 sdks/java/harness/pom.xml                               | 2 +-
 sdks/java/io/amqp/pom.xml                               | 2 +-
 sdks/java/io/cassandra/pom.xml                          | 2 +-
 sdks/java/io/common/pom.xml                             | 2 +-
 sdks/java/io/elasticsearch/pom.xml                      | 2 +-
 sdks/java/io/google-cloud-platform/pom.xml              | 2 +-
 sdks/java/io/hadoop-common/pom.xml                      | 2 +-
 sdks/java/io/hadoop-file-system/pom.xml                 | 2 +-
 sdks/java/io/hadoop/input-format/pom.xml                | 2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml                | 2 +-
 sdks/java/io/hadoop/pom.xml                             | 2 +-
 sdks/java/io/hbase/pom.xml                              | 2 +-
 sdks/java/io/hcatalog/pom.xml                           | 2 +-
 sdks/java/io/jdbc/pom.xml                               | 2 +-
 sdks/java/io/jms/pom.xml                                | 2 +-
 sdks/java/io/kafka/pom.xml                              | 2 +-
 sdks/java/io/kinesis/pom.xml                            | 2 +-
 sdks/java/io/mongodb/pom.xml                            | 2 +-
 sdks/java/io/mqtt/pom.xml                               | 2 +-
 sdks/java/io/pom.xml                                    | 2 +-
 sdks/java/io/xml/pom.xml                                | 2 +-
 sdks/java/java8tests/pom.xml                            | 2 +-
 sdks/java/javadoc/pom.xml                               | 2 +-
 sdks/java/maven-archetypes/examples-java8/pom.xml       | 2 +-
 sdks/java/maven-archetypes/examples/pom.xml             | 2 +-
 sdks/java/maven-archetypes/pom.xml                      | 2 +-
 sdks/java/maven-archetypes/starter/pom.xml              | 2 +-
 sdks/java/pom.xml                                       | 2 +-
 sdks/pom.xml                                            | 2 +-
 sdks/python/pom.xml                                     | 2 +-
 53 files changed, 54 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 7ae4e6a..ae64a79 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index a0ce708..6fd29a4 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-examples-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index a7e61dd..51f4c35 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 057954a..49e6edd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
   <url>http://beam.apache.org/</url>
   <inceptionYear>2016</inceptionYear>
 
-  <version>2.1.0-SNAPSHOT</version>
+  <version>2.2.0-SNAPSHOT</version>
 
   <licenses>
     <license>
@@ -48,7 +48,7 @@
     <connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
     <url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
-    <tag>release-2.1.0</tag>
+    <tag>HEAD</tag>
   </scm>
 
   <issueManagement>

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 20f2d28..fd5aafb 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 67951e9..b85b5f5 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>beam-runners-parent</artifactId>
     <groupId>org.apache.beam</groupId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index c3a8d25..8c8e599 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 5b5aec2..0e1f73a 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 339aa8e..c063a2d 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 2ba163b..91908cd 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 38aada8..b00ba9c 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 8a69496..7f70204 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index 77a9ba5..6810667 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index c621ed5..40eefa7 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/common/runner-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
index f5536a7..8bc4123 100644
--- a/sdks/common/runner-api/pom.xml
+++ b/sdks/common/runner-api/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-common-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/pom.xml b/sdks/java/build-tools/pom.xml
index 5a2c498..d7d25f6 100644
--- a/sdks/java/build-tools/pom.xml
+++ b/sdks/java/build-tools/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 11b68e6..3f12dc4 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/google-cloud-platform-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml
index e4e951b..7d54990 100644
--- a/sdks/java/extensions/google-cloud-platform-core/pom.xml
+++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index 4b09c11..7fd38e0 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 556ec40..ea24b75 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 3d63626..1222476 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
index ae909ab..63855f8 100644
--- a/sdks/java/extensions/protobuf/pom.xml
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index ac61f76..395c73f 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index a35481d..9cfadc2 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
index 45b295d..8da9448 100644
--- a/sdks/java/io/amqp/pom.xml
+++ b/sdks/java/io/amqp/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
index 8249f57..c74477e 100644
--- a/sdks/java/io/cassandra/pom.xml
+++ b/sdks/java/io/cassandra/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index f7525fd..df0d94b 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-parent</artifactId>
-        <version>2.1.0-SNAPSHOT</version>
+        <version>2.2.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index c8e308c..e0a7f21 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 09a430a..a1495f2 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 8749243..4bcbcd7 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index a54977e..a9c2e57 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hadoop/input-format/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index 06f9f11..0953119 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index baaa982..12944f4 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/pom.xml b/sdks/java/io/hadoop/pom.xml
index a1c7a2e..bc3569d 100644
--- a/sdks/java/io/hadoop/pom.xml
+++ b/sdks/java/io/hadoop/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 40ac8df..40f516a 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml
index a31ff86..2aa661e 100644
--- a/sdks/java/io/hcatalog/pom.xml
+++ b/sdks/java/io/hcatalog/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 45ec06c..050fc6a 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index 58009a1..c2074af 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 29350cc..1256c46 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index cb7064b..46d5e26 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 912e20c..d93cc41 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index baaf771..9fa1dc0 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 458dfaf..b7909fa 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index cf7dd33..7b5804e 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index b90a757..2378014 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 08d5ec6..ddb92cf 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index b57644d..b60a695 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index c1378cb..2a02039 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index b7fe274..d676b31 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 06b41c8..8024b52 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 250c85a..3144193 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index 27b9610..aec8762 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7f0723cf/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 1295654..1077689 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


[14/43] beam git commit: Update Python SDK version

Posted by ke...@apache.org.
Update Python SDK version


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14fa7f79
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14fa7f79
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14fa7f79

Branch: refs/heads/gearpump-runner
Commit: 14fa7f79f0830739122b7573e032ad0aea172a98
Parents: 7f0723c
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jul 5 16:52:48 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jul 5 16:52:48 2017 +0200

----------------------------------------------------------------------
 sdks/python/apache_beam/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/14fa7f79/sdks/python/apache_beam/version.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py
index ae92a23..8b0a430 100644
--- a/sdks/python/apache_beam/version.py
+++ b/sdks/python/apache_beam/version.py
@@ -18,4 +18,4 @@
 """Apache Beam SDK version information and utilities."""
 
 
-__version__ = '2.1.0.dev'
+__version__ = '2.2.0.dev'


[33/43] beam git commit: Made DataflowRunner TransformTranslator public

Posted by ke...@apache.org.
Made DataflowRunner TransformTranslator public


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da92256b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da92256b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da92256b

Branch: refs/heads/gearpump-runner
Commit: da92256ba64c5f4777776bd6283db2484bd72293
Parents: 1e16aa2
Author: Jeremie Lenfant-Engelmann <je...@google.com>
Authored: Wed Jun 28 16:11:21 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:15:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/TransformTranslator.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/da92256b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index a7452b2..7f61b6c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -36,7 +36,8 @@ import org.apache.beam.sdk.values.TupleTag;
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
  * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
  */
-interface TransformTranslator<TransformT extends PTransform> {
+@Internal
+public interface TransformTranslator<TransformT extends PTransform> {
   void translate(TransformT transform, TranslationContext context);
 
   /**


[23/43] beam git commit: This closes #3496: Disallow Combiner Lifting for multi-window WindowFns

Posted by ke...@apache.org.
This closes #3496: Disallow Combiner Lifting for multi-window WindowFns


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8fdd373
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8fdd373
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8fdd373

Branch: refs/heads/gearpump-runner
Commit: e8fdd373c7b86652fda94ff76ef7246357e6288c
Parents: ea9f7fb 29c2bca
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 5 16:41:16 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 5 16:41:16 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/DataflowPipelineTranslator.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------



[35/43] beam git commit: Fix bad merge

Posted by ke...@apache.org.
Fix bad merge


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17bc3b14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17bc3b14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17bc3b14

Branch: refs/heads/gearpump-runner
Commit: 17bc3b140c7c7315880ce18d4e15d6ac512c35d2
Parents: 17d7e59
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 21:45:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:45:39 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/TransformTranslator.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/17bc3b14/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 7f61b6c..06ed1e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;


[22/43] beam git commit: This closes #3497

Posted by ke...@apache.org.
This closes #3497


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ea9f7fbd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ea9f7fbd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ea9f7fbd

Branch: refs/heads/gearpump-runner
Commit: ea9f7fbdde5ede5e5f5060da5dfe2749992a2fa7
Parents: 63f66d6 23e385f
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 5 16:11:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jul 5 16:11:46 2017 -0700

----------------------------------------------------------------------
 sdks/python/setup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[04/43] beam git commit: This closes #3355: Add support for PipelineOptions parameters

Posted by ke...@apache.org.
This closes #3355: Add support for PipelineOptions parameters


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e429b33
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e429b33
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e429b33

Branch: refs/heads/gearpump-runner
Commit: 0e429b33ff85eba08da5018c9febd0b99b44f720
Parents: f75dfe7 56cb6c5
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jun 30 15:39:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 30 15:39:20 2017 -0700

----------------------------------------------------------------------
 ...eBoundedSplittableProcessElementInvoker.java |  5 ++
 .../beam/runners/core/SimpleDoFnRunner.java     | 20 +++++++
 .../apache/beam/sdk/transforms/DoFnTester.java  |  5 ++
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  6 ++
 .../sdk/transforms/reflect/DoFnInvoker.java     | 13 +++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 23 +++++++
 .../sdk/transforms/reflect/DoFnSignatures.java  | 22 ++++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 63 ++++++++++++++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  | 14 +++++
 9 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[17/43] beam git commit: [BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements

Posted by ke...@apache.org.
[BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ca410a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ca410a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ca410a9

Branch: refs/heads/gearpump-runner
Commit: 6ca410a908f1f4e7ac1e141ee1335f7a537bb150
Parents: f0549b4
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jul 5 10:38:44 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 10:38:44 2017 -0700

----------------------------------------------------------------------
 pom.xml                                                            | 2 +-
 .../starter/src/test/resources/projects/basic/reference/pom.xml    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6ca410a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 49e6edd..01474c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
     <failsafe-plugin.version>2.20</failsafe-plugin.version>
     <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
     <maven-dependency-plugin.version>3.0.1</maven-dependency-plugin.version>
-    <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
+    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
     <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
     <maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
     <maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>

http://git-wip-us.apache.org/repos/asf/beam/blob/6ca410a9/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 60405e6..6056fb0 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -28,7 +28,7 @@
     <beam.version>@project.version@</beam.version>
 
     <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
-    <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
+    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
     <slf4j.version>1.7.14</slf4j.version>
   </properties>
 


[37/43] beam git commit: This closes #3470: [BEAM-940] ByteBuddyOnTimerInvokerFactory: key the cache with a (Class, id) tuple or OnTimerMethod

Posted by ke...@apache.org.
This closes #3470: [BEAM-940] ByteBuddyOnTimerInvokerFactory: key the cache with a (Class, id) tuple or OnTimerMethod

  Simplified ByteBuddyOnTimerInvokerFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/440c7d45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/440c7d45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/440c7d45

Branch: refs/heads/gearpump-runner
Commit: 440c7d45b21e64d49566f0674a2dc91f6d284257
Parents: 17bc3b1 c8d9833
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 21:47:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:47:11 2017 -0700

----------------------------------------------------------------------
 .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 ++++++++------------
 .../reflect/OnTimerMethodSpecifier.java         | 37 ++++++++++
 2 files changed, 65 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[36/43] beam git commit: Simplified ByteBuddyOnTimerInvokerFactory

Posted by ke...@apache.org.
Simplified ByteBuddyOnTimerInvokerFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8d98336
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8d98336
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8d98336

Branch: refs/heads/gearpump-runner
Commit: c8d983363efd3f3d93825ecc8e8abae2dfa4e008
Parents: 17bc3b1
Author: Innocent Djiofack <dj...@gmail.com>
Authored: Wed Jun 28 22:15:11 2017 -0400
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 21:46:53 2017 -0700

----------------------------------------------------------------------
 .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 ++++++++------------
 .../reflect/OnTimerMethodSpecifier.java         | 37 ++++++++++
 2 files changed, 65 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index e031337..5e31f2e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+
 import com.google.common.base.CharMatcher;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -61,13 +62,14 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
 
     @SuppressWarnings("unchecked")
     Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
-
     try {
-      Constructor<?> constructor = constructorCache.get(fnClass).get(timerId);
-      @SuppressWarnings("unchecked")
-      OnTimerInvoker<InputT, OutputT> invoker =
+        OnTimerMethodSpecifier onTimerMethodSpecifier =
+                OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId);
+        Constructor<?> constructor = constructorCache.get(onTimerMethodSpecifier);
+
+        OnTimerInvoker<InputT, OutputT> invoker =
           (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
-      return invoker;
+        return invoker;
     } catch (InstantiationException
         | IllegalAccessException
         | IllegalArgumentException
@@ -97,50 +99,31 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
   private static final String FN_DELEGATE_FIELD_NAME = "delegate";
 
   /**
-   * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn}
-   * class and then by {@link TimerId}.
+   * A cache of constructors of generated {@link OnTimerInvoker} classes,
+   * keyed by {@link OnTimerMethodSpecifier}.
    *
    * <p>Needed because generating an invoker class is expensive, and to avoid generating an
    * excessive number of classes consuming PermGen memory in Java's that still have PermGen.
    */
-  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
-      constructorCache =
-          CacheBuilder.newBuilder()
-              .build(
-                  new CacheLoader<
-                      Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
-                    @Override
-                    public LoadingCache<String, Constructor<?>> load(
-                        final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
-                      return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
-                    }
-                  });
-
-  /**
-   * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
-   * invokers for its {@link OnTimer @OnTimer} methods.
-   */
-  private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
-
-    private final DoFnSignature signature;
-
-    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
-      this.signature = DoFnSignatures.getSignature(clazz);
-    }
-
-    @Override
-    public Constructor<?> load(String timerId) throws Exception {
-      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
-          generateOnTimerInvokerClass(signature, timerId);
-      try {
-        return invokerClass.getConstructor(signature.fnClass());
-      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
+  private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache =
+          CacheBuilder.newBuilder().build(
+          new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() {
+              @Override
+              public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier)
+                      throws Exception {
+                  DoFnSignature signature =
+                          DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass());
+                  Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+                          generateOnTimerInvokerClass(signature, onTimerMethodSpecifier.timerId());
+                  try {
+                      return invokerClass.getConstructor(signature.fnClass());
+                  } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+                      throw new RuntimeException(e);
+                  }
+
+              }
+          });
+    /**
    * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link
    * TimerId}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
new file mode 100644
index 0000000..edf7e3c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Used by {@link ByteBuddyOnTimerInvokerFactory} to Dynamically generate
+ * {@link OnTimerInvoker} instances for invoking a particular
+ * {@link DoFn.TimerId} on a particular {@link DoFn}.
+ */
+
+@AutoValue
+abstract class OnTimerMethodSpecifier {
+    public abstract Class<? extends DoFn<?, ?>> fnClass();
+    public abstract String timerId();
+    public static OnTimerMethodSpecifier
+    forClassAndTimerId(Class<? extends DoFn<?, ?>> fnClass, String timerId){
+        return  new AutoValue_OnTimerMethodSpecifier(fnClass, timerId);
+    }
+}


[07/43] beam git commit: This closes #3291

Posted by ke...@apache.org.
This closes #3291


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11010cf0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11010cf0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11010cf0

Branch: refs/heads/gearpump-runner
Commit: 11010cf08ec9ca90a843cc15ea8f0fd4910f7fab
Parents: 0e429b3 ce4e517
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jul 3 09:26:40 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jul 3 09:26:40 2017 +0200

----------------------------------------------------------------------
 examples/java/pom.xml                           | 18 ++++--
 examples/java8/pom.xml                          | 18 ++++--
 pom.xml                                         | 65 +++++++++++++++++++-
 runners/apex/pom.xml                            |  2 +-
 runners/spark/pom.xml                           |  7 ---
 sdks/java/extensions/sorter/pom.xml             |  6 --
 sdks/java/io/hadoop-file-system/pom.xml         | 31 ----------
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |  2 -
 sdks/java/io/hbase/pom.xml                      |  9 ++-
 sdks/java/io/hcatalog/pom.xml                   |  6 +-
 sdks/java/io/jdbc/pom.xml                       |  2 -
 sdks/java/io/pom.xml                            | 31 ----------
 sdks/java/javadoc/pom.xml                       |  2 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 15 files changed, 98 insertions(+), 103 deletions(-)
----------------------------------------------------------------------



[43/43] beam git commit: This closes #3515: Sync gearpump-runner with master and upgrade to gearpump 0.8.4

Posted by ke...@apache.org.
This closes #3515: Sync gearpump-runner with master and upgrade to gearpump 0.8.4

  Fix ParDoTest#testPipelineOptionsParameter
  Upgrade to gearpump 0.8.4
  Fix javadoc generation for AmqpIO, CassandraIO and HCatalogIO
  Simplified ByteBuddyOnTimerInvokerFactory
  Fix bad merge
  Made DataflowRunner TransformTranslator public
  Process timer firings for a window together
  Ignore processing time timers in expired windows
  Add timeout to initialization of partition in KafkaIO
  [BEAM-2534] Handle offset gaps in Kafka messages.
  Fix PValue input in _PubSubReadEvaluator
  Update SDK dependencies
  Disallow Combiner Lifting for multi-window WindowFns
  [BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements
  Website Mergebot Job
  Update Python SDK version
  [maven-release-plugin] prepare for next development iteration
  [maven-release-plugin] prepare branch release-2.1.0
  For GCS operations use an http client with a default timeout value.
  [BEAM-2530] Fix compilation of modules with Java 9 that depend on jdk.tools
  Make modules that depend on Hadoop and Spark use the same version property
  Fix DoFn javadoc: StateSpec does not require a key
  Add support for PipelineOptions parameters
  Properly convert milliseconds whether there's less than 3/more than 9 digits. TimeUtil did not properly convert (and returned null) when the number of digits for fractions of seconds was less than 3 digits or more than 9 digits. The solution is to pad with zeros when there is less than 3 digits and to truncate when there is more than 3.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ce60b48
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ce60b48
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ce60b48

Branch: refs/heads/gearpump-runner
Commit: 1ce60b488e991562712937aaaf13564e3a909117
Parents: 627ae0b 725f547
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Jul 9 21:47:12 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Jul 9 21:47:12 2017 -0700

----------------------------------------------------------------------
 .../jenkins/common_job_properties.groovy        |   5 +-
 .../job_beam_PreCommit_Website_Merge.groovy     |  59 +++++++++
 examples/java/pom.xml                           |  20 ++-
 examples/java8/pom.xml                          |  20 ++-
 .../examples/complete/game/LeaderBoardTest.java |   2 +
 examples/pom.xml                                |   2 +-
 pom.xml                                         |  87 ++++++++++++-
 runners/apex/pom.xml                            |  11 +-
 runners/core-construction-java/pom.xml          |   2 +-
 runners/core-java/pom.xml                       |   2 +-
 ...eBoundedSplittableProcessElementInvoker.java |   5 +
 .../beam/runners/core/ReduceFnRunner.java       | 106 ++++++++++-----
 .../beam/runners/core/SimpleDoFnRunner.java     |  20 +++
 .../beam/runners/core/ReduceFnRunnerTest.java   |  81 +++++++++++-
 runners/direct-java/pom.xml                     |   4 +-
 runners/flink/pom.xml                           |   2 +-
 runners/gearpump/pom.xml                        |   4 +-
 .../gearpump/translators/io/GearpumpSource.java |  12 +-
 .../translators/utils/DoFnRunnerFactory.java    |   5 +-
 .../translators/utils/TranslatorUtils.java      |  19 +++
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |   1 +
 .../runners/dataflow/TransformTranslator.java   |   4 +-
 .../beam/runners/dataflow/util/TimeUtil.java    |  24 ++--
 .../runners/dataflow/util/TimeUtilTest.java     |   6 +
 runners/pom.xml                                 |   2 +-
 runners/spark/pom.xml                           |  16 +--
 sdks/common/fn-api/pom.xml                      |   2 +-
 sdks/common/pom.xml                             |   2 +-
 sdks/common/runner-api/pom.xml                  |   2 +-
 sdks/java/build-tools/pom.xml                   |   2 +-
 sdks/java/core/pom.xml                          |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   5 +
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |   6 +
 .../reflect/ByteBuddyOnTimerInvokerFactory.java |  73 ++++-------
 .../sdk/transforms/reflect/DoFnInvoker.java     |  13 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  23 ++++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../reflect/OnTimerMethodSpecifier.java         |  37 ++++++
 .../apache/beam/sdk/transforms/ParDoTest.java   |  63 +++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  |  14 ++
 .../google-cloud-platform-core/pom.xml          |   2 +-
 sdks/java/extensions/jackson/pom.xml            |   2 +-
 sdks/java/extensions/join-library/pom.xml       |   2 +-
 sdks/java/extensions/pom.xml                    |   2 +-
 sdks/java/extensions/protobuf/pom.xml           |   2 +-
 sdks/java/extensions/sorter/pom.xml             |   8 +-
 sdks/java/harness/pom.xml                       |   2 +-
 sdks/java/io/amqp/pom.xml                       |   2 +-
 sdks/java/io/cassandra/pom.xml                  |   2 +-
 sdks/java/io/common/pom.xml                     |   2 +-
 sdks/java/io/elasticsearch/pom.xml              |   2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |   2 +-
 sdks/java/io/hadoop-common/pom.xml              |   2 +-
 sdks/java/io/hadoop-file-system/pom.xml         |  33 +----
 sdks/java/io/hadoop/input-format/pom.xml        |   2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |   4 +-
 sdks/java/io/hadoop/pom.xml                     |   2 +-
 sdks/java/io/hbase/pom.xml                      |  18 ++-
 sdks/java/io/hcatalog/pom.xml                   |  20 ++-
 sdks/java/io/jdbc/pom.xml                       |   4 +-
 sdks/java/io/jms/pom.xml                        |   2 +-
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 130 +++++++++++++------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  30 +++++
 sdks/java/io/kinesis/pom.xml                    |   2 +-
 sdks/java/io/mongodb/pom.xml                    |   2 +-
 sdks/java/io/mqtt/pom.xml                       |   2 +-
 sdks/java/io/pom.xml                            |  33 +----
 sdks/java/io/xml/pom.xml                        |   2 +-
 sdks/java/java8tests/pom.xml                    |   2 +-
 sdks/java/javadoc/pom.xml                       |  19 ++-
 .../maven-archetypes/examples-java8/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 -
 sdks/java/maven-archetypes/examples/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 -
 sdks/java/maven-archetypes/pom.xml              |   2 +-
 sdks/java/maven-archetypes/starter/pom.xml      |   2 +-
 .../resources/projects/basic/reference/pom.xml  |   2 +-
 sdks/java/pom.xml                               |   2 +-
 sdks/pom.xml                                    |   2 +-
 sdks/python/apache_beam/io/gcp/gcsio.py         |  10 +-
 .../runners/direct/transform_evaluator.py       |   5 +-
 sdks/python/apache_beam/version.py              |   2 +-
 sdks/python/pom.xml                             |   2 +-
 sdks/python/setup.py                            |   4 +-
 87 files changed, 854 insertions(+), 317 deletions(-)
----------------------------------------------------------------------



[27/43] beam git commit: This closes #3461

Posted by ke...@apache.org.
This closes #3461


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85a99e29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85a99e29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85a99e29

Branch: refs/heads/gearpump-runner
Commit: 85a99e29448670ae6728a8ee2e4cd3ef95877c3e
Parents: bf6dda3 2259c30
Author: JingsongLi <lz...@aliyun.com>
Authored: Fri Jul 7 00:48:36 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Fri Jul 7 00:48:36 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------



[30/43] beam git commit: Ignore processing time timers in expired windows

Posted by ke...@apache.org.
Ignore processing time timers in expired windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd631b89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd631b89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd631b89

Branch: refs/heads/gearpump-runner
Commit: bd631b89a8434f0756e1596875e89013fb623ab5
Parents: b8f8d18
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 14:12:39 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       | 10 ++++++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 32 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd631b89/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       @SuppressWarnings("unchecked")
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
       W window = windowNamespace.getWindow();
+
+      if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+        continue;
+      }
+
       ReduceFn<K, InputT, OutputT, W>.Context directContext =
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
   }
 
+  private boolean windowIsExpired(BoundedWindow w) {
+    return timerInternals
+        .currentInputWatermarkTime()
+        .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bd631b89/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
 
   /**
    * Tests that when a processing time timer comes in after a window is expired
+   * it is just ignored.
+   */
+  @Test
+  public void testLateProcessingTimeTimer() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    // After this advancement, the window is expired and only the GC process
+    // should be allowed to touch it
+    tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+    // This should not output
+    tester.advanceProcessingTime(new Instant(6000));
+
+    assertThat(tester.extractOutput(), emptyIterable());
+  }
+
+  /**
+   * Tests that when a processing time timer comes in after a window is expired
    * but in the same bundle it does not cause a spurious output.
    */
   @Test


[03/43] beam git commit: Add support for PipelineOptions parameters

Posted by ke...@apache.org.
Add support for PipelineOptions parameters

This is a step towards eliminating catch-all context parameters and
making DoFns express their fine-grained data needs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56cb6c51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56cb6c51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56cb6c51

Branch: refs/heads/gearpump-runner
Commit: 56cb6c51748fde6ad56522733ab10edca062e802
Parents: f75dfe7
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 13 10:29:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 30 15:38:47 2017 -0700

----------------------------------------------------------------------
 ...eBoundedSplittableProcessElementInvoker.java |  5 ++
 .../beam/runners/core/SimpleDoFnRunner.java     | 20 +++++++
 .../apache/beam/sdk/transforms/DoFnTester.java  |  5 ++
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  6 ++
 .../sdk/transforms/reflect/DoFnInvoker.java     | 13 +++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 23 +++++++
 .../sdk/transforms/reflect/DoFnSignatures.java  | 22 ++++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 63 ++++++++++++++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  | 14 +++++
 9 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 2db6531..475abf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -118,6 +118,11 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
           }
 
           @Override
+          public PipelineOptions pipelineOptions() {
+            return pipelineOptions;
+          }
+
+          @Override
           public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
             throw new IllegalStateException(
                 "Should not access startBundleContext() from @"

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 7d7babd..c3bfef6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -233,6 +233,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       return this;
     }
@@ -298,6 +303,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access StartBundleContext outside of @StartBundle method.");
@@ -467,6 +477,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
     }
@@ -568,6 +583,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 4da9a80..b2377dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -290,6 +290,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
+            public PipelineOptions pipelineOptions() {
+              return getPipelineOptions();
+            }
+
+            @Override
             public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
                 DoFn<InputT, OutputT> doFn) {
               throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 4f67db4..8378204 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -90,6 +90,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
+  public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";
@@ -627,6 +628,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
                     getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)),
                 TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
           }
+
+          @Override
+          public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) {
+            return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD);
+          }
         });
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index ed81f42..3b22fda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -102,7 +103,12 @@ public interface DoFnInvoker<InputT, OutputT> {
      */
     BoundedWindow window();
 
-    /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */
+    /** Provide {@link PipelineOptions}. */
+    PipelineOptions pipelineOptions();
+
+    /**
+     * Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}.
+     */
     DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn);
 
     /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */
@@ -140,6 +146,11 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return null;
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 0b4bf90..6eeed8e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
@@ -193,6 +194,8 @@ public abstract class DoFnSignature {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
         return cases.dispatch((TimerParameter) this);
+      } else if (this instanceof PipelineOptionsParameter) {
+        return cases.dispatch((PipelineOptionsParameter) this);
       } else {
         throw new IllegalStateException(
             String.format("Attempt to case match on unknown %s subclass %s",
@@ -212,6 +215,7 @@ public abstract class DoFnSignature {
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
       ResultT dispatch(TimerParameter p);
+      ResultT dispatch(PipelineOptionsParameter p);
 
       /**
        * A base class for a visitor with a default method for cases it is not interested in.
@@ -259,6 +263,11 @@ public abstract class DoFnSignature {
         public ResultT dispatch(TimerParameter p) {
           return dispatchDefault(p);
         }
+
+        @Override
+        public ResultT dispatch(PipelineOptionsParameter p) {
+          return dispatchDefault(p);
+        }
       }
     }
 
@@ -287,6 +296,11 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
     }
 
+    /** Returns a {@link PipelineOptionsParameter}. */
+    public static PipelineOptionsParameter pipelineOptions() {
+      return new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter();
+    }
+
     /**
      * Returns a {@link RestrictionTrackerParameter}.
      */
@@ -306,6 +320,14 @@ public abstract class DoFnSignature {
     }
 
     /**
+     * Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}.
+     */
+    @AutoValue
+    public abstract static class PipelineOptionsParameter extends Parameter {
+      PipelineOptionsParameter() {}
+    }
+
+    /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}.
      *
      * <p>All such descriptors are equal.
@@ -314,6 +336,7 @@ public abstract class DoFnSignature {
     public abstract static class StartBundleContextParameter extends Parameter {
       StartBundleContextParameter() {}
     }
+
     /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index bb191b1..1b27e66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
@@ -78,19 +79,23 @@ public class DoFnSignatures {
       ImmutableList.of(
           Parameter.ProcessContextParameter.class,
           Parameter.WindowParameter.class,
+          Parameter.PipelineOptionsParameter.class,
           Parameter.TimerParameter.class,
           Parameter.StateParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
           ImmutableList.of(
-              Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class);
+              Parameter.PipelineOptionsParameter.class,
+              Parameter.ProcessContextParameter.class,
+              Parameter.RestrictionTrackerParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_ON_TIMER_PARAMETERS =
           ImmutableList.of(
               Parameter.OnTimerContextParameter.class,
               Parameter.WindowParameter.class,
+              Parameter.PipelineOptionsParameter.class,
               Parameter.TimerParameter.class,
               Parameter.StateParameter.class);
 
@@ -187,6 +192,15 @@ public class DoFnSignatures {
           extraParameters, Predicates.instanceOf(WindowParameter.class));
     }
 
+    /**
+     * Indicates whether a {@link Parameter.PipelineOptionsParameter} is
+     * known in this context.
+     */
+    public boolean hasPipelineOptionsParamter() {
+      return Iterables.any(
+          extraParameters, Predicates.instanceOf(Parameter.PipelineOptionsParameter.class));
+    }
+
     /** The window type, if any, used by this method. */
     @Nullable
     public TypeDescriptor<? extends BoundedWindow> getWindowType() {
@@ -789,6 +803,12 @@ public class DoFnSignatures {
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
+    } else if (PipelineOptions.class.equals(rawType)) {
+      methodErrors.checkArgument(
+          !methodContext.hasPipelineOptionsParamter(),
+          "Multiple %s parameters",
+          PipelineOptions.class.getSimpleName());
+      return Parameter.pipelineOptions();
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasRestrictionTrackerParameter(),

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index c67cf2a..5b60ef3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,6 +62,8 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
@@ -2942,4 +2944,65 @@ public class ParDoTest implements Serializable {
 
     // If it doesn't crash, we made it!
   }
+
+  /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */
+  public interface MyOptions extends PipelineOptions {
+    @Default.String("fake option")
+    String getFakeOption();
+    void setFakeOption(String value);
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testPipelineOptionsParameter() {
+    PCollection<String> results = pipeline
+        .apply(Create.of(1))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, String>() {
+                  @ProcessElement
+                  public void process(ProcessContext c, PipelineOptions options) {
+                    c.output(options.as(MyOptions.class).getFakeOption());
+                  }
+                }));
+
+    String testOptionValue = "not fake anymore";
+    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
+    PAssert.that(results).containsInAnyOrder("not fake anymore");
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testPipelineOptionsParameterOnTimer() {
+    final String timerId = "thisTimer";
+
+    PCollection<String> results =
+        pipeline
+            .apply(Create.of(KV.of(0, 0)))
+            .apply(
+                ParDo.of(
+                    new DoFn<KV<Integer, Integer>, String>() {
+                      @TimerId(timerId)
+                      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                      @ProcessElement
+                      public void process(
+                          ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) {
+                        timer.set(w.maxTimestamp());
+                      }
+
+                      @OnTimer(timerId)
+                      public void onTimer(OnTimerContext c, PipelineOptions options) {
+                        c.output(options.as(MyOptions.class).getFakeOption());
+                      }
+                    }));
+
+    String testOptionValue = "not fake anymore";
+    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
+    PAssert.that(results).containsInAnyOrder("not fake anymore");
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index cffb0ad..70c8dfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.fail;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -329,6 +330,19 @@ public class DoFnSignaturesTest {
   }
 
   @Test
+  public void testPipelineOptionsParameter() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.getSignature(new DoFn<String, String>() {
+          @ProcessElement
+          public void process(ProcessContext c, PipelineOptions options) {}
+        }.getClass());
+
+    assertThat(
+        sig.processElement().extraParameters(),
+        Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class)));
+  }
+
+  @Test
   public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
     DoFnSignature sig =
         DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass());


[10/43] beam git commit: For GCS operations use an http client with a default timeout value.

Posted by ke...@apache.org.
For GCS operations use an http client with a default timeout value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68f1fb64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68f1fb64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68f1fb64

Branch: refs/heads/gearpump-runner
Commit: 68f1fb64fd2565e287e322d715ca778d01e7137b
Parents: 0bd47c0
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jun 30 17:37:33 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jul 4 11:07:25 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/68f1fb64/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index d43c8ba..643fbc7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,6 +31,7 @@ import re
 import threading
 import time
 import traceback
+import httplib2
 
 from apache_beam.utils import retry
 
@@ -68,6 +69,10 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for GCS operations to
+# complete.
+DEFAULT_HTTP_TIMEOUT_SECONDS = 60
+
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -99,6 +104,7 @@ class GcsIO(object):
 
   def __new__(cls, storage_client=None):
     if storage_client:
+      # This path is only used for testing.
       return super(GcsIO, cls).__new__(cls, storage_client)
     else:
       # Create a single storage client for each thread.  We would like to avoid
@@ -108,7 +114,9 @@ class GcsIO(object):
       local_state = threading.local()
       if getattr(local_state, 'gcsio_instance', None) is None:
         credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(credentials=credentials)
+        storage_client = storage.StorageV1(
+            credentials=credentials,
+            http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
         local_state.gcsio_instance = (
             super(GcsIO, cls).__new__(cls, storage_client))
         local_state.gcsio_instance.client = storage_client


[11/43] beam git commit: This closes #3486

Posted by ke...@apache.org.
This closes #3486


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8ac3264
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8ac3264
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8ac3264

Branch: refs/heads/gearpump-runner
Commit: b8ac32641183b06a87517a169c8ecc5371996aed
Parents: 0bd47c0 68f1fb6
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jul 4 11:07:31 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jul 4 11:07:31 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[12/43] beam git commit: [maven-release-plugin] prepare branch release-2.1.0

Posted by ke...@apache.org.
[maven-release-plugin] prepare branch release-2.1.0


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51877a34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51877a34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51877a34

Branch: refs/heads/gearpump-runner
Commit: 51877a3405dbf778c3bb88f19bb194e54c3b3def
Parents: b8ac326
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jul 5 16:47:29 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jul 5 16:47:29 2017 +0200

----------------------------------------------------------------------
 pom.xml                     | 2 +-
 runners/direct-java/pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/51877a34/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c0207ef..057954a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
     <connection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/beam.git</developerConnection>
     <url>https://git-wip-us.apache.org/repos/asf?p=beam.git;a=summary</url>
-    <tag>HEAD</tag>
+    <tag>release-2.1.0</tag>
   </scm>
 
   <issueManagement>

http://git-wip-us.apache.org/repos/asf/beam/blob/51877a34/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 6346575..5b5aec2 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -117,7 +117,7 @@
                   </relocation>
                 </relocations>
                 <transformers>
-                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 </transformers>
               </configuration>
             </execution>


[06/43] beam git commit: Make modules that depend on Hadoop and Spark use the same version property

Posted by ke...@apache.org.
Make modules that depend on Hadoop and Spark use the same version property


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ce4e5174
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ce4e5174
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ce4e5174

Branch: refs/heads/gearpump-runner
Commit: ce4e51747501111ae2c4b1691c6994bd0f92e161
Parents: 0e429b3
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sun Jun 4 22:55:05 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jul 3 08:19:02 2017 +0200

----------------------------------------------------------------------
 examples/java/pom.xml                           | 18 ++++--
 examples/java8/pom.xml                          | 18 ++++--
 pom.xml                                         | 65 +++++++++++++++++++-
 runners/apex/pom.xml                            |  2 +-
 runners/spark/pom.xml                           |  7 ---
 sdks/java/extensions/sorter/pom.xml             |  6 --
 sdks/java/io/hadoop-file-system/pom.xml         | 31 ----------
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |  2 -
 sdks/java/io/hbase/pom.xml                      |  9 ++-
 sdks/java/io/hcatalog/pom.xml                   |  6 +-
 sdks/java/io/jdbc/pom.xml                       |  2 -
 sdks/java/io/pom.xml                            | 31 ----------
 sdks/java/javadoc/pom.xml                       |  2 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 .../main/resources/archetype-resources/pom.xml  |  1 -
 15 files changed, 98 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 701e4fe..7ae4e6a 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -34,10 +34,6 @@
 
   <packaging>jar</packaging>
 
-  <properties>
-    <spark.version>1.6.2</spark.version>
-  </properties>
-
   <profiles>
 
     <!--
@@ -66,6 +62,12 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-apex</artifactId>
           <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
         <!--
           Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -95,6 +97,12 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-flink_2.10</artifactId>
           <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -116,13 +124,11 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 56295a4..a0ce708 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -35,10 +35,6 @@
 
   <packaging>jar</packaging>
 
-  <properties>
-    <spark.version>1.6.2</spark.version>
-  </properties>
-
   <profiles>
     <!--
       The direct runner is available by default.
@@ -66,6 +62,12 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-apex</artifactId>
           <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
         <!--
           Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
@@ -95,6 +97,12 @@
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-flink_2.10</artifactId>
           <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
       </dependencies>
     </profile>
@@ -116,13 +124,11 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fe51660..c0207ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,12 @@
     <guava.version>20.0</guava.version>
     <grpc.version>1.2.0</grpc.version>
     <grpc-google-common-protos.version>0.1.9</grpc-google-common-protos.version>
+    <!--
+      This is the version of Hadoop used to compile the module that depend on Hadoop.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jackson.version>2.8.9</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>
@@ -139,7 +145,7 @@
     <pubsub.version>v1-rev10-1.22.0</pubsub.version>
     <slf4j.version>1.7.14</slf4j.version>
     <spanner.version>0.20.0-beta</spanner.version>
-    <spark.version>1.6.2</spark.version>
+    <spark.version>1.6.3</spark.version>
     <spring.version>4.3.5.RELEASE</spring.version>
     <stax2.version>3.1.4</stax2.version>
     <storage.version>v1-rev71-1.22.0</storage.version>
@@ -1075,6 +1081,42 @@
         <version>${snappy-java.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-core_2.10</artifactId>
+        <version>${spark.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-streaming_2.10</artifactId>
+        <version>${spark.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-network-common_2.10</artifactId>
+        <version>${spark.version}</version>
+      </dependency>
+
       <!-- Testing -->
 
       <dependency>
@@ -1144,6 +1186,27 @@
         <scope>test</scope>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <version>${hadoop.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 2c54654..88ff0f2 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -261,7 +261,7 @@
                 <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::${apex.kryo.version}</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency>
               </ignoredUsedUndeclaredDependencies>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ee72dd9..1d93427 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -34,8 +34,6 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <spark.version>1.6.3</spark.version>
-    <hadoop.version>2.2.0</hadoop.version>
     <kafka.version>0.9.0.1</kafka.version>
     <jackson.version>2.4.4</jackson.version>
     <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
@@ -135,31 +133,26 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
-      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
-      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.10</artifactId>
-      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 9d25f9d..ac61f76 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -29,10 +29,6 @@
   <artifactId>beam-sdks-java-extensions-sorter</artifactId>
   <name>Apache Beam :: SDKs :: Java :: Extensions :: Sorter</name>
 
-  <properties>
-    <hadoop.version>2.7.1</hadoop.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -42,14 +38,12 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index db5a1db..a54977e 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -44,37 +44,6 @@
     </plugins>
   </build>
 
-  <properties>
-    <!--
-      This is the version of Hadoop used to compile the hadoop-common module.
-      This dependency is defined with a provided scope.
-      Users must supply their own Hadoop version at runtime.
-    -->
-    <hadoop.version>2.7.3</hadoop.version>
-  </properties>
-
-  <dependencyManagement>
-    <!--
-       We define dependencies here instead of sdks/java/io because
-       of a version mimatch between this Hadoop version and other
-       Hadoop versions declared in other io submodules.
-    -->
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <classifier>tests</classifier>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-minicluster</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index 9f84e88..baaa982 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -108,13 +108,11 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 4d9d600..9d5e2aa 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -32,7 +32,6 @@
 
   <properties>
     <hbase.version>1.2.6</hbase.version>
-    <hbase.hadoop.version>2.5.1</hbase.hadoop.version>
   </properties>
 
   <build>
@@ -109,14 +108,18 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
-      <version>${hbase.hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
       <scope>test</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>${hbase.hadoop.version}</version>
       <scope>test</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml
index 19b62a5..8af740d 100644
--- a/sdks/java/io/hcatalog/pom.xml
+++ b/sdks/java/io/hcatalog/pom.xml
@@ -39,14 +39,14 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
+        <artifactId>maven-shade-plugin</artifactId>
         <configuration>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <createDependencyReducedPom>false</createDependencyReducedPom>
         </configuration>
       </plugin>
     </plugins>
   </build>
-  
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 17c26a0..45ec06c 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -49,13 +49,11 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index e5db41b..458dfaf 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -32,37 +32,6 @@
   <description>Beam SDK Java IO provides different connectivity components
   (sources and sinks) to consume and produce data from systems.</description>
 
-  <properties>
-    <!--
-      This is the version of Hadoop used to compile the hadoop-common module.
-      This dependency is defined with a provided scope.
-      Users must supply their own Hadoop version at runtime.
-    -->
-    <hadoop.version>2.7.3</hadoop.version>
-  </properties>
-
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-client</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-mapreduce-client-core</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <modules>
     <module>amqp</module>
     <module>cassandra</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 54dae3a..08d5ec6 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -196,13 +196,11 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
-      <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
-      <version>${spark.version}</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index af4fbd3..4517861 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -242,7 +242,6 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/ce4e5174/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index b8b9c9f..d039ddb 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -241,7 +241,6 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>


[28/43] beam git commit: Add timeout to initialization of partition in KafkaIO

Posted by ke...@apache.org.
Add timeout to initialization of partition in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/526037b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/526037b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/526037b6

Branch: refs/heads/gearpump-runner
Commit: 526037b6786315b9f9fdca6edb636baeb6f83e3f
Parents: 85a99e2
Author: Raghu Angadi <ra...@google.com>
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 11:58:41 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++++++++++++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 ++++++++
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e520367..026313a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -1061,8 +1063,32 @@ public class KafkaIO {
       curBatch = Iterators.cycle(nonEmpty);
     }
 
+    private void setupInitialOffset(PartitionState pState) {
+      Read<K, V> spec = source.spec;
+
+      if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+        consumer.seek(pState.topicPartition, pState.nextOffset);
+      } else {
+        // nextOffset is unininitialized here, meaning start reading from latest record as of now
+        // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
+        // Remember the current position without waiting until the first record is read. This
+        // ensures checkpoint is accurate even if the reader is closed before reading any records.
+        Instant startReadTime = spec.getStartReadTime();
+        if (startReadTime != null) {
+          pState.nextOffset =
+              consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
+          consumer.seek(pState.topicPartition, pState.nextOffset);
+        } else {
+          pState.nextOffset = consumer.position(pState.topicPartition);
+        }
+      }
+    }
+
     @Override
     public boolean start() throws IOException {
+      final int defaultPartitionInitTimeout = 60 * 1000;
+      final int kafkaRequestTimeoutMultiple = 2;
+
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1077,25 +1103,38 @@ public class KafkaIO {
       keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
       valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
 
-      for (PartitionState p : partitionStates) {
-        if (p.nextOffset != UNINITIALIZED_OFFSET) {
-          consumer.seek(p.topicPartition, p.nextOffset);
-        } else {
-          // nextOffset is unininitialized here, meaning start reading from latest record as of now
-          // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
-          // Remember the current position without waiting until the first record is read. This
-          // ensures checkpoint is accurate even if the reader is closed before reading any records.
-          Instant startReadTime = spec.getStartReadTime();
-          if (startReadTime != null) {
-            p.nextOffset =
-                consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
-            consumer.seek(p.topicPartition, p.nextOffset);
-          } else {
-            p.nextOffset = consumer.position(p.topicPartition);
+      // Seek to start offset for each partition. This is the first interaction with the server.
+      // Unfortunately it can block forever in case of network issues like incorrect ACLs.
+      // Initialize partition in a separate thread and cancel it if takes longer than a minute.
+      for (final PartitionState pState : partitionStates) {
+        Future<?> future =  consumerPollThread.submit(new Runnable() {
+          public void run() {
+            setupInitialOffset(pState);
           }
-        }
+        });
 
-        LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
+        try {
+          // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
+          Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get(
+              ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+          future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout
+                         : defaultPartitionInitTimeout,
+                     TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          consumer.wakeup(); // This unblocks consumer stuck on network I/O.
+          // Likely reason : Kafka servers are configured to advertise internal ips, but
+          // those ips are not accessible from workers outside.
+          String msg = String.format(
+              "%s: Timeout while initializing partition '%s'. "
+                  + "Kafka client may not be able to connect to servers.",
+              this, pState.topicPartition);
+          LOG.error("{}", msg);
+          throw new IOException(msg);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+        LOG.info("{}: reading from {} starting at offset {}",
+                 name, pState.topicPartition, pState.nextOffset);
       }
 
       // Start consumer read loop.
@@ -1329,8 +1368,12 @@ public class KafkaIO {
       // might block to enqueue right after availableRecordsQueue.poll() below.
       while (!isShutdown) {
 
-        consumer.wakeup();
-        offsetConsumer.wakeup();
+        if (consumer != null) {
+          consumer.wakeup();
+        }
+        if (offsetConsumer != null) {
+          offsetConsumer.wakeup();
+        }
         availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
         try {
           isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/beam/blob/526037b6/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index b69bc83..482f5a2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -364,6 +365,35 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnreachableKafkaBrokers() {
+    // Expect an exception when the Kafka brokers are not reachable on the workers.
+    // We specify partitions explicitly so that splitting does not involve server interaction.
+    // Set request timeout to 10ms so that test does not take long.
+
+    thrown.expect(Exception.class);
+    thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
+
+    int numElements = 1000;
+    PCollection<Long> input = p
+        .apply(KafkaIO.<Integer, Long>read()
+            .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
+            .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
+            .withKeyDeserializer(IntegerDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .updateConsumerProperties(ImmutableMap.<String, Object>of(
+                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
+                ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
+                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
+                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
+            .withMaxNumRecords(10)
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 


[18/43] beam git commit: [BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements

Posted by ke...@apache.org.
[BEAM-2553] Update Maven exec plugin to 1.6.0 to incorporate messaging improvements

This closes #3494


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39c80f64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39c80f64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39c80f64

Branch: refs/heads/gearpump-runner
Commit: 39c80f640749fed5f55a0c8f25c849e5f7c3d179
Parents: f0549b4 6ca410a
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jul 5 12:26:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 12:26:28 2017 -0700

----------------------------------------------------------------------
 pom.xml                                                            | 2 +-
 .../starter/src/test/resources/projects/basic/reference/pom.xml    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[19/43] beam git commit: Disallow Combiner Lifting for multi-window WindowFns

Posted by ke...@apache.org.
Disallow Combiner Lifting for multi-window WindowFns


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29c2bca4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29c2bca4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29c2bca4

Branch: refs/heads/gearpump-runner
Commit: 29c2bca4649317f2ebb1c89f92bf97fbb27602ca
Parents: 39c80f6
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jul 5 14:16:50 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jul 5 14:16:50 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/DataflowPipelineTranslator.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/29c2bca4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 28fd1bb..f1783de 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -793,6 +793,7 @@ public class DataflowPipelineTranslator {
                 context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
             boolean disallowCombinerLifting =
                 !windowingStrategy.getWindowFn().isNonMerging()
+                    || !windowingStrategy.getWindowFn().assignsToOneWindow()
                     || (isStreaming && !transform.fewKeys())
                     // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
                     || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);


[40/43] beam git commit: Merge branch 'master' of https://github.com/apache/beam into sync-master

Posted by ke...@apache.org.
Merge branch 'master' of https://github.com/apache/beam into sync-master


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2d3fbc3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2d3fbc3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2d3fbc3

Branch: refs/heads/gearpump-runner
Commit: c2d3fbc3a85a276850a2a56e040eccf2e2079339
Parents: 627ae0b 9e565f2
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jul 7 16:33:03 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jul 7 16:33:03 2017 +0800

----------------------------------------------------------------------
 .../jenkins/common_job_properties.groovy        |   5 +-
 .../job_beam_PreCommit_Website_Merge.groovy     |  59 +++++++++
 examples/java/pom.xml                           |  20 ++-
 examples/java8/pom.xml                          |  20 ++-
 .../examples/complete/game/LeaderBoardTest.java |   2 +
 examples/pom.xml                                |   2 +-
 pom.xml                                         |  87 ++++++++++++-
 runners/apex/pom.xml                            |  11 +-
 runners/core-construction-java/pom.xml          |   2 +-
 runners/core-java/pom.xml                       |   2 +-
 ...eBoundedSplittableProcessElementInvoker.java |   5 +
 .../beam/runners/core/ReduceFnRunner.java       | 106 ++++++++++-----
 .../beam/runners/core/SimpleDoFnRunner.java     |  20 +++
 .../beam/runners/core/ReduceFnRunnerTest.java   |  81 +++++++++++-
 runners/direct-java/pom.xml                     |   4 +-
 runners/flink/pom.xml                           |   2 +-
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |   1 +
 .../runners/dataflow/TransformTranslator.java   |   4 +-
 .../beam/runners/dataflow/util/TimeUtil.java    |  24 ++--
 .../runners/dataflow/util/TimeUtilTest.java     |   6 +
 runners/pom.xml                                 |   2 +-
 runners/spark/pom.xml                           |  16 +--
 sdks/common/fn-api/pom.xml                      |   2 +-
 sdks/common/pom.xml                             |   2 +-
 sdks/common/runner-api/pom.xml                  |   2 +-
 sdks/java/build-tools/pom.xml                   |   2 +-
 sdks/java/core/pom.xml                          |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   5 +
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |   6 +
 .../reflect/ByteBuddyOnTimerInvokerFactory.java |  73 ++++-------
 .../sdk/transforms/reflect/DoFnInvoker.java     |  13 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  23 ++++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../reflect/OnTimerMethodSpecifier.java         |  37 ++++++
 .../apache/beam/sdk/transforms/ParDoTest.java   |  63 +++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  |  14 ++
 .../google-cloud-platform-core/pom.xml          |   2 +-
 sdks/java/extensions/jackson/pom.xml            |   2 +-
 sdks/java/extensions/join-library/pom.xml       |   2 +-
 sdks/java/extensions/pom.xml                    |   2 +-
 sdks/java/extensions/protobuf/pom.xml           |   2 +-
 sdks/java/extensions/sorter/pom.xml             |   8 +-
 sdks/java/harness/pom.xml                       |   2 +-
 sdks/java/io/amqp/pom.xml                       |   2 +-
 sdks/java/io/cassandra/pom.xml                  |   2 +-
 sdks/java/io/common/pom.xml                     |   2 +-
 sdks/java/io/elasticsearch/pom.xml              |   2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |   2 +-
 sdks/java/io/hadoop-common/pom.xml              |   2 +-
 sdks/java/io/hadoop-file-system/pom.xml         |  33 +----
 sdks/java/io/hadoop/input-format/pom.xml        |   2 +-
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |   4 +-
 sdks/java/io/hadoop/pom.xml                     |   2 +-
 sdks/java/io/hbase/pom.xml                      |  18 ++-
 sdks/java/io/hcatalog/pom.xml                   |  20 ++-
 sdks/java/io/jdbc/pom.xml                       |   4 +-
 sdks/java/io/jms/pom.xml                        |   2 +-
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 130 +++++++++++++------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  30 +++++
 sdks/java/io/kinesis/pom.xml                    |   2 +-
 sdks/java/io/mongodb/pom.xml                    |   2 +-
 sdks/java/io/mqtt/pom.xml                       |   2 +-
 sdks/java/io/pom.xml                            |  33 +----
 sdks/java/io/xml/pom.xml                        |   2 +-
 sdks/java/java8tests/pom.xml                    |   2 +-
 sdks/java/javadoc/pom.xml                       |  19 ++-
 .../maven-archetypes/examples-java8/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 -
 sdks/java/maven-archetypes/examples/pom.xml     |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   1 -
 sdks/java/maven-archetypes/pom.xml              |   2 +-
 sdks/java/maven-archetypes/starter/pom.xml      |   2 +-
 .../resources/projects/basic/reference/pom.xml  |   2 +-
 sdks/java/pom.xml                               |   2 +-
 sdks/pom.xml                                    |   2 +-
 sdks/python/apache_beam/io/gcp/gcsio.py         |  10 +-
 .../runners/direct/transform_evaluator.py       |   5 +-
 sdks/python/apache_beam/version.py              |   2 +-
 sdks/python/pom.xml                             |   2 +-
 sdks/python/setup.py                            |   4 +-
 83 files changed, 828 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2d3fbc3/examples/java/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c2d3fbc3/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c2d3fbc3/runners/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c2d3fbc3/sdks/java/pom.xml
----------------------------------------------------------------------