You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/05/02 16:15:11 UTC
[1/2] beam git commit: [BEAM-1764] Remove aggregators from Flink Runner
Repository: beam
Updated Branches:
refs/heads/master ae72456b7 -> 19105d9be
[BEAM-1764] Remove aggregators from Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e94f8f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e94f8f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e94f8f8
Branch: refs/heads/master
Commit: 8e94f8f827d3fa5cd94f7fffcaf0a7730df86587
Parents: ae72456
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 15:59:26 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue May 2 18:11:42 2017 +0200
----------------------------------------------------------------------
.../apache/beam/runners/flink/FlinkRunner.java | 3 +-
.../beam/runners/flink/FlinkRunnerResult.java | 15 ++-
.../flink/metrics/FlinkMetricResults.java | 30 +++---
.../functions/FlinkAggregatorFactory.java | 53 -----------
.../functions/FlinkDoFnFunction.java | 2 +-
.../functions/FlinkStatefulDoFnFunction.java | 2 +-
.../SerializableFnAggregatorWrapper.java | 98 --------------------
.../wrappers/streaming/DoFnOperator.java | 27 +-----
8 files changed, 26 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 096f030..181ffda 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -126,8 +126,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
LOG.info("Execution finished in {} msecs", result.getNetRuntime());
Map<String, Object> accumulators = result.getAllAccumulatorResults();
if (accumulators != null && !accumulators.isEmpty()) {
- LOG.info("Final aggregator values:");
-
+ LOG.info("Final accumulator values:");
for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
LOG.info("{} : {}", entry.getKey(), entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index dfc1d8e..90dc79b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -27,19 +27,18 @@ import org.joda.time.Duration;
/**
* Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This
- * has methods to query to job runtime and the final values of
- * {@link org.apache.beam.sdk.transforms.Aggregator}s.
+ * has methods to query to job runtime and the final values of the accumulators.
*/
public class FlinkRunnerResult implements PipelineResult {
- private final Map<String, Object> aggregators;
+ private final Map<String, Object> accumulators;
private final long runtime;
- FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
- this.aggregators = (aggregators == null || aggregators.isEmpty())
+ FlinkRunnerResult(Map<String, Object> accumulators, long runtime) {
+ this.accumulators = (accumulators == null || accumulators.isEmpty())
? Collections.<String, Object>emptyMap()
- : Collections.unmodifiableMap(aggregators);
+ : Collections.unmodifiableMap(accumulators);
this.runtime = runtime;
}
@@ -51,7 +50,7 @@ public class FlinkRunnerResult implements PipelineResult {
@Override
public String toString() {
return "FlinkRunnerResult{"
- + "aggregators=" + aggregators
+ + "accumulators=" + accumulators
+ ", runtime=" + runtime
+ '}';
}
@@ -73,6 +72,6 @@ public class FlinkRunnerResult implements PipelineResult {
@Override
public MetricResults metrics() {
- return new FlinkMetricResults(aggregators);
+ return new FlinkMetricResults(accumulators);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
index 263a68e..9e1430b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
@@ -42,10 +42,10 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
*/
public class FlinkMetricResults extends MetricResults {
- private Map<String, Object> aggregators;
+ private Map<String, Object> accumulators;
- public FlinkMetricResults(Map<String, Object> aggregators) {
- this.aggregators = aggregators;
+ public FlinkMetricResults(Map<String, Object> accumulators) {
+ this.accumulators = accumulators;
}
@Override
@@ -64,12 +64,12 @@ public class FlinkMetricResults extends MetricResults {
@Override
public Iterable<MetricResult<Long>> counters() {
List<MetricResult<Long>> result = new ArrayList<>();
- for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
- if (entry.getKey().startsWith(COUNTER_PREFIX)) {
- MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
+ for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+ if (accumulator.getKey().startsWith(COUNTER_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
if (MetricFiltering.matches(filter, metricKey)) {
result.add(new FlinkMetricResult<>(
- metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue()));
+ metricKey.metricName(), metricKey.stepName(), (Long) accumulator.getValue()));
}
}
}
@@ -79,10 +79,10 @@ public class FlinkMetricResults extends MetricResults {
@Override
public Iterable<MetricResult<DistributionResult>> distributions() {
List<MetricResult<DistributionResult>> result = new ArrayList<>();
- for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
- if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) {
- MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
- DistributionData data = (DistributionData) entry.getValue();
+ for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+ if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
+ DistributionData data = (DistributionData) accumulator.getValue();
if (MetricFiltering.matches(filter, metricKey)) {
result.add(new FlinkMetricResult<>(
metricKey.metricName(), metricKey.stepName(), data.extractResult()));
@@ -95,10 +95,10 @@ public class FlinkMetricResults extends MetricResults {
@Override
public Iterable<MetricResult<GaugeResult>> gauges() {
List<MetricResult<GaugeResult>> result = new ArrayList<>();
- for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
- if (entry.getKey().startsWith(GAUGE_PREFIX)) {
- MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
- GaugeData data = (GaugeData) entry.getValue();
+ for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
+ if (accumulator.getKey().startsWith(GAUGE_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey());
+ GaugeData data = (GaugeData) accumulator.getValue();
if (MetricFiltering.matches(filter, metricKey)) {
result.add(new FlinkMetricResult<>(
metricKey.metricName(), metricKey.stepName(), data.extractResult()));
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
deleted file mode 100644
index fb2493b..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * A {@link AggregatorFactory} for the Flink Batch Runner.
- */
-public class FlinkAggregatorFactory implements AggregatorFactory{
-
- private final RuntimeContext runtimeContext;
-
- public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
- this.runtimeContext = runtimeContext;
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
- @SuppressWarnings("unchecked")
- SerializableFnAggregatorWrapper<InputT, OutputT> result =
- (SerializableFnAggregatorWrapper<InputT, OutputT>)
- runtimeContext.getAccumulator(aggregatorName);
-
- if (result == null) {
- result = new SerializableFnAggregatorWrapper<>(combine);
- runtimeContext.addAccumulator(aggregatorName, result);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 68ac780..d28e7c4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -105,7 +105,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
// see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext(),
- new FlinkAggregatorFactory(runtimeContext),
+ null,
windowingStrategy);
if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 3e02bee..879fad7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -131,7 +131,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
return timerInternals;
}
},
- new FlinkAggregatorFactory(runtimeContext),
+ null,
windowingStrategy);
if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 70d97e3..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-/**
- * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<InputT, OutputT>
- implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
-
- private OutputT aa;
- private Combine.CombineFn<InputT, ?, OutputT> combiner;
-
- public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
- this.combiner = combiner;
- resetLocal();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void add(InputT value) {
- this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
- }
-
- @Override
- public Serializable getLocalValue() {
- return (Serializable) aa;
- }
-
- @Override
- public void resetLocal() {
- this.aa = combiner.apply(ImmutableList.<InputT>of());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void merge(Accumulator<InputT, Serializable> other) {
- this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
- }
-
- @Override
- public void addValue(InputT value) {
- add(value);
- }
-
- @Override
- public String getName() {
- return "Aggregator :" + combiner.toString();
- }
-
- @Override
- public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combiner;
- }
-
- @Override
- public Accumulator<InputT, Serializable> clone() {
- try {
- super.clone();
- } catch (CloneNotSupportedException e) {
- // Flink Accumulators cannot throw CloneNotSupportedException, work around that.
- throw new RuntimeException(e);
- }
-
- // copy it by merging
- OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
- SerializableFnAggregatorWrapper<InputT, OutputT> result = new
- SerializableFnAggregatorWrapper<>(combiner);
-
- result.aa = resultCopy;
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 54eb770..01830de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -30,7 +30,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
@@ -51,7 +50,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
@@ -59,8 +57,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -205,27 +201,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
currentInputWatermark = Long.MIN_VALUE;
currentOutputWatermark = Long.MIN_VALUE;
- AggregatorFactory aggregatorFactory = new AggregatorFactory() {
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- ExecutionContext.StepContext stepContext,
- String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
- @SuppressWarnings("unchecked")
- SerializableFnAggregatorWrapper<InputT, OutputT> result =
- (SerializableFnAggregatorWrapper<InputT, OutputT>)
- getRuntimeContext().getAccumulator(aggregatorName);
-
- if (result == null) {
- result = new SerializableFnAggregatorWrapper<>(combine);
- getRuntimeContext().addAccumulator(aggregatorName, result);
- }
- return result;
- }
- };
-
sideInputReader = NullSideInputReader.of(sideInputs);
if (!sideInputs.isEmpty()) {
@@ -285,7 +260,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
+ null,
windowingStrategy);
if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
[2/2] beam git commit: This closes #2822
Posted by ie...@apache.org.
This closes #2822
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19105d9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19105d9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19105d9b
Branch: refs/heads/master
Commit: 19105d9be1580f79287c24678e2b6194bc9c704b
Parents: ae72456 8e94f8f
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 18:14:41 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue May 2 18:14:41 2017 +0200
----------------------------------------------------------------------
.../apache/beam/runners/flink/FlinkRunner.java | 3 +-
.../beam/runners/flink/FlinkRunnerResult.java | 15 ++-
.../flink/metrics/FlinkMetricResults.java | 30 +++---
.../functions/FlinkAggregatorFactory.java | 53 -----------
.../functions/FlinkDoFnFunction.java | 2 +-
.../functions/FlinkStatefulDoFnFunction.java | 2 +-
.../SerializableFnAggregatorWrapper.java | 98 --------------------
.../wrappers/streaming/DoFnOperator.java | 27 +-----
8 files changed, 26 insertions(+), 204 deletions(-)
----------------------------------------------------------------------