You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:25 UTC

[11/50] incubator-beam git commit: Inline MapAggregatorValues to remove dependencies

Inline MapAggregatorValues to remove dependencies

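This class is trivial. Adding it to the public API of the SDK is
not desirable, since it is used only by runners. Adding it to runners-core
would be OK, but is really overkill for a glorified Map. Instead, each
runner that used it (DirectRunner, DataflowPipelineJob) now returns an
equivalent anonymous AggregatorValues subclass.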


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9da4bbcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9da4bbcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9da4bbcd

Branch: refs/heads/gearpump-runner
Commit: 9da4bbcdaf3c19ee5f78836b7cffaab947861a58
Parents: c867790
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 21 20:24:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 13:55:24 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 18 +++++--
 .../runners/dataflow/DataflowPipelineJob.java   | 17 ++++++-
 .../beam/sdk/util/MapAggregatorValues.java      | 50 --------------------
 3 files changed, 30 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 743c565..a9c8ecb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.MapAggregatorValues;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -47,6 +46,7 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -321,7 +321,7 @@ public class DirectRunner
         throws AggregatorRetrievalException {
       AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
       Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      Map<String, T> stepValues = new HashMap<>();
+      final Map<String, T> stepValues = new HashMap<>();
       for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
         if (steps.contains(transform.getTransform())) {
           T aggregate = aggregators.getAggregate(
@@ -331,7 +331,19 @@ public class DirectRunner
           }
         }
       }
-      return new MapAggregatorValues<>(stepValues);
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return stepValues;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .add("stepValues", stepValues)
+              .toString();
+        }
+      };
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 3194f7c..a6baa4f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.MapAggregatorValues;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.util.BackOff;
@@ -41,6 +40,7 @@ import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -369,7 +369,20 @@ public class DataflowPipelineJob implements PipelineResult {
   public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
       throws AggregatorRetrievalException {
     try {
-      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+      final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
+      return new AggregatorValues<OutputT>() {
+        @Override
+        public Map<String, OutputT> getValuesAtSteps() {
+          return stepValues;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .add("stepValues", stepValues)
+              .toString();
+        }
+      };
     } catch (IOException e) {
       throw new AggregatorRetrievalException(
           "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
deleted file mode 100644
index 3d949ec..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.transforms.Aggregator;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-
-/**
- * An {@link AggregatorValues} implementation that is backed by an in-memory map.
- *
- * @param <T> the output type of the {@link Aggregator}
- */
-public class MapAggregatorValues<T> extends AggregatorValues<T> {
-  private final Map<String, T> stepValues;
-
-  public MapAggregatorValues(Map<String, T> stepValues) {
-    this.stepValues = stepValues;
-  }
-
-  @Override
-  public Map<String, T> getValuesAtSteps() {
-    return stepValues;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("stepValues", stepValues)
-        .toString();
-  }
-}