You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/15 14:59:31 UTC

[1/2] incubator-beam git commit: [BEAM-634] Be able to import Beam codebase in Eclipse and support m2e

Repository: incubator-beam
Updated Branches:
  refs/heads/master d71d828b7 -> c4036753f


[BEAM-634] Be able to import Beam codebase in Eclipse and support m2e


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0ae04be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0ae04be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0ae04be

Branch: refs/heads/master
Commit: a0ae04bef40149cdf54d0ab50909f18a444f3023
Parents: d71d828
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Sep 12 17:31:22 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Sep 15 16:26:05 2016 +0200

----------------------------------------------------------------------
 pom.xml                                         | 35 ++++++++++++++++++--
 runners/direct-java/pom.xml                     |  8 ++++-
 .../direct/BoundedReadEvaluatorFactory.java     |  9 ++---
 .../direct/ParDoMultiEvaluatorFactory.java      | 12 +++----
 .../direct/ParDoSingleEvaluatorFactory.java     |  9 ++---
 .../direct/UnboundedReadEvaluatorFactory.java   |  9 ++---
 .../apache/beam/runners/flink/package-info.java | 22 ------------
 .../src/main/resources/beam/checkstyle.xml      |  3 ++
 .../src/main/resources/beam/suppressions.xml    |  5 +++
 .../beam/sdk/coders/IterableLikeCoder.java      | 22 ++++++------
 .../beam/sdk/coders/protobuf/ProtoCoder.java    |  4 ++-
 .../beam/sdk/util/MergingActiveWindowSet.java   | 12 ++++---
 .../org/apache/beam/sdk/util/PubsubClient.java  |  2 +-
 .../beam/sdk/util/PubsubJsonClientTest.java     | 16 ++++-----
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   |  4 ++-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  3 +-
 .../beam/sdk/io/kinesis/package-info.java       | 22 ------------
 .../beam/sdk/io/mongodb/package-info.java       | 22 ------------
 18 files changed, 102 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01b5a88..fb3a8a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,8 @@
     <stax2.version>3.1.4</stax2.version>
     <storage.version>v1-rev71-1.22.0</storage.version>
     <woodstox.version>4.4.1</woodstox.version>
+    
+    <compiler.error.flag>-Werror</compiler.error.flag>
   </properties>
 
   <packaging>pom</packaging>
@@ -225,6 +227,33 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>eclipse-jdt</id>
+      <properties>
+        <!-- Tycho doesn't support -Werror -->
+        <compiler.error.flag>-Xlint:all</compiler.error.flag>
+      </properties>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <artifactId>maven-compiler-plugin</artifactId>
+              <configuration>
+                <compilerId>jdt</compilerId>
+              </configuration>
+              <dependencies>
+                <dependency>
+                  <groupId>org.eclipse.tycho</groupId>
+                  <artifactId>tycho-compiler-jdt</artifactId>
+                  <!-- 0.24.0 is last version to support Java7 -->
+                  <version>0.24.0</version>
+                </dependency>
+              </dependencies>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
   </profiles>
 
   <dependencyManagement>
@@ -681,7 +710,7 @@
             <dependency>
               <groupId>com.puppycrawl.tools</groupId>
               <artifactId>checkstyle</artifactId>
-              <version>6.17</version>
+              <version>6.19</version>
             </dependency>
             <dependency>
               <groupId>org.apache.beam</groupId>
@@ -716,13 +745,13 @@
 
         <plugin>
           <artifactId>maven-compiler-plugin</artifactId>
-          <version>3.1</version>
+          <version>3.3</version>
           <configuration>
             <source>1.7</source>
             <target>1.7</target>
             <compilerArgs>
               <arg>-Xlint:all</arg>
-              <arg>-Werror</arg>
+              <arg>${compiler.error.flag}</arg>
               <!-- Override options warnings to support cross-compilation -->
               <arg>-Xlint:-options</arg>
               <!-- Temporary lint overrides, to be removed over time. -->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index e06883f..354c8c7 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -361,6 +361,12 @@
       </exclusions>
       <scope>test</scope>
     </dependency>
-
+    
+    <!-- needed for eclipse-jdt generated core as the test-jar references classes from this -->
+    <dependency>
+        <groupId>com.google.cloud.dataflow</groupId>
+        <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
+        <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2046d31..9c77946 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -73,7 +73,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * already done so.
    */
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform) {
+      final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
     Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
@@ -83,7 +83,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
-        BoundedSource<OutputT> source = transform.getTransform().getSource();
+        Bounded<OutputT> bound = (Bounded<OutputT>) transform.getTransform();
+        BoundedSource<OutputT> source = bound.getSource();
         BoundedReadEvaluator<OutputT> evaluator =
             new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
         evaluatorQueue.offer(evaluator);
@@ -105,7 +106,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * may produce duplicate elements.
    */
   private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
+    private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
     private final EvaluationContext evaluationContext;
     /**
      * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same as
@@ -114,7 +115,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     private BoundedSource<OutputT> source;
 
     public BoundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
+        AppliedPTransform<?, PCollection<OutputT>, ?> transform,
         EvaluationContext evaluationContext,
         BoundedSource<OutputT> source) {
       this.transform = transform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index fcb68c4..d909e8b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -1,5 +1,5 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
+* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory;
  */
 class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class);
-  private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>
-      fnClones;
+  private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
 
   public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
     fnClones = CacheBuilder.newBuilder()
-        .build(new CacheLoader<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>() {
+        .build(new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
           @Override
-          public DoFnLifecycleManager load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
+          public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
               throws Exception {
-            return DoFnLifecycleManager.of(key.getTransform().getFn());
+            BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
+            return DoFnLifecycleManager.of(bound.getFn());
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 91da35f..1a06ea6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
  */
 class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class);
-  private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager> fnClones;
+  private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
 
   public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) {
@@ -47,11 +47,12 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
     fnClones =
         CacheBuilder.newBuilder()
             .build(
-                new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager>() {
+                new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
                   @Override
-                  public DoFnLifecycleManager load(AppliedPTransform<?, ?, Bound<?, ?>> key)
+                  public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
                       throws Exception {
-                    return DoFnLifecycleManager.of(key.getTransform().getFn());
+                    Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
+                    return DoFnLifecycleManager.of(bound.getFn());
                   }
                 });
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 0dfcd69..9fb3dbf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -82,7 +82,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    */
   private <OutputT, CheckpointMarkT extends CheckpointMark>
       TransformEvaluator<?> getTransformEvaluator(
-          final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform) {
+          final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
     ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
         (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
             sourceEvaluators.get(transform);
@@ -91,8 +91,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
+        Unbounded<OutputT> unbounded = (Unbounded<OutputT>) transform.getTransform();
         UnboundedSource<OutputT, CheckpointMarkT> source =
-            (UnboundedSource<OutputT, CheckpointMarkT>) transform.getTransform().getSource();
+            (UnboundedSource<OutputT, CheckpointMarkT>) unbounded.getSource();
         UnboundedReadDeduplicator deduplicator;
         if (source.requiresDeduping()) {
           deduplicator = UnboundedReadDeduplicator.CachedIdDeduplicator.create();
@@ -130,7 +131,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       implements TransformEvaluator<Object> {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
 
-    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
+    private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
     private final EvaluationContext evaluationContext;
     private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
         evaluatorQueue;
@@ -151,7 +152,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     private int outputBundles = 0;
 
     public UnboundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+        AppliedPTransform<?, PCollection<OutputT>, ?> transform,
         EvaluationContext evaluationContext,
         UnboundedSource<OutputT, CheckpointMarkT> source,
         UnboundedReadDeduplicator deduplicator,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
deleted file mode 100644
index 57f1e59..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
index c7d9b2c..a3313ca 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
@@ -59,6 +59,9 @@ page at http://checkstyle.sourceforge.net/config.html -->
     <property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
     <property name="checkFormat" value="$1"/>
   </module>
+  <module name="SuppressionFilter">
+    <property name="file" value="${checkstyle.suppressions.file}" />
+  </module>
 
   <!-- Check that every module has a package-info.java -->
   <module name="JavadocPackage"/>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 6dbb7f5..00d6729 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -20,5 +20,10 @@
   <suppress checks="JavadocPackage" files=".*/src/test/.*"/>
   <suppress checks="JavadocPackage" files=".*/maven-archetypes/.*"/>
   <suppress checks="JavadocPackage" files=".*/examples/.*"/>
+  
+  <!-- suppress all checks in the generated directories -->
+  <suppress checks=".*" files=".+[\\\/]generated[\\\/].+\.java" />
+  <suppress checks=".*" files=".+[\\\/]generated-sources[\\\/].+\.java" />
+  <suppress checks=".*" files=".+[\\\/]generated-test-sources[\\\/].+\.java" />
 </suppressions>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 8680552..da64a93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -140,19 +140,19 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
         elements.add(elementCoder.decode(dataInStream, nestedContext));
       }
       return decodeToIterable(elements);
-    } else {
-      List<T> elements = new ArrayList<>();
-      long count;
-      // We don't know the size a priori.  Check if we're done with
-      // each block of elements.
-      while ((count = VarInt.decodeLong(dataInStream)) > 0) {
-        while (count > 0) {
-          elements.add(elementCoder.decode(dataInStream, nestedContext));
-          count -= 1;
-        }
+    }
+    List<T> elements = new ArrayList<>();
+    // We don't know the size a priori.  Check if we're done with
+    // each block of elements.
+    long count = VarInt.decodeLong(dataInStream);
+    while (count > 0L) {
+      elements.add(elementCoder.decode(dataInStream, nestedContext));
+      --count;
+      if (count == 0L) {
+          count = VarInt.decodeLong(dataInStream);
       }
-      return decodeToIterable(elements);
     }
+    return decodeToIterable(elements);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
index 79fb373..9bba42b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
@@ -364,6 +364,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
     return memoizedParser;
   }
 
+  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+
   /**
    * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
    * {@link #coderProvider()}.
@@ -372,7 +374,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
       new CoderProvider() {
         @Override
         public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
+          if (!type.isSubtypeOf(CHECK)) {
             throw new CannotProvideCoderException(
                 String.format(
                     "Cannot provide %s because %s is not a subclass of %s",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
index 6ad63b0..066579b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
@@ -144,7 +144,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
     checkState(stateAddressWindows != null,
                              "Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
                              window);
-    if (stateAddressWindows.isEmpty()) {
+    if (stateAddressWindows != null && stateAddressWindows.isEmpty()) {
       // Window was NEW, make it ACTIVE with itself as its state address window.
       stateAddressWindows.add(window);
     }
@@ -266,10 +266,12 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
       checkState(otherStateAddressWindows != null,
                                "Window %s is not ACTIVE or NEW", other);
 
-      for (W otherStateAddressWindow : otherStateAddressWindows) {
-        // Since otherTarget equiv other AND other equiv mergeResult
-        // THEN otherTarget equiv mergeResult.
-        newStateAddressWindows.add(otherStateAddressWindow);
+      if (otherStateAddressWindows != null) {
+        for (W otherStateAddressWindow : otherStateAddressWindows) {
+          // Since otherTarget equiv other AND other equiv mergeResult
+          // THEN otherTarget equiv mergeResult.
+          newStateAddressWindows.add(otherStateAddressWindow);
+        }
       }
       activeWindowToStateAddressWindows.remove(other);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index fdcee16..bb6aa93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -114,7 +114,7 @@ public abstract class PubsubClient implements Closeable {
                     "Cannot interpret value of label %s as timestamp: %s",
                     timestampLabel, value);
     }
-    return timestampMsSinceEpoch;
+    return timestampMsSinceEpoch == null ? 0 : timestampMsSinceEpoch;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
index b6d7ccb..72fb9a2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -93,10 +93,10 @@ public class PubsubJsonClientTest {
                              .setAckId(ACK_ID);
     PullResponse expectedResponse =
         new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
-    Mockito.when(mockPubsub.projects()
-                           .subscriptions()
-                           .pull(expectedSubscription, expectedRequest)
-                           .execute())
+    Mockito.when((Object) (mockPubsub.projects()
+                               .subscriptions()
+                               .pull(expectedSubscription, expectedRequest)
+                               .execute()))
            .thenReturn(expectedResponse);
     List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
     assertEquals(1, acutalMessages.size());
@@ -120,10 +120,10 @@ public class PubsubJsonClientTest {
         .setMessages(ImmutableList.of(expectedPubsubMessage));
     PublishResponse expectedResponse = new PublishResponse()
         .setMessageIds(ImmutableList.of(MESSAGE_ID));
-    Mockito.when(mockPubsub.projects()
-                           .topics()
-                           .publish(expectedTopic, expectedRequest)
-                           .execute())
+    Mockito.when((Object) (mockPubsub.projects()
+                                .topics()
+                                .publish(expectedTopic, expectedRequest)
+                                .execute()))
            .thenReturn(expectedResponse);
     OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 811051c..acff33f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -183,7 +183,9 @@ public final class KinesisIO {
 
             @Override
             public AmazonKinesis get() {
-                return new AmazonKinesisClient(getCredentialsProvider()).withRegion(region);
+                AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+                client.withRegion(region);
+                return client;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index c98242b..b1c212b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -22,7 +22,6 @@ import static com.google.common.collect.Lists.newArrayList;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.internal.StaticCredentialsProvider;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
@@ -41,7 +40,7 @@ public class KinesisUploader {
     public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
 
     public static void uploadAll(List<String> data, KinesisTestOptions options) {
-        AmazonKinesis client = new AmazonKinesisClient(
+        AmazonKinesisClient client = new AmazonKinesisClient(
                 new StaticCredentialsProvider(
                         new BasicAWSCredentials(
                                 options.getAwsAccessKey(), options.getAwsSecretKey()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
deleted file mode 100644
index 44dbf4a..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from Amazon Kinesis.
- */
-package org.apache.beam.sdk.io.kinesis;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
deleted file mode 100644
index fd08b58..0000000
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from MongoDB.
- */
-package org.apache.beam.sdk.io.mongodb;


[2/2] incubator-beam git commit: [BEAM-634] This closes #949

Posted by jb...@apache.org.
[BEAM-634] This closes #949


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4036753
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4036753
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4036753

Branch: refs/heads/master
Commit: c4036753fe95708cfd14bd360c60bdfd7a4ec953
Parents: d71d828 a0ae04b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Sep 15 16:59:07 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Sep 15 16:59:07 2016 +0200

----------------------------------------------------------------------
 pom.xml                                         | 35 ++++++++++++++++++--
 runners/direct-java/pom.xml                     |  8 ++++-
 .../direct/BoundedReadEvaluatorFactory.java     |  9 ++---
 .../direct/ParDoMultiEvaluatorFactory.java      | 12 +++----
 .../direct/ParDoSingleEvaluatorFactory.java     |  9 ++---
 .../direct/UnboundedReadEvaluatorFactory.java   |  9 ++---
 .../apache/beam/runners/flink/package-info.java | 22 ------------
 .../src/main/resources/beam/checkstyle.xml      |  3 ++
 .../src/main/resources/beam/suppressions.xml    |  5 +++
 .../beam/sdk/coders/IterableLikeCoder.java      | 22 ++++++------
 .../beam/sdk/coders/protobuf/ProtoCoder.java    |  4 ++-
 .../beam/sdk/util/MergingActiveWindowSet.java   | 12 ++++---
 .../org/apache/beam/sdk/util/PubsubClient.java  |  2 +-
 .../beam/sdk/util/PubsubJsonClientTest.java     | 16 ++++-----
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   |  4 ++-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  3 +-
 .../beam/sdk/io/kinesis/package-info.java       | 22 ------------
 .../beam/sdk/io/mongodb/package-info.java       | 22 ------------
 18 files changed, 102 insertions(+), 117 deletions(-)
----------------------------------------------------------------------