You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:44 UTC

[19/50] [abbrv] beam git commit: Update gearpump-runner against master changes

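Update gearpump-runner against master changes: drop the removed Aggregator APIs (getAggregatorValues, AggregatorFactory, NoOpAggregatorFactory), replace OutputTimeFn with TimestampCombiner in GroupByKeyTranslator and its test, and adjust the NoOpStepContext.stateInternals() return type.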


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12b9719e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12b9719e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12b9719e

Branch: refs/heads/master
Commit: 12b9719e992d6fbac57efb4dc8ce7eff5e977862
Parents: 9a59ea3
Author: manuzhang <ow...@gmail.com>
Authored: Thu May 4 12:14:00 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu May 4 12:14:00 2017 +0800

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineResult.java        | 11 ----
 .../translators/GroupByKeyTranslator.java       | 28 ++++-----
 .../translators/functions/DoFnFunction.java     |  2 -
 .../translators/utils/DoFnRunnerFactory.java    |  6 +-
 .../utils/NoOpAggregatorFactory.java            | 63 --------------------
 .../translators/utils/NoOpStepContext.java      |  2 +-
 .../translators/GroupByKeyTranslatorTest.java   | 24 ++++----
 7 files changed, 27 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index d833cd6..dd7fa23 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -20,12 +20,9 @@ package org.apache.beam.runners.gearpump;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
 
 import org.apache.gearpump.cluster.ApplicationStatus;
 import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
@@ -85,14 +82,6 @@ public class GearpumpPipelineResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    throw new AggregatorRetrievalException(
-        "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
-        new UnsupportedOperationException());
-  }
-
-  @Override
   public MetricResults metrics() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 54c8737..521f665 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -66,8 +66,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
-    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
-        input.getWindowingStrategy().getOutputTimeFn();
+    TimestampCombiner timestampCombiner = input.getWindowingStrategy().getTimestampCombiner();
     WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>)
         input.getWindowingStrategy().getWindowFn();
     JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
@@ -75,9 +74,8 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
             new GearpumpWindowFn(windowFn.isNonMerging()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
-        .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>)
-            input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp")
-        .fold(new Merge<>(windowFn, outputTimeFn), "merge")
+        .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp")
+        .fold(new Merge<>(windowFn, timestampCombiner), "merge")
         .map(new Values<K, V>(), "values");
 
     context.setOutputStream(context.getOutput(), outputStream);
@@ -148,17 +146,17 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
       extends MapFunction<WindowedValue<KV<K, V>>,
       KV<Instant, WindowedValue<KV<K, V>>>> {
 
-    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
-    public KeyedByTimestamp(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
-      this.outputTimeFn = outputTimeFn;
+    public KeyedByTimestamp(TimestampCombiner timestampCombiner) {
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
     public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
         WindowedValue<KV<K, V>> wv) {
-      Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
-          Iterables.getOnlyElement(wv.getWindows()));
+      Instant timestamp = timestampCombiner.assign(
+          Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp());
       return KV.of(timestamp, wv);
     }
   }
@@ -171,12 +169,12 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
       KV<Instant, WindowedValue<KV<K, List<V>>>>> {
 
     private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
-    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+    private final TimestampCombiner timestampCombiner;
 
     Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn,
-        OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+        TimestampCombiner timestampCombiner) {
       this.windowFn = windowFn;
-      this.outputTimeFn = outputTimeFn;
+      this.timestampCombiner = timestampCombiner;
     }
 
     @Override
@@ -229,7 +227,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
         mergedWindows.addAll(wv1.getWindows());
       }
 
-      Instant timestamp = outputTimeFn.combine(t1, t2);
+      Instant timestamp = timestampCombiner.combine(t1, t2);
       return KV.of(timestamp,
           WindowedValue.of(wv1.getValue(), timestamp,
               mergedWindows, wv1.getPane()));

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 3473f53..dfd6296 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -38,7 +38,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils.RawUnionValue;
@@ -92,7 +91,6 @@ public class DoFnFunction<InputT, OutputT> extends
         mainOutput,
         sideOutputs,
         new NoOpStepContext(),
-        new NoOpAggregatorFactory(),
         windowingStrategy
     );
     this.sideInputs = sideInputs;

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 70b4271..8d55d6f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
@@ -50,7 +49,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
   private final ExecutionContext.StepContext stepContext;
-  private final AggregatorFactory aggregatorFactory;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public DoFnRunnerFactory(
@@ -61,7 +59,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       ExecutionContext.StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
     this.options = pipelineOptions;
@@ -70,7 +67,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
     this.stepContext = stepContext;
-    this.aggregatorFactory = aggregatorFactory;
     this.windowingStrategy = windowingStrategy;
   }
 
@@ -78,7 +74,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       ReadyCheckingSideInputReader sideInputReader) {
     DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
-        sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+        sideOutputTags, stepContext, windowingStrategy);
     return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
deleted file mode 100644
index 3436930..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import java.io.Serializable;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * no-op aggregator factory.
- */
-public class NoOpAggregatorFactory implements AggregatorFactory, Serializable {
-
-  @Override
-  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-      Class<?> fnClass,
-      ExecutionContext.StepContext stepContext,
-      String aggregatorName,
-      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-    return new NoOpAggregator<>();
-  }
-
-  private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
-      java.io.Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void addValue(InputT value) {
-    }
-
-    @Override
-    public String getName() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-  };
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 4e0a74c..64fd615 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -61,7 +61,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 4e66ba9..86b60aa 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -33,10 +33,9 @@ import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.Gearpum
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -83,15 +82,15 @@ public class GroupByKeyTranslatorTest {
   }
 
   @Parameterized.Parameters(name = "{index}: {0}")
-  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+  public static Iterable<TimestampCombiner> data() {
     return ImmutableList.of(
-        OutputTimeFns.outputAtEarliestInputTimestamp(),
-        OutputTimeFns.outputAtLatestInputTimestamp(),
-        OutputTimeFns.outputAtEndOfWindow());
+        TimestampCombiner.EARLIEST,
+        TimestampCombiner.LATEST,
+        TimestampCombiner.END_OF_WINDOW);
   }
 
   @Parameterized.Parameter(0)
-  public OutputTimeFn<? super BoundedWindow> outputTimeFn;
+  public TimestampCombiner timestampCombiner;
 
   @Test
   @SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,15 +98,15 @@ public class GroupByKeyTranslatorTest {
     BoundedWindow window =
         new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
     GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
-        new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn);
+        new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner);
     WindowedValue<KV<String, String>> value =
         WindowedValue.of(
             KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
     KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
         keyedByTimestamp.map(value);
     org.joda.time.Instant time =
-        outputTimeFn.assignOutputTime(
-            value.getTimestamp(), Iterables.getOnlyElement(value.getWindows()));
+        timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()),
+            value.getTimestamp());
     assertThat(result, equalTo(KV.of(time, value)));
   }
 
@@ -115,7 +114,8 @@ public class GroupByKeyTranslatorTest {
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testMerge() {
     WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
-    GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn);
+    GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows,
+        timestampCombiner);
     org.joda.time.Instant key1 = new org.joda.time.Instant(5);
     WindowedValue<KV<String, String>> value1 =
         WindowedValue.of(
@@ -140,7 +140,7 @@ public class GroupByKeyTranslatorTest {
 
     KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 =
         merge.fold(result1, KV.of(key2, value2));
-    assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2)));
+    assertThat(result2.getKey(), equalTo(timestampCombiner.combine(key1, key2)));
     Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows();
     assertThat(resultWindows.size(), equalTo(1));
     IntervalWindow expectedWindow =