You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:56:03 UTC
[49/50] incubator-beam git commit: Rename DoFn to OldDoFn in Gearpump
runner
Rename DoFn to OldDoFn in Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc1b3549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc1b3549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc1b3549
Branch: refs/heads/gearpump-runner
Commit: bc1b354949416db3b52c4f37c66968bdb86f0813
Parents: 40be715
Author: manuzhang <ow...@gmail.com>
Authored: Fri Aug 12 07:22:00 2016 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 25 11:38:08 2016 -0700
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineResult.java | 23 ++++++++++++++++++--
.../gearpump/GearpumpPipelineRunner.java | 6 ++---
.../gearpump/examples/StreamingWordCount.java | 6 ++---
.../translators/ParDoBoundMultiTranslator.java | 3 ++-
.../translators/ParDoBoundTranslator.java | 3 ++-
.../translators/functions/DoFnFunction.java | 3 ++-
.../translators/utils/GearpumpDoFnRunner.java | 23 ++++++++++----------
7 files changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index bc27147..6184bc3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.runners.gearpump;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import java.io.IOException;
/**
* Result of executing a {@link Pipeline} with Gearpump.
@@ -33,10 +36,26 @@ public class GearpumpPipelineResult implements PipelineResult {
}
@Override
+ public State cancel() throws IOException {
+ return null;
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public State waitUntilFinish() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
throws AggregatorRetrievalException {
throw new AggregatorRetrievalException(
"PipelineResult getAggregatorValues not supported in Gearpump pipeline",
new UnsupportedOperationException());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index 660d703..4182ee4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -151,7 +151,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
private final Window.Bound<T> wrapped;
- public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+ AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
this.wrapped = wrapped;
}
@@ -184,7 +184,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
}
}
- private static class IdentityFn<T> extends DoFn<T, T> {
+ private static class IdentityFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index c51289d..5f35c6b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
*/
public class StreamingWordCount {
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -66,7 +66,7 @@ public class StreamingWordCount {
}
}
- static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index af5bcbc..d5ed0d2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.DoFnRunner;
import org.apache.beam.sdk.util.DoFnRunners;
@@ -92,7 +93,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
public DoFnMultiFunction(
GearpumpPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
TupleTag<OutputT> mainOutputTag,
TupleTagList sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index 689bc08..b97cbb4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators;
import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -38,7 +39,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getFn();
+ OldDoFn<InputT, OutputT> doFn = transform.getFn();
PCollection<OutputT> output = context.getOutput(transform);
WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 088fc14..b1ebd2a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.DoFnRunner;
import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.SideInputReader;
@@ -50,7 +51,7 @@ public class DoFnFunction<InputT, OutputT> implements
public DoFnFunction(
GearpumpPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
this.doFnRunner = new GearpumpDoFnRunner<>(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index 608ad7c..be0d025 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -64,7 +65,7 @@ import java.util.Set;
public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>,
Serializable {
- private final DoFn<InputT, OutputT> fn;
+ private final OldDoFn<InputT, OutputT> fn;
private final transient PipelineOptions options;
private final SideInputReader sideInputReader;
private final DoFnRunners.OutputManager outputManager;
@@ -76,7 +77,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
public GearpumpDoFnRunner(
GearpumpPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
+ OldDoFn<InputT, OutputT> doFn,
SideInputReader sideInputReader,
DoFnRunners.OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -119,7 +120,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
@Override
public void processElement(WindowedValue<InputT> elem) {
if (elem.getWindows().size() <= 1
- || (!DoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+ || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
&& context.sideInputReader.isEmpty())) {
invokeProcessElement(elem);
} else {
@@ -144,7 +145,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
}
private void invokeProcessElement(WindowedValue<InputT> elem) {
- final DoFn<InputT, OutputT>.ProcessContext processContext =
+ final OldDoFn<InputT, OutputT>.ProcessContext processContext =
new DoFnProcessContext<>(fn, context, elem);
// This can contain user code. Wrap it in case it throws an exception.
try {
@@ -169,11 +170,11 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
* @param <OutputT> the type of the DoFn's (main) output elements
*/
private static class DoFnContext<InputT, OutputT>
- extends DoFn<InputT, OutputT>.Context {
+ extends OldDoFn<InputT, OutputT>.Context {
private static final int MAX_SIDE_OUTPUTS = 1000;
final transient PipelineOptions options;
- final DoFn<InputT, OutputT> fn;
+ final OldDoFn<InputT, OutputT> fn;
final SideInputReader sideInputReader;
final DoFnRunners.OutputManager outputManager;
final TupleTag<OutputT> mainOutputTag;
@@ -187,7 +188,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
private final Set<TupleTag<?>> outputTags;
public DoFnContext(PipelineOptions options,
- DoFn<InputT, OutputT> fn,
+ OldDoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
DoFnRunners.OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
@@ -357,14 +358,14 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
* @param <OutputT> the type of the DoFn's (main) output elements
*/
private static class DoFnProcessContext<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext {
+ extends OldDoFn<InputT, OutputT>.ProcessContext {
- final DoFn<InputT, OutputT> fn;
+ final OldDoFn<InputT, OutputT> fn;
final DoFnContext<InputT, OutputT> context;
final WindowedValue<InputT> windowedValue;
- public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+ public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
DoFnContext<InputT, OutputT> context,
WindowedValue<InputT> windowedValue) {
fn.super();
@@ -409,7 +410,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
@Override
public BoundedWindow window() {
- if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+ if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
throw new UnsupportedOperationException(
"window() is only available in the context of a DoFn marked as RequiresWindow.");
}