You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/11 22:10:13 UTC

[3/4] incubator-beam git commit: Fix access levels on SimpleDoFnRunner

Fix access levels on SimpleDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bac00e1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bac00e1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bac00e1a

Branch: refs/heads/master
Commit: bac00e1a4e0bd7a223f4645e438848c22b830ce1
Parents: 48a7a55
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 10 15:04:42 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 11 15:09:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 43 ++++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bac00e1a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 594ca5c..8f25705 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -63,11 +63,11 @@ import org.joda.time.format.PeriodFormat;
 public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
 
   /** The {@link OldDoFn} being run. */
-  public final OldDoFn<InputT, OutputT> fn;
+  private final OldDoFn<InputT, OutputT> fn;
   /** The context used for running the {@link OldDoFn}. */
-  public final DoFnContext<InputT, OutputT> context;
+  private final DoFnContext<InputT, OutputT> context;
 
-  protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
+  public SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
@@ -85,16 +85,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         windowingStrategy == null ? null : windowingStrategy.getWindowFn());
   }
 
-  protected void invokeProcessElement(WindowedValue<InputT> elem) {
-    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.processElement(processContext);
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
-    }
-  }
-
   @Override
   public void startBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
@@ -121,6 +111,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
   }
 
+  private void invokeProcessElement(WindowedValue<InputT> elem) {
+    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.processElement(processContext);
+    } catch (Exception ex) {
+      throw wrapUserCodeException(ex);
+    }
+  }
+
   @Override
   public void finishBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
@@ -135,12 +135,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   /**
    * Returns a new {@link OldDoFn.ProcessContext} for the given element.
    */
-  protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+  private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
       WindowedValue<InputT> elem) {
     return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
   }
 
-  protected RuntimeException wrapUserCodeException(Throwable t) {
+  private RuntimeException wrapUserCodeException(Throwable t) {
     throw UserCodeException.wrapIf(!isSystemDoFn(), t);
   }
 
@@ -154,8 +154,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
    * @param <InputT> the type of the {@link OldDoFn} (main) input elements
    * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
    */
-  private static class DoFnContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.Context {
+  private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
     private static final int MAX_SIDE_OUTPUTS = 1000;
 
     final PipelineOptions options;
@@ -276,7 +275,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       }
     }
 
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag,
                                                T output,
                                                Instant timestamp,
                                                Collection<? extends BoundedWindow> windows,
@@ -284,7 +283,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
     }
 
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
       if (!outputTags.contains(tag)) {
         // This tag wasn't declared nor was it seen before during this execution.
         // Thus, this must be a new, undeclared and unconsumed output.
@@ -337,13 +336,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   }
 
   /**
-   * A concrete implementation of {@link OldDoFn.ProcessContext} used for
-   * running a {@link OldDoFn} over a single element.
+   * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
+   * over a single element.
    *
    * @param <InputT> the type of the {@link OldDoFn} (main) input elements
    * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
    */
-  static class DoFnProcessContext<InputT, OutputT>
+  private static class DoFnProcessContext<InputT, OutputT>
       extends OldDoFn<InputT, OutputT>.ProcessContext {