You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2017/04/27 16:51:26 UTC
[1/2] beam git commit: BEAM-1766 Remove Aggregators from Apex runner
Repository: beam
Updated Branches:
refs/heads/master 0c26d024d -> e6f94a85a
BEAM-1766 Remove Aggregators from Apex runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62fc6489
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62fc6489
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62fc6489
Branch: refs/heads/master
Commit: 62fc6489699130b50237731105884d3a3f993db9
Parents: 634bf4e
Author: Thomas Weise <th...@apache.org>
Authored: Thu Apr 27 09:13:19 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Apr 27 09:13:19 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 47 +-------------------
1 file changed, 2 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/62fc6489/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 52d1d43..f4c617d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -39,11 +39,9 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
-import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -57,8 +55,6 @@ import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -337,7 +333,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
mainOutputTag,
additionalOutputTags,
stepContext,
- new NoOpAggregatorFactory(),
+ null,
windowingStrategy
);
@@ -362,7 +358,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
doFn,
doFnRunner,
stepContext,
- new NoOpAggregatorFactory(),
+ null,
windowingStrategy,
cleanupTimer,
stateCleaner);
@@ -387,45 +383,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
public void endWindow() {
}
- /**
- * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
- * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}.
- */
- public static class NoOpAggregatorFactory implements AggregatorFactory {
-
- private NoOpAggregatorFactory() {
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext step,
- String name, CombineFn<InputT, AccumT, OutputT> combine) {
- return new NoOpAggregator<>();
- }
-
- private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
- java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void addValue(InputT value) {
- }
-
- @Override
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn() {
- // TODO Auto-generated method stub
- return null;
- }
-
- };
- }
-
private static class LongMin {
long state = Long.MAX_VALUE;
[2/2] beam git commit: This closes #2742
Posted by th...@apache.org.
This closes #2742
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6f94a85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6f94a85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6f94a85
Branch: refs/heads/master
Commit: e6f94a85afecfd62fcb243cd026ec936806f4363
Parents: 0c26d02 62fc648
Author: Thomas Weise <th...@apache.org>
Authored: Thu Apr 27 09:51:06 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Apr 27 09:51:06 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 47 +-------------------
1 file changed, 2 insertions(+), 45 deletions(-)
----------------------------------------------------------------------