You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 23:38:36 UTC
[1/3] beam git commit: Full removal of Aggregators in Java SDK and
Runners
Repository: beam
Updated Branches:
refs/heads/master 5bfd3e049 -> 4682238dc
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
deleted file mode 100644
index cfaf0a6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.MoreObjects;
-import java.io.Serializable;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
- *
- * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline
- * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it
- * submits values must be provided by the runner at execution time.
- *
- * @param <AggInputT> the type of input element
- * @param <AggOutputT> the type of output element
- */
-public class DelegatingAggregator<AggInputT, AggOutputT>
- implements Aggregator<AggInputT, AggOutputT>, Serializable {
- private static final AtomicInteger ID_GEN = new AtomicInteger();
- private final int id;
-
- private final String name;
-
- private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
- private Aggregator<AggInputT, ?> delegate;
-
- public DelegatingAggregator(String name,
- CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- this.id = ID_GEN.getAndIncrement();
- this.name = checkNotNull(name, "name cannot be null");
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
- (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
- this.combineFn = specificCombiner;
- }
-
- @Override
- public void addValue(AggInputT value) {
- if (delegate == null) {
- throw new IllegalStateException(
- String.format(
- "addValue cannot be called on Aggregator outside of the execution of a %s.",
- DoFn.class.getSimpleName()));
- } else {
- delegate.addValue(value);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
- return combineFn;
- }
-
- /**
- * Sets the current delegate of the Aggregator.
- *
- * @param delegate the delegate to set in this aggregator
- */
- public void setDelegate(Aggregator<AggInputT, ?> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("name", name)
- .add("combineFn", combineFn)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, name, combineFn.getClass());
- }
-
- /**
- * Indicates whether some other object is "equal to" this one.
- *
- * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
- * CombineFns are the same class, and they have identical IDs.
- */
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (o instanceof DelegatingAggregator) {
- DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.name, that.name)
- && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 4cb1142..d3ebbb7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -47,8 +47,6 @@ import org.apache.beam.sdk.values.TimestampedValue;
* <p>Example 2: track a latest computed value in an aggregator:
* <pre>{@code
* class MyDoFn extends DoFn<String, String> {
- * private Aggregator<TimestampedValue<Double>, Double> latestValue =
- * createAggregator("latestValue", new Latest.LatestFn<Double>());
*
* {@literal @}ProcessElement
* public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 72cba79..f29aeb9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -43,7 +43,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeAggregatorFactory;
import org.apache.beam.fn.harness.fake.FakeStepContext;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
@@ -316,7 +315,6 @@ public class ProcessBundleHandler {
(TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
new ArrayList<>(doFnInfo.getOutputMap().values()),
new FakeStepContext(),
- new FakeAggregatorFactory(),
(WindowingStrategy) doFnInfo.getWindowingStrategy());
return runner;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
deleted file mode 100644
index b3b7f48..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.fake;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later time.
- * The factory returns {@link Aggregator}s that do nothing when a value is added.
- */
-public class FakeAggregatorFactory implements AggregatorFactory {
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- StepContext stepContext,
- String aggregatorName,
- CombineFn<InputT, AccumT, OutputT> combine) {
- return new Aggregator<InputT, OutputT>() {
- @Override
- public void addValue(InputT value) {}
-
- @Override
- public String getName() {
- return aggregatorName;
- }
-
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combine;
- }
- };
- }
-}
[2/3] beam git commit: Full removal of Aggregators in Java SDK and
Runners
Posted by dh...@apache.org.
Full removal of Aggregators in Java SDK and Runners
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/615761a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/615761a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/615761a7
Branch: refs/heads/master
Commit: 615761a77d2da6229dfa2cad5376d265afea8a62
Parents: 5bfd3e0
Author: Pablo <pa...@google.com>
Authored: Tue May 2 14:49:39 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 16:37:27 2017 -0700
----------------------------------------------------------------------
.../cookbook/CombinePerKeyExamples.java | 4 -
.../beam/runners/core/AggregatorFactory.java | 38 ----
.../apache/beam/runners/core/DoFnRunners.java | 27 ++-
.../apache/beam/runners/core/LateDataUtils.java | 6 +-
.../beam/runners/core/SimpleDoFnRunner.java | 1 -
.../beam/runners/core/SimpleOldDoFnRunner.java | 3 -
.../core/LateDataDroppingDoFnRunnerTest.java | 27 ---
.../beam/runners/core/SimpleDoFnRunnerTest.java | 9 -
.../runners/core/SimpleOldDoFnRunnerTest.java | 2 +-
.../runners/core/StatefulDoFnRunnerTest.java | 1 -
.../runners/direct/AggregatorContainer.java | 200 -------------------
.../beam/runners/direct/EvaluationContext.java | 23 +--
.../GroupAlsoByWindowEvaluatorFactory.java | 23 +--
.../beam/runners/direct/ParDoEvaluator.java | 14 +-
...littableProcessElementsEvaluatorFactory.java | 2 -
.../direct/StatefulParDoEvaluatorFactory.java | 1 -
.../runners/direct/StepTransformResult.java | 8 -
.../beam/runners/direct/TransformResult.java | 6 -
.../beam/runners/direct/ParDoEvaluatorTest.java | 5 -
.../beam/runners/spark/SparkPipelineResult.java | 5 -
.../spark/aggregators/SparkAggregators.java | 110 ----------
.../SparkGroupAlsoByWindowViaWindowSet.java | 57 ++----
.../spark/translation/MultiDoFnFunction.java | 2 -
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 10 -
.../spark/translation/SparkRuntimeContext.java | 81 --------
.../spark/aggregators/ClearAggregatorsRule.java | 38 ----
.../metrics/sink/NamedAggregatorsTest.java | 101 ----------
.../beam/sdk/AggregatorPipelineExtractor.java | 84 --------
.../beam/sdk/AggregatorRetrievalException.java | 33 ---
.../org/apache/beam/sdk/AggregatorValues.java | 51 -----
.../main/java/org/apache/beam/sdk/Pipeline.java | 10 -
.../beam/sdk/annotations/Experimental.java | 3 -
.../apache/beam/sdk/transforms/Aggregator.java | 14 +-
.../sdk/transforms/DelegatingAggregator.java | 126 ------------
.../org/apache/beam/sdk/transforms/Latest.java | 2 -
.../harness/control/ProcessBundleHandler.java | 2 -
.../fn/harness/fake/FakeAggregatorFactory.java | 52 -----
37 files changed, 52 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 39553a5..693f0c4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -45,10 +45,6 @@ import org.apache.beam.sdk.values.PCollection;
* list of play names in which that word appears, and saves this information
* to a bigquery table.
*
- * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
- * key-grouped Collection, and how to use an Aggregator to track information in the
- * Monitoring UI.
- *
* <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
* table.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
deleted file mode 100644
index 24a605f..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A factory for creating aggregators.
- */
-public interface AggregatorFactory {
- /**
- * Create an aggregator with the given {@code name} and {@link CombineFn}.
- *
- * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
- * class of the {@link DoFn} being executed and the context of the step it is being
- * executed in.
- */
- <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext,
- String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 26e57f5..fe33af7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -48,6 +48,27 @@ public class DoFnRunners {
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
}
+ @Deprecated
+ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+ PipelineOptions options,
+ DoFn<InputT, OutputT> fn,
+ SideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ StepContext stepContext,
+ Object aggregatorFactory,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ return simpleRunner(options,
+ fn,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ additionalOutputTags,
+ stepContext,
+ windowingStrategy);
+ }
+
/**
* Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
*
@@ -63,7 +84,6 @@ public class DoFnRunners {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
return new SimpleDoFnRunner<>(
options,
@@ -73,7 +93,6 @@ public class DoFnRunners {
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
windowingStrategy);
}
@@ -90,7 +109,6 @@ public class DoFnRunners {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
return new SimpleOldDoFnRunner<>(
options,
@@ -100,7 +118,6 @@ public class DoFnRunners {
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
windowingStrategy);
}
@@ -151,7 +168,6 @@ public class DoFnRunners {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
return new ProcessFnRunner<>(
simpleRunner(
@@ -162,7 +178,6 @@ public class DoFnRunners {
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
windowingStrategy),
views,
sideInputReader);
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index 17bd360..982d693 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -22,7 +22,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.CounterCell;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
@@ -42,7 +42,7 @@ public class LateDataUtils {
Iterable<WindowedValue<V>> elements,
final TimerInternals timerInternals,
final WindowingStrategy<?, ?> windowingStrategy,
- final Aggregator<Long, Long> droppedDueToLateness) {
+ final CounterCell droppedDueToLateness) {
return FluentIterable.from(elements)
.transformAndConcat(
// Explode windows to filter out expired ones
@@ -71,7 +71,7 @@ public class LateDataUtils {
.isBefore(timerInternals.currentInputWatermarkTime());
if (expired) {
// The element is too late for this window.
- droppedDueToLateness.addValue(1L);
+ droppedDueToLateness.inc();
WindowTracing.debug(
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
+ "window: {} since it is too far behind inputWatermark: {}",
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index edce9a2..8a3e25f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -106,7 +106,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = fn;
this.signature = DoFnSignatures.getSignature(fn.getClass());
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index b5f8f45..4c3149a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -66,7 +66,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = fn;
this.context = new DoFnContext<>(
@@ -77,7 +76,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorFactory,
windowingStrategy == null ? null : windowingStrategy.getWindowFn());
}
@@ -181,7 +179,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowFn<?, ?> windowFn) {
fn.super();
this.options = options;
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 74fb562..bf78427 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -29,9 +29,6 @@ import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -102,28 +99,4 @@ public class LateDataDroppingDoFnRunnerTest {
Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
PaneInfo.NO_FIRING);
}
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<Long, ?, Long> getCombineFn() {
- return Sum.ofLongs();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 4ae5332..3e404ad 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -86,7 +86,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
thrown.expect(UserCodeException.class);
@@ -107,7 +106,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
thrown.expect(UserCodeException.class);
@@ -138,7 +136,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
// Setting the timer needs the current time, as it is set relative
@@ -167,7 +164,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
thrown.expect(UserCodeException.class);
@@ -188,7 +184,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
thrown.expect(UserCodeException.class);
@@ -215,7 +210,6 @@ public class SimpleDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(windowFn));
Instant currentTime = new Instant(42);
@@ -255,7 +249,6 @@ public class SimpleDoFnRunnerTest {
new TupleTag<Duration>(),
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
runner.startBundle();
@@ -292,7 +285,6 @@ public class SimpleDoFnRunnerTest {
new TupleTag<Duration>(),
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
runner.startBundle();
@@ -330,7 +322,6 @@ public class SimpleDoFnRunnerTest {
new TupleTag<Duration>(),
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WindowingStrategy.of(new GlobalWindows()));
runner.startBundle();
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 8ded2dc..a73ef5e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -67,7 +67,7 @@ public class SimpleOldDoFnRunnerTest {
List<TupleTag<?>> additionalOutputTags = Arrays.asList();
StepContext context = mock(StepContext.class);
return new SimpleOldDoFnRunner<>(
- null, fn, null, null, null, additionalOutputTags, context, null, null);
+ null, fn, null, null, null, additionalOutputTags, context, null);
}
static class ThrowingDoFn extends OldDoFn<String, String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index f80643a..d4ff49e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -204,7 +204,6 @@ public class StatefulDoFnRunnerTest {
null,
Collections.<TupleTag<?>>emptyList(),
mockStepContext,
- null,
WINDOWING_STRATEGY);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
deleted file mode 100644
index fd17704..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * AccumT container for the current values associated with {@link Aggregator Aggregators}.
- */
-public class AggregatorContainer {
-
- private static class AggregatorInfo<InputT, AccumT, OutputT>
- implements Aggregator<InputT, OutputT> {
- private final String stepName;
- private final String name;
- private final CombineFn<InputT, AccumT, OutputT> combiner;
- @GuardedBy("this")
- private volatile AccumT accumulator = null;
- private boolean committed = false;
-
- private AggregatorInfo(
- String stepName, String name, CombineFn<InputT, AccumT, OutputT> combiner) {
- this.stepName = stepName;
- this.name = name;
- this.combiner = combiner;
- }
-
- @Override
- public synchronized void addValue(InputT input) {
- checkState(!committed, "Cannot addValue after committing");
- if (accumulator == null) {
- accumulator = combiner.createAccumulator();
- }
- accumulator = combiner.addInput(accumulator, input);
- }
-
- public synchronized OutputT getOutput() {
- return accumulator == null ? null : combiner.extractOutput(accumulator);
- }
-
- private void merge(AggregatorInfo<?, ?, ?> other) {
- // Aggregators are only merged if they are the same (same step, same name).
- // As a result, they should also have the same CombineFn, so this is safe.
- AggregatorInfo<InputT, AccumT, OutputT> otherSafe =
- (AggregatorInfo<InputT, AccumT, OutputT>) other;
- mergeSafe(otherSafe);
- }
-
- private synchronized void mergeSafe(AggregatorInfo<InputT, AccumT, OutputT> other) {
- if (accumulator == null) {
- accumulator = other.accumulator;
- } else if (other.accumulator != null) {
- accumulator = combiner.mergeAccumulators(Arrays.asList(accumulator, other.accumulator));
- }
- }
-
- public String getStepName() {
- return name;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn() {
- return combiner;
- }
- }
-
- private final ConcurrentMap<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulators =
- new ConcurrentHashMap<>();
-
- private AggregatorContainer() {
- }
-
- public static AggregatorContainer create() {
- return new AggregatorContainer();
- }
-
- @Nullable
- <OutputT> OutputT getAggregate(String stepName, String aggregatorName) {
- AggregatorInfo<?, ?, OutputT> aggregatorInfo =
- (AggregatorInfo<?, ?, OutputT>) accumulators.get(
- AggregatorKey.create(stepName, aggregatorName));
- return aggregatorInfo == null ? null : aggregatorInfo.getOutput();
- }
-
- public Mutator createMutator() {
- return new Mutator(this);
- }
-
- /**
- * AccumT class for mutations to the aggregator values.
- */
- public static class Mutator implements AggregatorFactory {
-
- private final Map<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulatorDeltas = new HashMap<>();
- private final AggregatorContainer container;
- private boolean committed = false;
-
- private Mutator(AggregatorContainer container) {
- this.container = container;
- }
-
- public void commit() {
- checkState(!committed, "Should not be already committed");
- committed = true;
-
- for (Map.Entry<AggregatorKey, AggregatorInfo<?, ?, ?>> entry : accumulatorDeltas.entrySet()) {
- AggregatorInfo<?, ?, ?> previous = container.accumulators.get(entry.getKey());
- entry.getValue().committed = true;
- if (previous == null) {
- previous = container.accumulators.putIfAbsent(entry.getKey(), entry.getValue());
- }
- if (previous != null) {
- previous.merge(entry.getValue());
- previous.committed = true;
- }
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- ExecutionContext.StepContext step,
- String name,
- CombineFn<InputT, AccumT, OutputT> combine) {
- return createAggregatorForStep(step, name, combine);
- }
-
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createSystemAggregator(
- ExecutionContext.StepContext step,
- String name,
- CombineFn<InputT, AccumT, OutputT> combiner) {
- return createAggregatorForStep(step, name, combiner);
- }
-
- private <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForStep(
- ExecutionContext.StepContext step,
- String name,
- CombineFn<InputT, AccumT, OutputT> combine) {
- checkState(!committed, "Cannot create aggregators after committing");
- AggregatorKey key = AggregatorKey.create(step.getStepName(), name);
- AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key);
- if (aggregatorInfo != null) {
- AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
- (AggregatorInfo<InputT, ?, OutputT>) aggregatorInfo;
- return typedAggregatorInfo;
- } else {
- AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
- new AggregatorInfo<>(step.getStepName(), name, combine);
- accumulatorDeltas.put(key, typedAggregatorInfo);
- return typedAggregatorInfo;
- }
- }
- }
-
- /**
- * Aggregators are identified by a step name and an aggregator name.
- */
- @AutoValue
- public abstract static class AggregatorKey {
- public static AggregatorKey create(String stepName, String aggregatorName) {
- return new AutoValue_AggregatorContainer_AggregatorKey(stepName, aggregatorName);
- }
-
- public abstract String getStepName();
- public abstract String aggregatorName();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index f6d9a36..93d6f96 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -62,7 +62,7 @@ import org.joda.time.Instant;
* <p>{@link EvaluationContext} contains shared state for an execution of the
* {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
* consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and managing the {@link AggregatorContainer} and
+ * {@link PCollectionView PCollectionViews}, and managing the
* {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
* state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
* known to be empty).
@@ -95,8 +95,6 @@ class EvaluationContext {
private final SideInputContainer sideInputContainer;
- private final AggregatorContainer mergedAggregators;
-
private final DirectMetrics metrics;
private final Set<PValue> keyedPValues;
@@ -126,7 +124,6 @@ class EvaluationContext {
this.sideInputContainer = SideInputContainer.create(this, graph.getViews());
this.applicationStateInternals = new ConcurrentHashMap<>();
- this.mergedAggregators = AggregatorContainer.create();
this.metrics = new DirectMetrics();
this.callbackExecutor =
@@ -174,10 +171,6 @@ class EvaluationContext {
: completedBundle.withElements((Iterable) result.getUnprocessedElements()),
committedBundles,
outputTypes);
- // Commit aggregator changes
- if (result.getAggregatorChanges() != null) {
- result.getAggregatorChanges().commit();
- }
// Update state internals
CopyOnAccessInMemoryStateInternals theirState = result.getState();
if (theirState != null) {
@@ -362,20 +355,6 @@ class EvaluationContext {
return sideInputContainer.createReaderForViews(sideInputs);
}
- /**
- * Returns a new mutator for the {@link AggregatorContainer}.
- */
- public AggregatorContainer.Mutator getAggregatorMutator() {
- return mergedAggregators.createMutator();
- }
-
- /**
- * Returns the counter container for this context.
- */
- public AggregatorContainer getAggregatorContainer() {
- return mergedAggregators;
- }
-
/** Returns the metrics container for this pipeline. */
public DirectMetrics getMetrics() {
return metrics;
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9f567a4..d006553 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -42,10 +42,10 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
@@ -113,11 +113,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
private final StructuralKey<?> structuralKey;
private final Collection<UncommittedBundle<?>> outputBundles;
private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
- private final AggregatorContainer.Mutator aggregatorChanges;
private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
- private final Aggregator<Long, Long> droppedDueToClosedWindow;
- private final Aggregator<Long, Long> droppedDueToLateness;
+ private final Counter droppedDueToClosedWindow;
+ private final Counter droppedDueToLateness;
public GroupAlsoByWindowEvaluator(
final EvaluationContext evaluationContext,
@@ -140,17 +139,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
outputBundles = new ArrayList<>();
unprocessedElements = ImmutableList.builder();
- aggregatorChanges = evaluationContext.getAggregatorMutator();
Coder<V> valueCoder =
application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
reduceFn = SystemReduceFn.buffering(valueCoder);
- droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- Sum.ofLongs());
- droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
- Sum.ofLongs());
+ droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class,
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+ droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class,
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
}
@Override
@@ -197,7 +193,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
.withState(state)
.addOutput(outputBundles)
.withTimerUpdate(stepContext.getTimerUpdate())
- .withAggregatorChanges(aggregatorChanges)
.addUnprocessedElements(unprocessedElements.build())
.build();
}
@@ -229,7 +224,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
.isBefore(timerInternals.currentInputWatermarkTime());
if (expired) {
// The element is too late for this window.
- droppedDueToLateness.addValue(1L);
+ droppedDueToLateness.inc();
WindowTracing.debug(
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
+ "window: {} since it is too far behind inputWatermark: {}",
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 053da31..2ea8a91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -54,7 +54,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
DirectStepContext stepContext,
- AggregatorContainer.Mutator aggregatorChanges,
WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
}
@@ -70,7 +69,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
DirectStepContext stepContext,
- AggregatorContainer.Mutator aggregatorChanges,
WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
DoFnRunner<InputT, OutputT> underlying =
DoFnRunners.simpleRunner(
@@ -81,7 +79,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorChanges,
windowingStrategy);
return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
}
@@ -100,7 +97,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
List<TupleTag<?>> additionalOutputTags,
Map<TupleTag<?>, PCollection<?>> outputs,
DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
- AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
@@ -116,19 +112,17 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorChanges,
windowingStrategy);
- return create(runner, stepContext, application, aggregatorChanges, outputManager);
+ return create(runner, stepContext, application, outputManager);
}
public static <InputT, OutputT> ParDoEvaluator<InputT> create(
PushbackSideInputDoFnRunner<InputT, OutputT> runner,
DirectStepContext stepContext,
AppliedPTransform<?, ?, ?> application,
- AggregatorContainer.Mutator aggregatorChanges,
BundleOutputManager outputManager) {
- return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+ return new ParDoEvaluator<>(runner, application, outputManager, stepContext);
}
static BundleOutputManager createOutputManager(
@@ -155,7 +149,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
private final AppliedPTransform<?, ?, ?> transform;
- private final AggregatorContainer.Mutator aggregatorChanges;
private final BundleOutputManager outputManager;
private final DirectStepContext stepContext;
@@ -164,14 +157,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
private ParDoEvaluator(
PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
AppliedPTransform<?, ?, ?> transform,
- AggregatorContainer.Mutator aggregatorChanges,
BundleOutputManager outputManager,
DirectStepContext stepContext) {
this.fnRunner = fnRunner;
this.transform = transform;
this.outputManager = outputManager;
this.stepContext = stepContext;
- this.aggregatorChanges = aggregatorChanges;
this.unprocessedElements = ImmutableList.builder();
try {
@@ -222,7 +213,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
return resultBuilder
.addOutput(outputManager.bundles.values())
.withTimerUpdate(stepContext.getTimerUpdate())
- .withAggregatorChanges(aggregatorChanges)
.addUnprocessedElements(unprocessedElements.build())
.build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index e0adc40..5f6b4f7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -197,7 +197,6 @@ class SplittableProcessElementsEvaluatorFactory<
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
DirectExecutionContext.DirectStepContext stepContext,
- AggregatorContainer.Mutator aggregatorChanges,
WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
(ProcessFn) fn;
@@ -210,7 +209,6 @@ class SplittableProcessElementsEvaluatorFactory<
mainOutputTag,
additionalOutputTags,
stepContext,
- aggregatorChanges,
windowingStrategy);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 93ab077..7cf3840 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -261,7 +261,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
delegateResult.getTransform(), delegateResult.getWatermarkHold())
.withTimerUpdate(delegateResult.getTimerUpdate())
.withState(delegateResult.getState())
- .withAggregatorChanges(delegateResult.getAggregatorChanges())
.withMetricUpdates(delegateResult.getLogicalMetricUpdates())
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index fe3ae97..2a2ccab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -54,7 +54,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
getTransform(),
getOutputBundles(),
getUnprocessedElements(),
- getAggregatorChanges(),
metricUpdates,
getWatermarkHold(),
getState(),
@@ -72,7 +71,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
private MetricUpdates metricUpdates;
private CopyOnAccessInMemoryStateInternals state;
private TimerUpdate timerUpdate;
- private AggregatorContainer.Mutator aggregatorChanges;
private final Set<OutputType> producedOutputs;
private final Instant watermarkHold;
@@ -91,7 +89,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
transform,
bundlesBuilder.build(),
unprocessedElementsBuilder.build(),
- aggregatorChanges,
metricUpdates,
watermarkHold,
state,
@@ -99,11 +96,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
producedOutputs);
}
- public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
- this.aggregatorChanges = aggregatorChanges;
- return this;
- }
-
public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) {
this.metricUpdates = metricUpdates;
return this;
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index bde44ca..3a95df7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -61,12 +61,6 @@ public interface TransformResult<InputT> {
Iterable<? extends WindowedValue<InputT>> getUnprocessedElements();
/**
- * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if
- * this transform did not use an {@link AggregatorContainer.Mutator}.
- */
- @Nullable AggregatorContainer.Mutator getAggregatorChanges();
-
- /**
* Returns the logical metric updates.
*/
MetricUpdates getLogicalMetricUpdates();
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index e99e4bf..69dbc22 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -150,11 +150,6 @@ public class ParDoEvaluatorTest {
Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
.thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
@SuppressWarnings("unchecked")
AppliedPTransform<PCollection<Integer>, ?, ?> transform =
(AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 1110a55..3e94a45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricResults;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.sdk.Pipeline;
@@ -77,10 +76,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
protected abstract State awaitTermination(Duration duration)
throws TimeoutException, ExecutionException, InterruptedException;
- public <T> T getAggregatorValue(final String name, final Class<T> resultType) {
- return SparkAggregators.valueOf(name, resultType);
- }
-
@Override
public PipelineResult.State getState() {
return state;
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
deleted file mode 100644
index 1da196b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.spark.Accumulator;
-
-/**
- * A utility class for handling Beam {@link Aggregator}s.
- */
-public class SparkAggregators {
-
- private static <T> AggregatorValues<T> valueOf(final Accumulator<NamedAggregators> accum,
- final Aggregator<?, T> aggregator) {
- @SuppressWarnings("unchecked")
- Class<T> valueType = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
- final T value = valueOf(accum, aggregator.getName(), valueType);
-
- return new AggregatorValues<T>() {
-
- @Override
- public Collection<T> getValues() {
- return ImmutableList.of(value);
- }
-
- @Override
- public Map<String, T> getValuesAtSteps() {
- throw new UnsupportedOperationException("getValuesAtSteps is not supported.");
- }
- };
- }
-
- private static <T> T valueOf(final Accumulator<NamedAggregators> accum,
- final String aggregatorName,
- final Class<T> typeClass) {
- return accum.value().getValue(aggregatorName, typeClass);
- }
-
- /**
- * Retrieves the value of an aggregator from a SparkContext instance.
- *
- * @param aggregator The aggregator whose value to retrieve
- * @param <T> The type of the aggregator's output
- * @return The value of the aggregator
- */
- public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T> aggregator) {
- return valueOf(AggregatorsAccumulator.getInstance(), aggregator);
- }
-
- /**
- * Retrieves the value of an aggregator from a SparkContext instance.
- *
- * @param name Name of the aggregator to retrieve the value of.
- * @param typeClass Type class of value to be retrieved.
- * @param <T> Type of object to be returned.
- * @return The value of the aggregator.
- */
- public static <T> T valueOf(final String name, final Class<T> typeClass) {
- return valueOf(AggregatorsAccumulator.getInstance(), name, typeClass);
- }
-
- /**
- * An implementation of {@link AggregatorFactory} for the SparkRunner.
- */
- public static class Factory implements AggregatorFactory {
-
- private final SparkRuntimeContext runtimeContext;
- private final Accumulator<NamedAggregators> accumulator;
-
- public Factory(SparkRuntimeContext runtimeContext, Accumulator<NamedAggregators> accumulator) {
- this.runtimeContext = runtimeContext;
- this.accumulator = accumulator;
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- ExecutionContext.StepContext stepContext,
- String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
- return runtimeContext.createAggregator(accumulator, aggregatorName, combine);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index c59e0e7..4a2851d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -43,9 +43,9 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.metrics.CounterCell;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -207,10 +207,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
new OutputWindowedValueHolder<>();
// use in memory Aggregators since Spark Accumulators are not resilient
// in stateful operators, once done with this partition.
- final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
- final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+ final MetricsContainer cellProvider = new MetricsContainer("cellProvider");
+ final CounterCell droppedDueToClosedWindow = cellProvider.getCounter(
+ MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));
+ final CounterCell droppedDueToLateness = cellProvider.getCounter(
+ MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER));
AbstractIterator<
Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
@@ -315,15 +318,15 @@ public class SparkGroupAlsoByWindowViaWindowSet {
};
// log if there's something to log.
- long lateDropped = droppedDueToLateness.getSum();
+ long lateDropped = droppedDueToLateness.getCumulative();
if (lateDropped > 0) {
LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
- droppedDueToLateness.zero();
+ droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
}
- long closedWindowDropped = droppedDueToClosedWindow.getSum();
+ long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
if (closedWindowDropped > 0) {
LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
- droppedDueToClosedWindow.zero();
+ droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
}
return scala.collection.JavaConversions.asScalaIterator(outIter);
@@ -421,36 +424,4 @@ public class SparkGroupAlsoByWindowViaWindowSet {
"Tagged outputs are not allowed in GroupAlsoByWindow.");
}
}
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public void zero() {
- sum = 0;
- }
-
- public long getSum() {
- return sum;
- }
-
- InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public Combine.CombineFn<Long, ?, Long> getCombineFn() {
- return Sum.ofLongs();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 4cd1683..410b7de 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
@@ -104,7 +103,6 @@ public class MultiDoFnFunction<InputT, OutputT>
mainOutputTag,
Collections.<TupleTag<?>>emptyList(),
new SparkProcessContext.NoOpStepContext(),
- new SparkAggregators.Factory(runtimeContext, aggAccum),
windowingStrategy);
DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 063feef..9ee52de 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -59,8 +56,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
private final StateInternalsFactory<K> stateInternalsFactory;
private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
private final SparkRuntimeContext runtimeContext;
- private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
public SparkGroupAlsoByWindowViaOutputBufferFn(
WindowingStrategy<?, W> windowingStrategy,
@@ -72,11 +67,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
this.stateInternalsFactory = stateInternalsFactory;
this.reduceFn = reduceFn;
this.runtimeContext = runtimeContext;
-
- droppedDueToClosedWindow = runtimeContext.createAggregator(
- accumulator,
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
- Sum.ofLongs());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 6abab17..6bba863 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -22,19 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.spark.Accumulator;
/**
* The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -44,9 +36,6 @@ public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;
private transient CoderRegistry coderRegistry;
- // map for names to Beam aggregators.
- private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-
SparkRuntimeContext(Pipeline pipeline) {
this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
}
@@ -71,45 +60,6 @@ public class SparkRuntimeContext implements Serializable {
return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
}
- /**
- * Creates and aggregator and associates it with the specified name.
- *
- * @param accum Spark Accumulator.
- * @param named Name of aggregator.
- * @param combineFn Combine function used in aggregation.
- * @param <InputT> Type of inputs to aggregator.
- * @param <InterT> Intermediate data type
- * @param <OutputT> Type of aggregator outputs.
- * @return Specified aggregator
- */
- public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator(
- Accumulator<NamedAggregators> accum,
- String named,
- Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
- @SuppressWarnings("unchecked")
- Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
- try {
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- final
- NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
- new NamedAggregators.CombineFunctionState<>(
- (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
- // hidden assumption: InputT == OutputT
- (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
- this);
-
- accum.add(new NamedAggregators(named, state));
- aggregator = new SparkAggregator<>(named, state);
- aggregators.put(named, aggregator);
- }
- return aggregator;
- } catch (CannotProvideCoderException e) {
- throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
- e);
- }
- }
-
public CoderRegistry getCoderRegistry() {
if (coderRegistry == null) {
coderRegistry = CoderRegistry.createDefault();
@@ -135,35 +85,4 @@ public class SparkRuntimeContext implements Serializable {
return pipelineOptions;
}
}
-
- /**
- * Initialize spark aggregators exactly once.
- *
- * @param <InputT> Type of element fed in to aggregator.
- */
- private static class SparkAggregator<InputT, OutputT>
- implements Aggregator<InputT, OutputT>, Serializable {
- private final String name;
- private final NamedAggregators.State<InputT, ?, OutputT> state;
-
- SparkAggregator(String name, NamedAggregators.State<InputT, ?, OutputT> state) {
- this.name = name;
- this.state = state;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public void addValue(InputT elem) {
- state.update(elem);
- }
-
- @Override
- public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
- return state.getCombineFn();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
deleted file mode 100644
index 0b31acc..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators;
-
-import org.junit.rules.ExternalResource;
-
-
-/**
- * A rule that clears the {@link AggregatorsAccumulator}
- * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
- */
-public class ClearAggregatorsRule extends ExternalResource {
-
- @Override
- protected void before() throws Throwable {
- clearNamedAggregators();
- }
-
- public void clearNamedAggregators() {
- AggregatorsAccumulator.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
deleted file mode 100644
index dbd8cac..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics.sink;
-
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-
-
-/**
- * A test for the NamedAggregators mechanism.
- */
-public class NamedAggregatorsTest {
-
- @Rule
- public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
-
- @Rule
- public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
-
- @Rule
- public final PipelineRule pipelineRule = PipelineRule.batch();
-
- private Pipeline createSparkPipeline() {
- pipelineRule.getOptions().setEnableSparkMetricSinks(true);
- return pipelineRule.createPipeline();
- }
-
- private void runPipeline() {
-
- final List<String> words =
- Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
-
- final Set<String> expectedCounts =
- ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
- final Pipeline pipeline = createSparkPipeline();
-
- final PCollection<String> output =
- pipeline
- .apply(Create.of(words).withCoder(StringUtf8Coder.of()))
- .apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- PAssert.that(output).containsInAnyOrder(expectedCounts);
-
- pipeline.run();
- }
-
- @Test
- public void testNamedAggregators() throws Exception {
- assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
-
- runPipeline();
-
- assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
- }
-
- @Test
- public void testNonExistingAggregatorName() throws Exception {
- runPipeline();
-
- final Long valueOf = SparkAggregators.valueOf("myMissingAggregator", Long.class);
-
- assertThat(valueOf, is(nullValue()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
deleted file mode 100644
index eeb9b45..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
- * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-@Deprecated
-class AggregatorPipelineExtractor {
- private final Pipeline pipeline;
-
- /**
- * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
- */
- public AggregatorPipelineExtractor(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- /**
- * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link
- * PTransform PTransforms} in which it is used.
- */
- public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
- HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
- pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
- return aggregatorSteps.asMap();
- }
-
- private static class AggregatorVisitor extends PipelineVisitor.Defaults {
- private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
- public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
- this.aggregatorSteps = aggregatorSteps;
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- PTransform<?, ?> transform = node.getTransform();
- addStepToAggregators(transform, getAggregators(transform));
- }
-
- private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
- return Collections.emptyList();
- }
-
- private void addStepToAggregators(
- PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
- for (Aggregator<?, ?> aggregator : aggregators) {
- aggregatorSteps.put(aggregator, transform);
- }
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
deleted file mode 100644
index 3040815..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
- /**
- * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
- * cause.
- */
- public AggregatorRetrievalException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
deleted file mode 100644
index 1fd034a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@link DoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
- /**
- * Get the values of the {@link Aggregator} at all steps it was used.
- */
- public Collection<T> getValues() {
- return getValuesAtSteps().values();
- }
-
- /**
- * Get the values of the {@link Aggregator} by the user name at each step it was used.
- */
- public abstract Map<String, T> getValuesAtSteps();
-
- /**
- * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
- */
- public T getTotalValue(CombineFn<T, ?, T> combineFn) {
- return combineFn.apply(getValues());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index ab8906a..351e1b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
@@ -571,14 +569,6 @@ public class Pipeline {
}
/**
- * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
- * PTransform PTransforms} in which it is used.
- */
- public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
- return new AggregatorPipelineExtractor(this).getAggregatorSteps();
- }
-
- /**
* Builds a name from a "/"-delimited prefix and a name.
*/
private String buildName(String namePrefix, String name) {
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index f720599..7255a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -63,9 +63,6 @@ public @interface Experimental {
/** Trigger-related experimental APIs. */
TRIGGER,
- /** Aggregator-related experimental APIs. */
- AGGREGATOR,
-
/** Experimental APIs for Coder binary format identifiers. */
CODER_ENCODING_ID,
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index c957100..6c21b8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -26,21 +26,9 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
* @param <InputT> the type of input values
* @param <OutputT> the type of output values
*/
+@Deprecated
public interface Aggregator<InputT, OutputT> {
-
- /**
- * Adds a new value into the Aggregator.
- */
void addValue(InputT value);
-
- /**
- * Returns the name of the Aggregator.
- */
String getName();
-
- /**
- * Returns the {@link CombineFn}, which combines input elements in the
- * aggregator.
- */
CombineFn<InputT, ?, OutputT> getCombineFn();
}
[3/3] beam git commit: This closes #2838
Posted by dh...@apache.org.
This closes #2838
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4682238d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4682238d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4682238d
Branch: refs/heads/master
Commit: 4682238dc596a6d3d04b408049d7070731043cf2
Parents: 5bfd3e0 615761a
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 16:38:30 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 16:38:30 2017 -0700
----------------------------------------------------------------------
.../cookbook/CombinePerKeyExamples.java | 4 -
.../beam/runners/core/AggregatorFactory.java | 38 ----
.../apache/beam/runners/core/DoFnRunners.java | 27 ++-
.../apache/beam/runners/core/LateDataUtils.java | 6 +-
.../beam/runners/core/SimpleDoFnRunner.java | 1 -
.../beam/runners/core/SimpleOldDoFnRunner.java | 3 -
.../core/LateDataDroppingDoFnRunnerTest.java | 27 ---
.../beam/runners/core/SimpleDoFnRunnerTest.java | 9 -
.../runners/core/SimpleOldDoFnRunnerTest.java | 2 +-
.../runners/core/StatefulDoFnRunnerTest.java | 1 -
.../runners/direct/AggregatorContainer.java | 200 -------------------
.../beam/runners/direct/EvaluationContext.java | 23 +--
.../GroupAlsoByWindowEvaluatorFactory.java | 23 +--
.../beam/runners/direct/ParDoEvaluator.java | 14 +-
...littableProcessElementsEvaluatorFactory.java | 2 -
.../direct/StatefulParDoEvaluatorFactory.java | 1 -
.../runners/direct/StepTransformResult.java | 8 -
.../beam/runners/direct/TransformResult.java | 6 -
.../beam/runners/direct/ParDoEvaluatorTest.java | 5 -
.../beam/runners/spark/SparkPipelineResult.java | 5 -
.../spark/aggregators/SparkAggregators.java | 110 ----------
.../SparkGroupAlsoByWindowViaWindowSet.java | 57 ++----
.../spark/translation/MultiDoFnFunction.java | 2 -
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 10 -
.../spark/translation/SparkRuntimeContext.java | 81 --------
.../spark/aggregators/ClearAggregatorsRule.java | 38 ----
.../metrics/sink/NamedAggregatorsTest.java | 101 ----------
.../beam/sdk/AggregatorPipelineExtractor.java | 84 --------
.../beam/sdk/AggregatorRetrievalException.java | 33 ---
.../org/apache/beam/sdk/AggregatorValues.java | 51 -----
.../main/java/org/apache/beam/sdk/Pipeline.java | 10 -
.../beam/sdk/annotations/Experimental.java | 3 -
.../apache/beam/sdk/transforms/Aggregator.java | 14 +-
.../sdk/transforms/DelegatingAggregator.java | 126 ------------
.../org/apache/beam/sdk/transforms/Latest.java | 2 -
.../harness/control/ProcessBundleHandler.java | 2 -
.../fn/harness/fake/FakeAggregatorFactory.java | 52 -----
37 files changed, 52 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------