You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) was not preserved in this copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/05/02 16:15:11 UTC

[1/2] beam git commit: [BEAM-1764] Remove aggregators from Flink Runner

Repository: beam
Updated Branches:
  refs/heads/master ae72456b7 -> 19105d9be


[BEAM-1764] Remove aggregators from Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e94f8f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e94f8f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e94f8f8

Branch: refs/heads/master
Commit: 8e94f8f827d3fa5cd94f7fffcaf0a7730df86587
Parents: ae72456
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 15:59:26 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue May 2 18:11:42 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  |  3 +-
 .../beam/runners/flink/FlinkRunnerResult.java   | 15 ++-
 .../flink/metrics/FlinkMetricResults.java       | 30 +++---
 .../functions/FlinkAggregatorFactory.java       | 53 -----------
 .../functions/FlinkDoFnFunction.java            |  2 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  2 +-
 .../SerializableFnAggregatorWrapper.java        | 98 --------------------
 .../wrappers/streaming/DoFnOperator.java        | 27 +-----
 8 files changed, 26 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 096f030..181ffda 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -126,8 +126,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
       LOG.info("Execution finished in {} msecs", result.getNetRuntime());
       Map<String, Object> accumulators = result.getAllAccumulatorResults();
       if (accumulators != null && !accumulators.isEmpty()) {
-        LOG.info("Final aggregator values:");
-
+        LOG.info("Final accumulator values:");
         for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
           LOG.info("{} : {}", entry.getKey(), entry.getValue());
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index dfc1d8e..90dc79b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -27,19 +27,18 @@ import org.joda.time.Duration;
 
 /**
  * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
- * has methods to query to job runtime and the final values of
- * {@link org.apache.beam.sdk.transforms.Aggregator}s.
+ * has methods to query to job runtime and the final values of the accumulators.
  */
 public class FlinkRunnerResult implements PipelineResult {
 
-  private final Map<String, Object> aggregators;
+  private final Map<String, Object> accumulators;
 
   private final long runtime;
 
-  FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
-    this.aggregators = (aggregators == null || aggregators.isEmpty())
+  FlinkRunnerResult(Map<String, Object> accumulators, long runtime) {
+    this.accumulators = (accumulators == null || accumulators.isEmpty())
         ? Collections.<String, Object>emptyMap()
-        : Collections.unmodifiableMap(aggregators);
+        : Collections.unmodifiableMap(accumulators);
     this.runtime = runtime;
   }
 
@@ -51,7 +50,7 @@ public class FlinkRunnerResult implements PipelineResult {
   @Override
   public String toString() {
     return "FlinkRunnerResult{"
-        + "aggregators=" + aggregators
+        + "accumulators=" + accumulators
         + ", runtime=" + runtime
         + '}';
   }
@@ -73,6 +72,6 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public MetricResults metrics() {
-    return new FlinkMetricResults(aggregators);
+    return new FlinkMetricResults(accumulators);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
index 263a68e..9e1430b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
@@ -42,10 +42,10 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
  */
 public class FlinkMetricResults extends MetricResults {
 
-  private Map<String, Object> aggregators;
+  private Map<String, Object> accumulators;
 
-  public FlinkMetricResults(Map<String, Object> aggregators) {
-    this.aggregators = aggregators;
+  public FlinkMetricResults(Map<String, Object> accumulators) {
+    this.accumulators = accumulators;
   }
 
   @Override
@@ -64,12 +64,12 @@ public class FlinkMetricResults extends MetricResults {
     @Override
     public Iterable<MetricResult<Long>> counters() {
       List<MetricResult<Long>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
-        if (entry.getKey().startsWith(COUNTER_PREFIX)) {
-          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
+      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+        if (accumulator.getKey().startsWith(COUNTER_PREFIX)) {
+          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
           if (MetricFiltering.matches(filter, metricKey)) {
             result.add(new FlinkMetricResult<>(
-                metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue()));
+                metricKey.metricName(), metricKey.stepName(), (Long) accumulator.getValue()));
           }
         }
       }
@@ -79,10 +79,10 @@ public class FlinkMetricResults extends MetricResults {
     @Override
     public Iterable<MetricResult<DistributionResult>> distributions() {
       List<MetricResult<DistributionResult>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
-        if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) {
-          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
-          DistributionData data = (DistributionData) entry.getValue();
+      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+        if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) {
+          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
+          DistributionData data = (DistributionData) accumulator.getValue();
           if (MetricFiltering.matches(filter, metricKey)) {
             result.add(new FlinkMetricResult<>(
                 metricKey.metricName(), metricKey.stepName(), data.extractResult()));
@@ -95,10 +95,10 @@ public class FlinkMetricResults extends MetricResults {
     @Override
     public Iterable<MetricResult<GaugeResult>> gauges() {
       List<MetricResult<GaugeResult>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
-        if (entry.getKey().startsWith(GAUGE_PREFIX)) {
-          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
-          GaugeData data = (GaugeData) entry.getValue();
+      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+        if (accumulator.getKey().startsWith(GAUGE_PREFIX)) {
+          MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
+          GaugeData data = (GaugeData) accumulator.getValue();
           if (MetricFiltering.matches(filter, metricKey)) {
             result.add(new FlinkMetricResult<>(
                 metricKey.metricName(), metricKey.stepName(), data.extractResult()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
deleted file mode 100644
index fb2493b..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * A {@link AggregatorFactory} for the Flink Batch Runner.
- */
-public class FlinkAggregatorFactory implements AggregatorFactory{
-
-  private final RuntimeContext runtimeContext;
-
-  public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
-    this.runtimeContext = runtimeContext;
-  }
-
-  @Override
-  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-      Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
-      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-    @SuppressWarnings("unchecked")
-    SerializableFnAggregatorWrapper<InputT, OutputT> result =
-        (SerializableFnAggregatorWrapper<InputT, OutputT>)
-            runtimeContext.getAccumulator(aggregatorName);
-
-    if (result == null) {
-      result = new SerializableFnAggregatorWrapper<>(combine);
-      runtimeContext.addAccumulator(aggregatorName, result);
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 68ac780..d28e7c4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -105,7 +105,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
         // see SimpleDoFnRunner, just use it to limit number of additional outputs
         Collections.<TupleTag<?>>emptyList(),
         new FlinkNoOpStepContext(),
-        new FlinkAggregatorFactory(runtimeContext),
+        null,
         windowingStrategy);
 
     if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 3e02bee..879fad7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -131,7 +131,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
             return timerInternals;
           }
         },
-        new FlinkAggregatorFactory(runtimeContext),
+        null,
         windowingStrategy);
 
     if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 70d97e3..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-/**
- * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<InputT, OutputT>
-    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
-
-  private OutputT aa;
-  private Combine.CombineFn<InputT, ?, OutputT> combiner;
-
-  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
-    this.combiner = combiner;
-    resetLocal();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void add(InputT value) {
-    this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) aa;
-  }
-
-  @Override
-  public void resetLocal() {
-    this.aa = combiner.apply(ImmutableList.<InputT>of());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<InputT, Serializable> other) {
-    this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
-  }
-
-  @Override
-  public void addValue(InputT value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "Aggregator :" + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
-    return combiner;
-  }
-
-  @Override
-  public Accumulator<InputT, Serializable> clone() {
-    try {
-      super.clone();
-    } catch (CloneNotSupportedException e) {
-      // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
-      throw new RuntimeException(e);
-    }
-
-    // copy it by merging
-    OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
-    SerializableFnAggregatorWrapper<InputT, OutputT> result = new
-        SerializableFnAggregatorWrapper<>(combiner);
-
-    result.aa = resultCopy;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 54eb770..01830de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -30,7 +30,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
@@ -51,7 +50,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
@@ -59,8 +57,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -205,27 +201,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = Long.MIN_VALUE;
 
-    AggregatorFactory aggregatorFactory = new AggregatorFactory() {
-      @Override
-      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-          Class<?> fnClass,
-          ExecutionContext.StepContext stepContext,
-          String aggregatorName,
-          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
-        @SuppressWarnings("unchecked")
-        SerializableFnAggregatorWrapper<InputT, OutputT> result =
-            (SerializableFnAggregatorWrapper<InputT, OutputT>)
-                getRuntimeContext().getAccumulator(aggregatorName);
-
-        if (result == null) {
-          result = new SerializableFnAggregatorWrapper<>(combine);
-          getRuntimeContext().addAccumulator(aggregatorName, result);
-        }
-        return result;
-      }
-    };
-
     sideInputReader = NullSideInputReader.of(sideInputs);
 
     if (!sideInputs.isEmpty()) {
@@ -285,7 +260,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        aggregatorFactory,
+        null,
         windowingStrategy);
 
     if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {


[2/2] beam git commit: This closes #2822

Posted by ie...@apache.org.
This closes #2822


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19105d9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19105d9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19105d9b

Branch: refs/heads/master
Commit: 19105d9be1580f79287c24678e2b6194bc9c704b
Parents: ae72456 8e94f8f
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 18:14:41 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue May 2 18:14:41 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  |  3 +-
 .../beam/runners/flink/FlinkRunnerResult.java   | 15 ++-
 .../flink/metrics/FlinkMetricResults.java       | 30 +++---
 .../functions/FlinkAggregatorFactory.java       | 53 -----------
 .../functions/FlinkDoFnFunction.java            |  2 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  2 +-
 .../SerializableFnAggregatorWrapper.java        | 98 --------------------
 .../wrappers/streaming/DoFnOperator.java        | 27 +-----
 8 files changed, 26 insertions(+), 204 deletions(-)
----------------------------------------------------------------------