You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/11 22:10:13 UTC
[3/4] incubator-beam git commit: Fix access levels on SimpleDoFnRunner
Fix access levels on SimpleDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bac00e1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bac00e1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bac00e1a
Branch: refs/heads/master
Commit: bac00e1a4e0bd7a223f4645e438848c22b830ce1
Parents: 48a7a55
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 10 15:04:42 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 11 15:09:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 43 ++++++++++----------
1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bac00e1a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 594ca5c..8f25705 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -63,11 +63,11 @@ import org.joda.time.format.PeriodFormat;
public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
/** The {@link OldDoFn} being run. */
- public final OldDoFn<InputT, OutputT> fn;
+ private final OldDoFn<InputT, OutputT> fn;
/** The context used for running the {@link OldDoFn}. */
- public final DoFnContext<InputT, OutputT> context;
+ private final DoFnContext<InputT, OutputT> context;
- protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
+ public SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
@@ -85,16 +85,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
windowingStrategy == null ? null : windowingStrategy.getWindowFn());
}
- protected void invokeProcessElement(WindowedValue<InputT> elem) {
- final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.processElement(processContext);
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
- }
- }
-
@Override
public void startBundle() {
// This can contain user code. Wrap it in case it throws an exception.
@@ -121,6 +111,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
}
+ private void invokeProcessElement(WindowedValue<InputT> elem) {
+ final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
+ // This can contain user code. Wrap it in case it throws an exception.
+ try {
+ fn.processElement(processContext);
+ } catch (Exception ex) {
+ throw wrapUserCodeException(ex);
+ }
+ }
+
@Override
public void finishBundle() {
// This can contain user code. Wrap it in case it throws an exception.
@@ -135,12 +135,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
/**
* Returns a new {@link OldDoFn.ProcessContext} for the given element.
*/
- protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+ private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
WindowedValue<InputT> elem) {
return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
}
- protected RuntimeException wrapUserCodeException(Throwable t) {
+ private RuntimeException wrapUserCodeException(Throwable t) {
throw UserCodeException.wrapIf(!isSystemDoFn(), t);
}
@@ -154,8 +154,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
* @param <InputT> the type of the {@link OldDoFn} (main) input elements
* @param <OutputT> the type of the {@link OldDoFn} (main) output elements
*/
- private static class DoFnContext<InputT, OutputT>
- extends OldDoFn<InputT, OutputT>.Context {
+ private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
private static final int MAX_SIDE_OUTPUTS = 1000;
final PipelineOptions options;
@@ -276,7 +275,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
}
- protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+ private <T> void sideOutputWindowedValue(TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
@@ -284,7 +283,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
}
- protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+ private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
if (!outputTags.contains(tag)) {
// This tag wasn't declared nor was it seen before during this execution.
// Thus, this must be a new, undeclared and unconsumed output.
@@ -337,13 +336,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
/**
- * A concrete implementation of {@link OldDoFn.ProcessContext} used for
- * running a {@link OldDoFn} over a single element.
+ * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
+ * over a single element.
*
* @param <InputT> the type of the {@link OldDoFn} (main) input elements
* @param <OutputT> the type of the {@link OldDoFn} (main) output elements
*/
- static class DoFnProcessContext<InputT, OutputT>
+ private static class DoFnProcessContext<InputT, OutputT>
extends OldDoFn<InputT, OutputT>.ProcessContext {