You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:54 UTC
[36/39] incubator-beam git commit: Fix findbugs issues.
Fix findbugs issues.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99001575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99001575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99001575
Branch: refs/heads/master
Commit: 99001575d266798cb5537c8a025735a095ac535e
Parents: c08ebbe
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 8 05:02:26 2016 +0100
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 8 09:22:13 2016 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/runners/apex/ApexRunner.java | 5 +++--
.../java/org/apache/beam/runners/apex/ApexRunnerResult.java | 4 ++--
.../apex/translation/operators/ApexGroupByKeyOperator.java | 5 +----
.../runners/apex/translation/operators/ApexParDoOperator.java | 2 +-
.../runners/apex/translation/ApexGroupByKeyOperatorTest.java | 5 +++++
5 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index b42dddf..5ce4fef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -28,6 +28,7 @@ import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
import org.apache.beam.runners.core.AssignWindows;
@@ -73,7 +74,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
* Holds any most resent assertion error that was raised while processing elements.
* Used in the unit test driver in embedded mode to propagate the exception.
*/
- public static volatile AssertionError assertionError;
+ public static final AtomicReference<AssertionError> ASSERTION_ERROR = new AtomicReference<>();
public ApexRunner(ApexPipelineOptions options) {
this.options = options;
@@ -141,7 +142,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
// turns off timeout checking for operator progress
lc.setHeartbeatMonitoringEnabled(false);
}
- assertionError = null;
+ ApexRunner.ASSERTION_ERROR.set(null);
lc.runAsync();
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 3ae69f2..18b50bc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -95,8 +95,8 @@ public class ApexRunnerResult implements PipelineResult {
appDoneField = ctrl.getClass().getDeclaredField("appDone");
appDoneField.setAccessible(true);
while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
- if (ApexRunner.assertionError != null) {
- throw ApexRunner.assertionError;
+ if (ApexRunner.ASSERTION_ERROR.get() != null) {
+ throw ApexRunner.ASSERTION_ERROR.get();
}
Thread.sleep(500);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1b5e693..8fbfb03 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -32,7 +32,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -467,9 +466,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
- private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
- private static final long serialVersionUID = 1L;
-
+ private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K> {
@Override
public StateInternals<K> stateInternalsForKey(K key) {
return getStateInternalsForKey(key);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 44e7b11..637c3ff 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -240,7 +240,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
return pushedBack;
} catch (UserCodeException ue) {
if (ue.getCause() instanceof AssertionError) {
- ApexRunner.assertionError = (AssertionError) ue.getCause();
+ ApexRunner.ASSERTION_ERROR.set((AssertionError) ue.getCause());
}
throw ue;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index c0ddb83..fb80d0c 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.apex.translation;
import com.datatorrent.api.Sink;
+import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Lists;
import java.util.List;
@@ -68,6 +69,10 @@ public class ApexGroupByKeyOperatorTest {
input, new ApexStateInternals.ApexStateInternalsFactory<String>()
);
+ operator.setup(null);
+ operator.beginWindow(1);
+ Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+
final List<Object> results = Lists.newArrayList();
Sink<Object> sink = new Sink<Object>() {
@Override