You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 03:52:48 UTC
[1/2] beam git commit: Removal of Aggregator class. Also removal from
comments.
Repository: beam
Updated Branches:
refs/heads/master f3f881084 -> 34d25f406
Removal of Aggregator class. Also removal from comments.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/878981fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/878981fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/878981fd
Branch: refs/heads/master
Commit: 878981fd527098ccd083f999dd3d84c4934306b2
Parents: f3f8810
Author: Pablo <pa...@google.com>
Authored: Tue May 2 18:06:56 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 20:51:43 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 2 +-
.../operators/ApexParDoOperator.java | 1 -
.../apache/beam/runners/core/DoFnRunners.java | 21 ------------
.../functions/FlinkDoFnFunction.java | 1 -
.../functions/FlinkStatefulDoFnFunction.java | 1 -
.../wrappers/streaming/DoFnOperator.java | 1 -
.../apache/beam/sdk/transforms/Aggregator.java | 34 --------------------
.../org/apache/beam/sdk/transforms/Latest.java | 15 +--------
.../beam/sdk/util/SystemDoFnInternal.java | 3 --
.../sdk/transforms/ApproximateUniqueTest.java | 2 +-
.../beam/sdk/transforms/DoFnTesterTest.java | 2 +-
.../beam/sdk/transforms/LatestFnTest.java | 2 +-
12 files changed, 5 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 5c64c53..45746af 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -51,7 +51,7 @@ import org.joda.time.Instant;
*
* <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
* Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using a selected runner; defining DoFns; creating a custom aggregator;
+ * and using a selected runner; defining DoFns;
* user-defined PTransforms; defining PipelineOptions.
*
* <p>New Concepts:
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index d5dd0dd..f7242e7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -345,7 +345,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
mainOutputTag,
additionalOutputTags,
stepContext,
- null,
windowingStrategy
);
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index fe33af7..c090001 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -48,27 +48,6 @@ public class DoFnRunners {
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
}
- @Deprecated
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
- PipelineOptions options,
- DoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- StepContext stepContext,
- Object aggregatorFactory,
- WindowingStrategy<?, ?> windowingStrategy) {
- return simpleRunner(options,
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- windowingStrategy);
- }
-
/**
* Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index d28e7c4..28e1a44 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -105,7 +105,6 @@ public class FlinkDoFnFunction<InputT, OutputT>
// see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext(),
- null,
windowingStrategy);
if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index a79f856..9f000e0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -131,7 +131,6 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
return timerInternals;
}
},
- null,
windowingStrategy);
if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 243342d..c624036 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -260,7 +260,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
mainOutputTag,
additionalOutputTags,
stepContext,
- null,
windowingStrategy);
if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
deleted file mode 100644
index 6c21b8c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT},
- * to be combined across all bundles.
- *
- * @param <InputT> the type of input values
- * @param <OutputT> the type of output values
- */
-@Deprecated
-public interface Aggregator<InputT, OutputT> {
- void addValue(InputT value);
- String getName();
- CombineFn<InputT, ?, OutputT> getCombineFn();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index d3ebbb7..f7028ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
* {@link PTransform} and {@link Combine.CombineFn} for computing the latest element
* in a {@link PCollection}.
*
- * <p>Example 1: compute the latest value for each session:
+ * <p>Example: compute the latest value for each session:
* <pre>{@code
* PCollection<Long> input = ...;
* PCollection<Long> sessioned = input
@@ -44,19 +44,6 @@ import org.apache.beam.sdk.values.TimestampedValue;
* PCollection<Long> latestValues = sessioned.apply(Latest.<Long>globally());
* }</pre>
*
- * <p>Example 2: track a latest computed value in an aggregator:
- * <pre>{@code
- * class MyDoFn extends DoFn<String, String> {
- *
- * {@literal @}ProcessElement
- * public void processElement(ProcessContext c) {
- * double val = // ..
- * latestValue.addValue(TimestampedValue.of(val, c.timestamp()));
- * // ..
- * }
- * }
- * }</pre>
- *
* <p>{@link #combineFn} can also be used manually, in combination with state and with the
* {@link Combine} transform.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
index 004496b..368cb9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
@@ -26,9 +26,6 @@ import java.lang.annotation.Target;
/**
* Annotation to mark {@code DoFns} as an internal component of the Beam SDK.
*
- * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated
- * {@code DoFn} as a system counter (as opposed to a user counter).
- *
* <p>This is internal to the Beam SDK.
*/
@Documented
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 51880e1..e1c5f08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -49,7 +49,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Suite;
/**
- * Tests for the ApproximateUnique aggregator transform.
+ * Tests for the ApproximateUnique transform.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5c5718c..d609d0e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -374,7 +374,7 @@ public class DoFnTesterTest {
}
/**
- * A {@link DoFn} that adds values to an aggregator and converts input to String in
+ * A {@link DoFn} that adds values to a user metric and converts input to String in
* {@link DoFn.ProcessElement @ProcessElement}.
*/
private static class CounterDoFn extends DoFn<Long, String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/878981fd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
index b782b6e..f1f2e44 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -166,7 +166,7 @@ public class LatestFnTest {
}
@Test
- public void testExtractOutputDefaultAggregator() {
+ public void testExtractOutputDefaultAccumulator() {
TimestampedValue<Long> accum = fn.createAccumulator();
assertThat(fn.extractOutput(accum), nullValue());
}
[2/2] beam git commit: This closes #2851
Posted by dh...@apache.org.
This closes #2851
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34d25f40
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34d25f40
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34d25f40
Branch: refs/heads/master
Commit: 34d25f4069631e43b19a86a06b4a491e524fc1d5
Parents: f3f8810 878981f
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 20:52:41 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 20:52:41 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 2 +-
.../operators/ApexParDoOperator.java | 1 -
.../apache/beam/runners/core/DoFnRunners.java | 21 ------------
.../functions/FlinkDoFnFunction.java | 1 -
.../functions/FlinkStatefulDoFnFunction.java | 1 -
.../wrappers/streaming/DoFnOperator.java | 1 -
.../apache/beam/sdk/transforms/Aggregator.java | 34 --------------------
.../org/apache/beam/sdk/transforms/Latest.java | 15 +--------
.../beam/sdk/util/SystemDoFnInternal.java | 3 --
.../sdk/transforms/ApproximateUniqueTest.java | 2 +-
.../beam/sdk/transforms/DoFnTesterTest.java | 2 +-
.../beam/sdk/transforms/LatestFnTest.java | 2 +-
12 files changed, 5 insertions(+), 80 deletions(-)
----------------------------------------------------------------------