You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/15 14:59:31 UTC
[1/2] incubator-beam git commit: [BEAM-634] Be able to import Beam
codebase in Eclipse and support m2e
Repository: incubator-beam
Updated Branches:
refs/heads/master d71d828b7 -> c4036753f
[BEAM-634] Be able to import Beam codebase in Eclipse and support m2e
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0ae04be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0ae04be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0ae04be
Branch: refs/heads/master
Commit: a0ae04bef40149cdf54d0ab50909f18a444f3023
Parents: d71d828
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Sep 12 17:31:22 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Sep 15 16:26:05 2016 +0200
----------------------------------------------------------------------
pom.xml | 35 ++++++++++++++++++--
runners/direct-java/pom.xml | 8 ++++-
.../direct/BoundedReadEvaluatorFactory.java | 9 ++---
.../direct/ParDoMultiEvaluatorFactory.java | 12 +++----
.../direct/ParDoSingleEvaluatorFactory.java | 9 ++---
.../direct/UnboundedReadEvaluatorFactory.java | 9 ++---
.../apache/beam/runners/flink/package-info.java | 22 ------------
.../src/main/resources/beam/checkstyle.xml | 3 ++
.../src/main/resources/beam/suppressions.xml | 5 +++
.../beam/sdk/coders/IterableLikeCoder.java | 22 ++++++------
.../beam/sdk/coders/protobuf/ProtoCoder.java | 4 ++-
.../beam/sdk/util/MergingActiveWindowSet.java | 12 ++++---
.../org/apache/beam/sdk/util/PubsubClient.java | 2 +-
.../beam/sdk/util/PubsubJsonClientTest.java | 16 ++++-----
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 4 ++-
.../beam/sdk/io/kinesis/KinesisUploader.java | 3 +-
.../beam/sdk/io/kinesis/package-info.java | 22 ------------
.../beam/sdk/io/mongodb/package-info.java | 22 ------------
18 files changed, 102 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01b5a88..fb3a8a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,8 @@
<stax2.version>3.1.4</stax2.version>
<storage.version>v1-rev71-1.22.0</storage.version>
<woodstox.version>4.4.1</woodstox.version>
+
+ <compiler.error.flag>-Werror</compiler.error.flag>
</properties>
<packaging>pom</packaging>
@@ -225,6 +227,33 @@
</properties>
</profile>
+ <profile>
+ <id>eclipse-jdt</id>
+ <properties>
+ <!-- Tycho doesn't support -Werror -->
+ <compiler.error.flag>-Xlint:all</compiler.error.flag>
+ </properties>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerId>jdt</compilerId>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.tycho</groupId>
+ <artifactId>tycho-compiler-jdt</artifactId>
+ <!-- 0.24.0 is last version to support Java7 -->
+ <version>0.24.0</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ </profile>
</profiles>
<dependencyManagement>
@@ -681,7 +710,7 @@
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
- <version>6.17</version>
+ <version>6.19</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -716,13 +745,13 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
+ <version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgs>
<arg>-Xlint:all</arg>
- <arg>-Werror</arg>
+ <arg>${compiler.error.flag}</arg>
<!-- Override options warnings to support cross-compilation -->
<arg>-Xlint:-options</arg>
<!-- Temporary lint overrides, to be removed over time. -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index e06883f..354c8c7 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -361,6 +361,12 @@
</exclusions>
<scope>test</scope>
</dependency>
-
+
+ <!-- needed for eclipse-jdt generated core as the test-jar references classes from this -->
+ <dependency>
+ <groupId>com.google.cloud.dataflow</groupId>
+ <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2046d31..9c77946 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -73,7 +73,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* already done so.
*/
private <OutputT> TransformEvaluator<?> getTransformEvaluator(
- final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform) {
+ final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
@@ -83,7 +83,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
- BoundedSource<OutputT> source = transform.getTransform().getSource();
+ Bounded<OutputT> bound = (Bounded<OutputT>) transform.getTransform();
+ BoundedSource<OutputT> source = bound.getSource();
BoundedReadEvaluator<OutputT> evaluator =
new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
evaluatorQueue.offer(evaluator);
@@ -105,7 +106,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* may produce duplicate elements.
*/
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
- private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
+ private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
private final EvaluationContext evaluationContext;
/**
* The source being read from by this {@link BoundedReadEvaluator}. This may not be the same as
@@ -114,7 +115,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private BoundedSource<OutputT> source;
public BoundedReadEvaluator(
- AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
+ AppliedPTransform<?, PCollection<OutputT>, ?> transform,
EvaluationContext evaluationContext,
BoundedSource<OutputT> source) {
this.transform = transform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index fcb68c4..d909e8b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -1,5 +1,5 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
+* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory;
*/
class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class);
- private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>
- fnClones;
+ private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
private final EvaluationContext evaluationContext;
public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
fnClones = CacheBuilder.newBuilder()
- .build(new CacheLoader<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>() {
+ .build(new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
@Override
- public DoFnLifecycleManager load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
+ public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
throws Exception {
- return DoFnLifecycleManager.of(key.getTransform().getFn());
+ BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
+ return DoFnLifecycleManager.of(bound.getFn());
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 91da35f..1a06ea6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
*/
class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class);
- private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager> fnClones;
+ private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
private final EvaluationContext evaluationContext;
public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) {
@@ -47,11 +47,12 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
fnClones =
CacheBuilder.newBuilder()
.build(
- new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager>() {
+ new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
@Override
- public DoFnLifecycleManager load(AppliedPTransform<?, ?, Bound<?, ?>> key)
+ public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
throws Exception {
- return DoFnLifecycleManager.of(key.getTransform().getFn());
+ Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
+ return DoFnLifecycleManager.of(bound.getFn());
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 0dfcd69..9fb3dbf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -82,7 +82,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
*/
private <OutputT, CheckpointMarkT extends CheckpointMark>
TransformEvaluator<?> getTransformEvaluator(
- final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform) {
+ final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
(ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
sourceEvaluators.get(transform);
@@ -91,8 +91,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
+ Unbounded<OutputT> unbounded = (Unbounded<OutputT>) transform.getTransform();
UnboundedSource<OutputT, CheckpointMarkT> source =
- (UnboundedSource<OutputT, CheckpointMarkT>) transform.getTransform().getSource();
+ (UnboundedSource<OutputT, CheckpointMarkT>) unbounded.getSource();
UnboundedReadDeduplicator deduplicator;
if (source.requiresDeduping()) {
deduplicator = UnboundedReadDeduplicator.CachedIdDeduplicator.create();
@@ -130,7 +131,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
implements TransformEvaluator<Object> {
private static final int ARBITRARY_MAX_ELEMENTS = 10;
- private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
+ private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
private final EvaluationContext evaluationContext;
private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
evaluatorQueue;
@@ -151,7 +152,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private int outputBundles = 0;
public UnboundedReadEvaluator(
- AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+ AppliedPTransform<?, PCollection<OutputT>, ?> transform,
EvaluationContext evaluationContext,
UnboundedSource<OutputT, CheckpointMarkT> source,
UnboundedReadDeduplicator deduplicator,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
deleted file mode 100644
index 57f1e59..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
index c7d9b2c..a3313ca 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
@@ -59,6 +59,9 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<property name="checkFormat" value="$1"/>
</module>
+ <module name="SuppressionFilter">
+ <property name="file" value="${checkstyle.suppressions.file}" />
+ </module>
<!-- Check that every module has a package-info.java -->
<module name="JavadocPackage"/>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 6dbb7f5..00d6729 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -20,5 +20,10 @@
<suppress checks="JavadocPackage" files=".*/src/test/.*"/>
<suppress checks="JavadocPackage" files=".*/maven-archetypes/.*"/>
<suppress checks="JavadocPackage" files=".*/examples/.*"/>
+
+ <!-- suppress all checks in the generated directories -->
+ <suppress checks=".*" files=".+[\\\/]generated[\\\/].+\.java" />
+ <suppress checks=".*" files=".+[\\\/]generated-sources[\\\/].+\.java" />
+ <suppress checks=".*" files=".+[\\\/]generated-test-sources[\\\/].+\.java" />
</suppressions>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 8680552..da64a93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -140,19 +140,19 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
elements.add(elementCoder.decode(dataInStream, nestedContext));
}
return decodeToIterable(elements);
- } else {
- List<T> elements = new ArrayList<>();
- long count;
- // We don't know the size a priori. Check if we're done with
- // each block of elements.
- while ((count = VarInt.decodeLong(dataInStream)) > 0) {
- while (count > 0) {
- elements.add(elementCoder.decode(dataInStream, nestedContext));
- count -= 1;
- }
+ }
+ List<T> elements = new ArrayList<>();
+ // We don't know the size a priori. Check if we're done with
+ // each block of elements.
+ long count = VarInt.decodeLong(dataInStream);
+ while (count > 0L) {
+ elements.add(elementCoder.decode(dataInStream, nestedContext));
+ --count;
+ if (count == 0L) {
+ count = VarInt.decodeLong(dataInStream);
}
- return decodeToIterable(elements);
}
+ return decodeToIterable(elements);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
index 79fb373..9bba42b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
@@ -364,6 +364,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
return memoizedParser;
}
+ static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+
/**
* The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
* {@link #coderProvider()}.
@@ -372,7 +374,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
new CoderProvider() {
@Override
public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
+ if (!type.isSubtypeOf(CHECK)) {
throw new CannotProvideCoderException(
String.format(
"Cannot provide %s because %s is not a subclass of %s",
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
index 6ad63b0..066579b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
@@ -144,7 +144,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
checkState(stateAddressWindows != null,
"Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
window);
- if (stateAddressWindows.isEmpty()) {
+ if (stateAddressWindows != null && stateAddressWindows.isEmpty()) {
// Window was NEW, make it ACTIVE with itself as its state address window.
stateAddressWindows.add(window);
}
@@ -266,10 +266,12 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
checkState(otherStateAddressWindows != null,
"Window %s is not ACTIVE or NEW", other);
- for (W otherStateAddressWindow : otherStateAddressWindows) {
- // Since otherTarget equiv other AND other equiv mergeResult
- // THEN otherTarget equiv mergeResult.
- newStateAddressWindows.add(otherStateAddressWindow);
+ if (otherStateAddressWindows != null) {
+ for (W otherStateAddressWindow : otherStateAddressWindows) {
+ // Since otherTarget equiv other AND other equiv mergeResult
+ // THEN otherTarget equiv mergeResult.
+ newStateAddressWindows.add(otherStateAddressWindow);
+ }
}
activeWindowToStateAddressWindows.remove(other);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index fdcee16..bb6aa93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -114,7 +114,7 @@ public abstract class PubsubClient implements Closeable {
"Cannot interpret value of label %s as timestamp: %s",
timestampLabel, value);
}
- return timestampMsSinceEpoch;
+ return timestampMsSinceEpoch == null ? 0 : timestampMsSinceEpoch;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
index b6d7ccb..72fb9a2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -93,10 +93,10 @@ public class PubsubJsonClientTest {
.setAckId(ACK_ID);
PullResponse expectedResponse =
new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
- Mockito.when(mockPubsub.projects()
- .subscriptions()
- .pull(expectedSubscription, expectedRequest)
- .execute())
+ Mockito.when((Object) (mockPubsub.projects()
+ .subscriptions()
+ .pull(expectedSubscription, expectedRequest)
+ .execute()))
.thenReturn(expectedResponse);
List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
assertEquals(1, acutalMessages.size());
@@ -120,10 +120,10 @@ public class PubsubJsonClientTest {
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
.setMessageIds(ImmutableList.of(MESSAGE_ID));
- Mockito.when(mockPubsub.projects()
- .topics()
- .publish(expectedTopic, expectedRequest)
- .execute())
+ Mockito.when((Object) (mockPubsub.projects()
+ .topics()
+ .publish(expectedTopic, expectedRequest)
+ .execute()))
.thenReturn(expectedResponse);
OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 811051c..acff33f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -183,7 +183,9 @@ public final class KinesisIO {
@Override
public AmazonKinesis get() {
- return new AmazonKinesisClient(getCredentialsProvider()).withRegion(region);
+ AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+ client.withRegion(region);
+ return client;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index c98242b..b1c212b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -22,7 +22,6 @@ import static com.google.common.collect.Lists.newArrayList;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
@@ -41,7 +40,7 @@ public class KinesisUploader {
public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
public static void uploadAll(List<String> data, KinesisTestOptions options) {
- AmazonKinesis client = new AmazonKinesisClient(
+ AmazonKinesisClient client = new AmazonKinesisClient(
new StaticCredentialsProvider(
new BasicAWSCredentials(
options.getAwsAccessKey(), options.getAwsSecretKey()))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
deleted file mode 100644
index 44dbf4a..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from Amazon Kinesis.
- */
-package org.apache.beam.sdk.io.kinesis;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0ae04be/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
deleted file mode 100644
index fd08b58..0000000
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from MongoDB.
- */
-package org.apache.beam.sdk.io.mongodb;
[2/2] incubator-beam git commit: [BEAM-634] This closes #949
Posted by jb...@apache.org.
[BEAM-634] This closes #949
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4036753
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4036753
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4036753
Branch: refs/heads/master
Commit: c4036753fe95708cfd14bd360c60bdfd7a4ec953
Parents: d71d828 a0ae04b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Sep 15 16:59:07 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Sep 15 16:59:07 2016 +0200
----------------------------------------------------------------------
pom.xml | 35 ++++++++++++++++++--
runners/direct-java/pom.xml | 8 ++++-
.../direct/BoundedReadEvaluatorFactory.java | 9 ++---
.../direct/ParDoMultiEvaluatorFactory.java | 12 +++----
.../direct/ParDoSingleEvaluatorFactory.java | 9 ++---
.../direct/UnboundedReadEvaluatorFactory.java | 9 ++---
.../apache/beam/runners/flink/package-info.java | 22 ------------
.../src/main/resources/beam/checkstyle.xml | 3 ++
.../src/main/resources/beam/suppressions.xml | 5 +++
.../beam/sdk/coders/IterableLikeCoder.java | 22 ++++++------
.../beam/sdk/coders/protobuf/ProtoCoder.java | 4 ++-
.../beam/sdk/util/MergingActiveWindowSet.java | 12 ++++---
.../org/apache/beam/sdk/util/PubsubClient.java | 2 +-
.../beam/sdk/util/PubsubJsonClientTest.java | 16 ++++-----
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 4 ++-
.../beam/sdk/io/kinesis/KinesisUploader.java | 3 +-
.../beam/sdk/io/kinesis/package-info.java | 22 ------------
.../beam/sdk/io/mongodb/package-info.java | 22 ------------
18 files changed, 102 insertions(+), 117 deletions(-)
----------------------------------------------------------------------