You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:44 UTC
[19/50] [abbrv] beam git commit: Update gearpump-runner against master changes
Update gearpump-runner against master changes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12b9719e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12b9719e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12b9719e
Branch: refs/heads/master
Commit: 12b9719e992d6fbac57efb4dc8ce7eff5e977862
Parents: 9a59ea3
Author: manuzhang <ow...@gmail.com>
Authored: Thu May 4 12:14:00 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu May 4 12:14:00 2017 +0800
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineResult.java | 11 ----
.../translators/GroupByKeyTranslator.java | 28 ++++-----
.../translators/functions/DoFnFunction.java | 2 -
.../translators/utils/DoFnRunnerFactory.java | 6 +-
.../utils/NoOpAggregatorFactory.java | 63 --------------------
.../translators/utils/NoOpStepContext.java | 2 +-
.../translators/GroupByKeyTranslatorTest.java | 24 ++++----
7 files changed, 27 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index d833cd6..dd7fa23 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -20,12 +20,9 @@ package org.apache.beam.runners.gearpump;
import java.io.IOException;
import java.util.List;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.gearpump.cluster.ApplicationStatus;
import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
@@ -85,14 +82,6 @@ public class GearpumpPipelineResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- throw new AggregatorRetrievalException(
- "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
- new UnsupportedOperationException());
- }
-
- @Override
public MetricResults metrics() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 54c8737..521f665 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
@@ -66,8 +66,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
- OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
- input.getWindowingStrategy().getOutputTimeFn();
+ TimestampCombiner timestampCombiner = input.getWindowingStrategy().getTimestampCombiner();
WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>)
input.getWindowingStrategy().getWindowFn();
JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
@@ -75,9 +74,8 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
new GearpumpWindowFn(windowFn.isNonMerging()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
- .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>)
- input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp")
- .fold(new Merge<>(windowFn, outputTimeFn), "merge")
+ .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp")
+ .fold(new Merge<>(windowFn, timestampCombiner), "merge")
.map(new Values<K, V>(), "values");
context.setOutputStream(context.getOutput(), outputStream);
@@ -148,17 +146,17 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
extends MapFunction<WindowedValue<KV<K, V>>,
KV<Instant, WindowedValue<KV<K, V>>>> {
- private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
- public KeyedByTimestamp(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
- this.outputTimeFn = outputTimeFn;
+ public KeyedByTimestamp(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
}
@Override
public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
WindowedValue<KV<K, V>> wv) {
- Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
- Iterables.getOnlyElement(wv.getWindows()));
+ Instant timestamp = timestampCombiner.assign(
+ Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp());
return KV.of(timestamp, wv);
}
}
@@ -171,12 +169,12 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
KV<Instant, WindowedValue<KV<K, List<V>>>>> {
private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
- private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+ private final TimestampCombiner timestampCombiner;
Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn,
- OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ TimestampCombiner timestampCombiner) {
this.windowFn = windowFn;
- this.outputTimeFn = outputTimeFn;
+ this.timestampCombiner = timestampCombiner;
}
@Override
@@ -229,7 +227,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
mergedWindows.addAll(wv1.getWindows());
}
- Instant timestamp = outputTimeFn.combine(t1, t2);
+ Instant timestamp = timestampCombiner.combine(t1, t2);
return KV.of(timestamp,
WindowedValue.of(wv1.getValue(), timestamp,
mergedWindows, wv1.getPane()));
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 3473f53..dfd6296 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -38,7 +38,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils.RawUnionValue;
@@ -92,7 +91,6 @@ public class DoFnFunction<InputT, OutputT> extends
mainOutput,
sideOutputs,
new NoOpStepContext(),
- new NoOpAggregatorFactory(),
windowingStrategy
);
this.sideInputs = sideInputs;
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 70b4271..8d55d6f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
@@ -50,7 +49,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> sideOutputTags;
private final ExecutionContext.StepContext stepContext;
- private final AggregatorFactory aggregatorFactory;
private final WindowingStrategy<?, ?> windowingStrategy;
public DoFnRunnerFactory(
@@ -61,7 +59,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
ExecutionContext.StepContext stepContext,
- AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = doFn;
this.options = pipelineOptions;
@@ -70,7 +67,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
this.stepContext = stepContext;
- this.aggregatorFactory = aggregatorFactory;
this.windowingStrategy = windowingStrategy;
}
@@ -78,7 +74,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
ReadyCheckingSideInputReader sideInputReader) {
DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
options, fn, sideInputReader, outputManager, mainOutputTag,
- sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+ sideOutputTags, stepContext, windowingStrategy);
return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
deleted file mode 100644
index 3436930..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import java.io.Serializable;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * no-op aggregator factory.
- */
-public class NoOpAggregatorFactory implements AggregatorFactory, Serializable {
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass,
- ExecutionContext.StepContext stepContext,
- String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
- return new NoOpAggregator<>();
- }
-
- private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
- java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void addValue(InputT value) {
- }
-
- @Override
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
- // TODO Auto-generated method stub
- return null;
- }
-
- };
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 4e0a74c..64fd615 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -61,7 +61,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 4e66ba9..86b60aa 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -33,10 +33,9 @@ import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.Gearpum
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -83,15 +82,15 @@ public class GroupByKeyTranslatorTest {
}
@Parameterized.Parameters(name = "{index}: {0}")
- public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+ public static Iterable<TimestampCombiner> data() {
return ImmutableList.of(
- OutputTimeFns.outputAtEarliestInputTimestamp(),
- OutputTimeFns.outputAtLatestInputTimestamp(),
- OutputTimeFns.outputAtEndOfWindow());
+ TimestampCombiner.EARLIEST,
+ TimestampCombiner.LATEST,
+ TimestampCombiner.END_OF_WINDOW);
}
@Parameterized.Parameter(0)
- public OutputTimeFn<? super BoundedWindow> outputTimeFn;
+ public TimestampCombiner timestampCombiner;
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,15 +98,15 @@ public class GroupByKeyTranslatorTest {
BoundedWindow window =
new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
- new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn);
+ new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner);
WindowedValue<KV<String, String>> value =
WindowedValue.of(
KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
keyedByTimestamp.map(value);
org.joda.time.Instant time =
- outputTimeFn.assignOutputTime(
- value.getTimestamp(), Iterables.getOnlyElement(value.getWindows()));
+ timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()),
+ value.getTimestamp());
assertThat(result, equalTo(KV.of(time, value)));
}
@@ -115,7 +114,8 @@ public class GroupByKeyTranslatorTest {
@SuppressWarnings({"rawtypes", "unchecked"})
public void testMerge() {
WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
- GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn);
+ GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows,
+ timestampCombiner);
org.joda.time.Instant key1 = new org.joda.time.Instant(5);
WindowedValue<KV<String, String>> value1 =
WindowedValue.of(
@@ -140,7 +140,7 @@ public class GroupByKeyTranslatorTest {
KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 =
merge.fold(result1, KV.of(key2, value2));
- assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2)));
+ assertThat(result2.getKey(), equalTo(timestampCombiner.combine(key1, key2)));
Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows();
assertThat(resultWindows.size(), equalTo(1));
IntervalWindow expectedWindow =