You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:54 UTC

[36/39] incubator-beam git commit: Fix findbugs issues.

Fix findbugs issues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99001575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99001575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99001575

Branch: refs/heads/master
Commit: 99001575d266798cb5537c8a025735a095ac535e
Parents: c08ebbe
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 8 05:02:26 2016 +0100
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 8 09:22:13 2016 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/runners/apex/ApexRunner.java  | 5 +++--
 .../java/org/apache/beam/runners/apex/ApexRunnerResult.java     | 4 ++--
 .../apex/translation/operators/ApexGroupByKeyOperator.java      | 5 +----
 .../runners/apex/translation/operators/ApexParDoOperator.java   | 2 +-
 .../runners/apex/translation/ApexGroupByKeyOperatorTest.java    | 5 +++++
 5 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index b42dddf..5ce4fef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -28,6 +28,7 @@ import com.google.common.base.Throwables;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
 import org.apache.beam.runners.core.AssignWindows;
@@ -73,7 +74,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    * Holds any most resent assertion error that was raised while processing elements.
    * Used in the unit test driver in embedded mode to propagate the exception.
    */
-  public static volatile AssertionError assertionError;
+  public static final AtomicReference<AssertionError> ASSERTION_ERROR = new AtomicReference<>();
 
   public ApexRunner(ApexPipelineOptions options) {
     this.options = options;
@@ -141,7 +142,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
         // turns off timeout checking for operator progress
         lc.setHeartbeatMonitoringEnabled(false);
       }
-      assertionError = null;
+      ApexRunner.ASSERTION_ERROR.set(null);
       lc.runAsync();
       return new ApexRunnerResult(lma.getDAG(), lc);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 3ae69f2..18b50bc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -95,8 +95,8 @@ public class ApexRunnerResult implements PipelineResult {
       appDoneField = ctrl.getClass().getDeclaredField("appDone");
       appDoneField.setAccessible(true);
       while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
-        if (ApexRunner.assertionError != null) {
-          throw ApexRunner.assertionError;
+        if (ApexRunner.ASSERTION_ERROR.get() != null) {
+          throw ApexRunner.ASSERTION_ERROR.get();
         }
         Thread.sleep(500);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1b5e693..8fbfb03 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -32,7 +32,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -467,9 +466,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
 
   }
 
-  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
-    private static final long serialVersionUID = 1L;
-
+  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K> {
     @Override
     public StateInternals<K> stateInternalsForKey(K key) {
       return getStateInternalsForKey(key);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 44e7b11..637c3ff 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -240,7 +240,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       return pushedBack;
     } catch (UserCodeException ue) {
       if (ue.getCause() instanceof AssertionError) {
-        ApexRunner.assertionError = (AssertionError) ue.getCause();
+        ApexRunner.ASSERTION_ERROR.set((AssertionError) ue.getCause());
       }
       throw ue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index c0ddb83..fb80d0c 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.datatorrent.api.Sink;
+import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 
 import java.util.List;
@@ -68,6 +69,10 @@ public class ApexGroupByKeyOperatorTest {
         input, new ApexStateInternals.ApexStateInternalsFactory<String>()
         );
 
+    operator.setup(null);
+    operator.beginWindow(1);
+    Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+
     final List<Object> results = Lists.newArrayList();
     Sink<Object> sink =  new Sink<Object>() {
       @Override