You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original archive page) was not preserved in this text version.
Posted to commits@beam.apache.org by th...@apache.org on 2017/04/27 16:51:26 UTC

[1/2] beam git commit: BEAM-1766 Remove Aggregators from Apex runner

Repository: beam
Updated Branches:
  refs/heads/master 0c26d024d -> e6f94a85a


BEAM-1766 Remove Aggregators from Apex runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62fc6489
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62fc6489
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62fc6489

Branch: refs/heads/master
Commit: 62fc6489699130b50237731105884d3a3f993db9
Parents: 634bf4e
Author: Thomas Weise <th...@apache.org>
Authored: Thu Apr 27 09:13:19 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Apr 27 09:13:19 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            | 47 +-------------------
 1 file changed, 2 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62fc6489/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 52d1d43..f4c617d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -39,11 +39,9 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
-import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -57,8 +55,6 @@ import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -337,7 +333,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        new NoOpAggregatorFactory(),
+        null,
         windowingStrategy
         );
 
@@ -362,7 +358,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
           doFn,
           doFnRunner,
           stepContext,
-          new NoOpAggregatorFactory(),
+          null,
           windowingStrategy,
           cleanupTimer,
           stateCleaner);
@@ -387,45 +383,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   public void endWindow() {
   }
 
-  /**
-   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
-   * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}.
-   */
-  public static class NoOpAggregatorFactory implements AggregatorFactory {
-
-    private NoOpAggregatorFactory() {
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext step,
-        String name, CombineFn<InputT, AccumT, OutputT> combine) {
-      return new NoOpAggregator<>();
-    }
-
-    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
-        java.io.Serializable {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public void addValue(InputT value) {
-      }
-
-      @Override
-      public String getName() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-      @Override
-      public CombineFn<InputT, ?, OutputT> getCombineFn() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-    };
-  }
-
   private static class LongMin {
     long state = Long.MAX_VALUE;
 


[2/2] beam git commit: This closes #2742

Posted by th...@apache.org.
This closes #2742


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6f94a85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6f94a85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6f94a85

Branch: refs/heads/master
Commit: e6f94a85afecfd62fcb243cd026ec936806f4363
Parents: 0c26d02 62fc648
Author: Thomas Weise <th...@apache.org>
Authored: Thu Apr 27 09:51:06 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Apr 27 09:51:06 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            | 47 +-------------------
 1 file changed, 2 insertions(+), 45 deletions(-)
----------------------------------------------------------------------