You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/04/25 21:12:16 UTC

[3/4] beam git commit: Removing Aggregators from PipelineResults and subclasses

Removing Aggregators from PipelineResults and subclasses


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f26db8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f26db8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f26db8c

Branch: refs/heads/master
Commit: 6f26db8c479c9543003fd3bb8406117e25c4fed0
Parents: 1fe11d1
Author: Pablo <pa...@google.com>
Authored: Tue Mar 7 13:06:15 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 25 12:45:33 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/ApexRunnerResult.java     |  9 -----
 .../beam/runners/direct/DirectRunner.java       | 34 ------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 36 --------------------
 .../flink/FlinkDetachedRunnerResult.java        | 11 ------
 .../beam/runners/flink/FlinkRunnerResult.java   | 21 ------------
 .../runners/dataflow/DataflowPipelineJob.java   | 27 ---------------
 .../beam/runners/spark/SparkPipelineResult.java |  7 ----
 .../beam/runners/spark/examples/WordCount.java  |  2 +-
 .../org/apache/beam/sdk/PipelineResult.java     | 12 -------
 9 files changed, 1 insertion(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 8548194..41fdb75 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -23,12 +23,9 @@ import java.io.IOException;
 
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.ShutdownMode;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -50,12 +47,6 @@ public class ApexRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    return null;
-  }
-
-  @Override
   public State cancel() throws IOException {
     apexApp.shutdown(ShutdownMode.KILL);
     state = State.CANCELLED;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 77ec68f..db2d252 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -45,7 +44,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -375,38 +373,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     }
 
     @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-        throws AggregatorRetrievalException {
-      AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
-      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      final Map<String, T> stepValues = new HashMap<>();
-      if (steps != null) {
-        for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
-          if (steps.contains(transform.getTransform())) {
-            T aggregate = aggregators
-                .getAggregate(evaluationContext.getStepName(transform), aggregator.getName());
-            if (aggregate != null) {
-              stepValues.put(transform.getFullName(), aggregate);
-            }
-          }
-        }
-      }
-      return new AggregatorValues<T>() {
-        @Override
-        public Map<String, T> getValuesAtSteps() {
-          return stepValues;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .add("stepValues", stepValues)
-              .toString();
-        }
-      };
-    }
-
-    @Override
     public MetricResults metrics() {
       return evaluationContext.getMetrics();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index c55f84a..51ae12a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
-import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -48,7 +47,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -60,7 +58,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -69,13 +66,9 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -530,35 +523,6 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  @Test
-  public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
-    Pipeline p = getPipeline();
-    IdentityDoFn identityDoFn = new IdentityDoFn();
-    p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
-        .apply(ParDo.of(identityDoFn));
-    PipelineResult pipelineResult = p.run();
-    pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
-  }
-
-  private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
-    private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
-    private static final String STATE_ID = "state";
-    @StateId(STATE_ID)
-    private static final StateSpec<Object, ValueState<String>> stateSpec =
-        StateSpecs.value(StringUtf8Coder.of());
-
-    @ProcessElement
-    public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){
-      state.write("state content");
-      counter.addValue(1L);
-      context.output(context.element().getValue());
-    }
-
-    public Aggregator<Long, Long> getCounter() {
-      return counter;
-    }
-  }
-
   private static class LongNoDecodeCoder extends CustomCoder<Long> {
     @Override
     public void encode(

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
index bf4395f..b4d4b08 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -19,11 +19,8 @@ package org.apache.beam.runners.flink;
 
 import java.io.IOException;
 
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 
@@ -41,14 +38,6 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    throw new AggregatorRetrievalException(
-        "Accumulators can't be retrieved for detached Job executions.",
-        new UnsupportedOperationException());
-  }
-
-  @Override
   public MetricResults metrics() {
     throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 0f2462d..dfc1d8e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -21,11 +21,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -52,24 +49,6 @@ public class FlinkRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    // TODO provide a list of all accumulator step values
-    Object value = aggregators.get(aggregator.getName());
-    if (value != null) {
-      return new AggregatorValues<T>() {
-        @Override
-        public Map<String, T> getValuesAtSteps() {
-          return (Map<String, T>) aggregators;
-        }
-      };
-    } else {
-      throw new AggregatorRetrievalException("Accumulator results not found.",
-          new RuntimeException("Accumulator does not exist."));
-    }
-  }
-
-  @Override
   public String toString() {
     return "FlinkRunnerResult{"
         + "aggregators=" + aggregators

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 7cb0f0e..0399ada 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -29,7 +29,6 @@ import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.List;
@@ -41,8 +40,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -488,30 +485,6 @@ public class DataflowPipelineJob implements PipelineResult {
   }
 
   @Override
-  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
-      throws AggregatorRetrievalException {
-    try {
-      final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
-      return new AggregatorValues<OutputT>() {
-        @Override
-        public Map<String, OutputT> getValuesAtSteps() {
-          return stepValues;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .add("stepValues", stepValues)
-              .toString();
-        }
-      };
-    } catch (IOException e) {
-      throw new AggregatorRetrievalException(
-          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
-    }
-  }
-
-  @Override
   public MetricResults metrics() {
     return dataflowMetrics;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index d2c5c8e..1110a55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -27,11 +27,9 @@ import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -84,11 +82,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) {
-    return SparkAggregators.valueOf(aggregator);
-  }
-
-  @Override
   public PipelineResult.State getState() {
     return state;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 32caa9a..de5ae48 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -21,10 +21,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f26db8c/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index 35f11eb..7e78e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
 /**
@@ -64,17 +63,6 @@ public interface PipelineResult {
    */
   State waitUntilFinish();
 
-  /**
-   * Retrieves the current value of the provided {@link Aggregator}.
-   *
-   * @param aggregator the {@link Aggregator} to retrieve values for.
-   * @return the current values of the {@link Aggregator},
-   * which may be empty if there are no values yet.
-   * @throws AggregatorRetrievalException if the {@link Aggregator} values could not be retrieved.
-   */
-  <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException;
-
   // TODO: method to retrieve error messages.
 
   /** Named constants for common values for the job state. */