You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:57 UTC
[32/50] [abbrv] beam git commit: Fix PCollectionView translation
Fix PCollectionView translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7b5d981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7b5d981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7b5d981
Branch: refs/heads/master
Commit: a7b5d981a57bae38a5ef70feb0c6bd167ce22ab0
Parents: f61822d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jun 17 11:11:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 19 16:03:19 2017 +0800
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineTranslator.java | 526 -------------------
.../beam/runners/gearpump/GearpumpRunner.java | 1 +
...CreateGearpumpPCollectionViewTranslator.java | 9 +-
.../CreateStreamingGearpumpView.java | 157 ++++++
.../translators/GearpumpPipelineTranslator.java | 143 +++++
.../translators/TranslationContext.java | 3 +-
.../translators/functions/DoFnFunction.java | 1 -
.../gearpump/translators/io/GearpumpSource.java | 2 +-
.../translators/utils/TranslatorUtils.java | 1 +
...teGearpumpPCollectionViewTranslatorTest.java | 7 +-
.../translators/io/GearpumpSourceTest.java | 18 +-
.../translators/utils/TranslatorUtilsTest.java | 5 +-
12 files changed, 327 insertions(+), 546 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
deleted file mode 100644
index 58b44a3..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
-import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
-import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
-import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
-import org.apache.beam.runners.gearpump.translators.TransformTranslator;
-import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.runners.gearpump.translators.WindowAssignTranslator;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PValue;
-
-import org.apache.gearpump.util.Graph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
- * into Gearpump {@link Graph}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- GearpumpPipelineTranslator.class);
-
- /**
- * A map from {@link PTransform} subclass to the corresponding
- * {@link TransformTranslator} to use to translate that transform.
- */
- private static final Map<Class<? extends PTransform>, TransformTranslator>
- transformTranslators = new HashMap<>();
-
- private final TranslationContext translationContext;
-
- static {
- // register TransformTranslators
- registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
- registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
- registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.PCollections.class,
- new FlattenPCollectionsTranslator());
- registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
- registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
- registerTransformTranslator(CreateGearpumpPCollectionView.class,
- new CreateGearpumpPCollectionViewTranslator<>());
- }
-
- public GearpumpPipelineTranslator(TranslationContext translationContext) {
- this.translationContext = translationContext;
- }
-
- public void translate(Pipeline pipeline) {
- List<PTransformOverride> overrides =
- ImmutableList.<PTransformOverride>builder()
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingCombineGloballyAsSingletonView.class)))
- .build();
-
- pipeline.replaceAll(overrides);
- pipeline.traverseTopologically(this);
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.debug("entering composite transform {}", node.getTransform());
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- LOG.debug("leaving composite transform {}", node.getTransform());
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.info("visiting transform {}", node.getTransform());
- PTransform transform = node.getTransform();
- TransformTranslator translator = getTransformTranslator(transform.getClass());
- if (null == translator) {
- throw new IllegalStateException(
- "no translator registered for " + transform);
- }
- translationContext.setCurrentTransform(node, getPipeline());
- translator.translate(transform, translationContext);
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- LOG.debug("visiting value {}", value);
- }
-
- /**
- * Records that instances of the specified PTransform class
- * should be translated by default by the corresponding
- * {@link TransformTranslator}.
- */
- private static <TransformT extends PTransform> void registerTransformTranslator(
- Class<TransformT> transformClass,
- TransformTranslator<? extends TransformT> transformTranslator) {
- if (transformTranslators.put(transformClass, transformTranslator) != null) {
- throw new IllegalArgumentException(
- "defining multiple translators for " + transformClass);
- }
- }
-
- /**
- * Returns the {@link TransformTranslator} to use for instances of the
- * specified PTransform class, or null if none registered.
- */
- private <TransformT extends PTransform>
- TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
- return transformTranslators.get(transformClass);
- }
-
- // The following codes are forked from DataflowRunner for View translator
- private static class ReflectiveOneToOneOverrideFactory<
- InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
- extends SingleInputOutputOverrideFactory<
- PCollection<InputT>, PCollection<OutputT>, TransformT> {
- private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
-
- private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement) {
- this.replacement = replacement;
- }
-
- @Override
- public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- InstanceBuilder.ofType(replacement)
- .withArg(
- (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
- transform.getTransform().getClass(),
- transform.getTransform())
- .build());
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Gearpump runner.
- */
- private static class StreamingViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- private static final long serialVersionUID = 4791080760092950304L;
-
- public StreamingViewAsMap(View.AsMap<K, V> transform) {}
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // throw new RuntimeException(e);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- private static final long serialVersionUID = 5854899081751333352L;
-
- public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // throw new RuntimeException(e);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-
- private static final long serialVersionUID = -3399860618995613421L;
-
- public StreamingViewAsIterable(View.AsIterable<T> transform) {}
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-
- private static final long serialVersionUID = -5018631473886330629L;
-
- public StreamingViewAsList(View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
-
- private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-
- private static final long serialVersionUID = 9064900748869035738L;
- private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- public StreamingCombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined,
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class StreamingViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
-
- private static final long serialVersionUID = 5870455965625071546L;
- private final View.AsSingleton<T> transform;
-
- public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateGearpumpPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private static final long serialVersionUID = -2637073020800540542L;
- private PCollectionView<ViewT> view;
-
- private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateGearpumpPCollectionView<>(view);
- }
-
- public PCollectionView<ViewT> getView() {
- return view;
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 30b1935..ae59121 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -23,6 +23,7 @@ import com.typesafe.config.ConfigValueFactory;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.runners.gearpump.translators.GearpumpPipelineTranslator;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index d7588c2..559cb28 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.gearpump.translators;
import java.util.List;
-import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -30,17 +29,17 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
* transforms.
*/
public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
- TransformTranslator<GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+ TransformTranslator<CreateStreamingGearpumpView.CreateGearpumpPCollectionView<ElemT, ViewT>> {
private static final long serialVersionUID = -3955521308055056034L;
@Override
public void translate(
- GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+ CreateStreamingGearpumpView.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
TranslationContext context) {
JavaStream<WindowedValue<List<ElemT>>> inputStream =
context.getInputStream(context.getInput());
- PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
- context.setOutputStream(view.getPCollection(), inputStream);
+ PCollectionView<ViewT> view = transform.getView();
+ context.setOutputStream(view, inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
new file mode 100644
index 0000000..60577b3
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Gearpump streaming overrides for various view (side input) transforms. */
+class CreateStreamingGearpumpView<ElemT, ViewT>
+ extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ private final PCollectionView<ViewT> view;
+
+ public CreateStreamingGearpumpView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ @Override
+ public PCollection<ElemT> expand(PCollection<ElemT> input) {
+ input
+ .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<ElemT, ViewT>of(view));
+ return input;
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link CreateStreamingGearpumpView}. This combiner requires that
+ * the input {@link PCollection} fits in memory. For a large {@link PCollection} this is
+ * expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateGearpumpPCollectionView<>(view);
+ }
+
+ @Override
+ public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
+ return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+ .setCoder(input.getCoder());
+ }
+
+ public PCollectionView<ViewT> getView() {
+ return view;
+ }
+ }
+
+ public static class Factory<ElemT, ViewT>
+ implements PTransformOverrideFactory<
+ PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+ public Factory() {}
+
+ @Override
+ public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
+ AppliedPTransform<
+ PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+ transform) {
+ return PTransformReplacement.of(
+ (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()),
+ new CreateStreamingGearpumpView<ElemT, ViewT>(transform.getTransform().getView()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
new file mode 100644
index 0000000..ca98aac
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.util.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
+ * into Gearpump {@link Graph}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GearpumpPipelineTranslator.class);
+
+ /**
+ * A map from {@link PTransform} subclass to the corresponding
+ * {@link TransformTranslator} to use to translate that transform.
+ */
+ private static final Map<Class<? extends PTransform>, TransformTranslator>
+ transformTranslators = new HashMap<>();
+
+ private final TranslationContext translationContext;
+
+ static {
+ // register TransformTranslators
+ registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+ registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+ registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+ registerTransformTranslator(Flatten.PCollections.class,
+ new FlattenPCollectionsTranslator());
+ registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
+ registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
+ registerTransformTranslator(CreateStreamingGearpumpView.CreateGearpumpPCollectionView.class,
+ new CreateGearpumpPCollectionViewTranslator());
+ }
+
+ public GearpumpPipelineTranslator(TranslationContext translationContext) {
+ this.translationContext = translationContext;
+ }
+
+ public void translate(Pipeline pipeline) {
+ List<PTransformOverride> overrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new CreateStreamingGearpumpView.Factory()))
+ .build();
+
+ pipeline.replaceAll(overrides);
+ pipeline.traverseTopologically(this);
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.debug("entering composite transform {}", node.getTransform());
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ LOG.debug("leaving composite transform {}", node.getTransform());
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.debug("visiting transform {}", node.getTransform());
+ PTransform transform = node.getTransform();
+ TransformTranslator translator = getTransformTranslator(transform.getClass());
+ if (null == translator) {
+ throw new IllegalStateException(
+ "no translator registered for " + transform);
+ }
+ translationContext.setCurrentTransform(node, getPipeline());
+ translator.translate(transform, translationContext);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {
+ LOG.debug("visiting value {}", value);
+ }
+
+ /**
+ * Records that instances of the specified PTransform class
+ * should be translated by default by the corresponding
+ * {@link TransformTranslator}.
+ */
+ private static <TransformT extends PTransform> void registerTransformTranslator(
+ Class<TransformT> transformClass,
+ TransformTranslator<? extends TransformT> transformTranslator) {
+ if (transformTranslators.put(transformClass, transformTranslator) != null) {
+ throw new IllegalArgumentException(
+ "defining multiple translators for " + transformClass);
+ }
+ }
+
+ /**
+ * Returns the {@link TransformTranslator} to use for instances of the
+ * specified PTransform class, or null if none registered.
+ */
+ private <TransformT extends PTransform>
+ TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+ return transformTranslators.get(transformClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 64a1e0d..42b7a53 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -78,7 +79,7 @@ public class TranslationContext {
}
public PValue getInput() {
- return Iterables.getOnlyElement(getInputs().values());
+ return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
}
public Map<TupleTag<?>, PValue> getOutputs() {
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index e2777df..b20896a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -135,7 +135,6 @@ public class DoFnFunction<InputT, OutputT> extends
}
}
-
for (PCollectionView<?> sideInput: sideInputs) {
for (WindowedValue<InputT> value : pushedBackValues) {
for (BoundedWindow win: value.getWindows()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 6637a9b..558eb0d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -80,7 +80,7 @@ public abstract class GearpumpSource<T> implements DataSource {
org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
message = new DefaultMessage(
WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
- TranslatorUtils.jodaTimeToJava8Time(timestamp));
+ timestamp.getMillis());
}
available = reader.advance();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 83fc6e6..b1cd61c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -52,6 +52,7 @@ public class TranslatorUtils {
}
public static Window boundedWindowToGearpumpWindow(BoundedWindow window) {
+ // Gearpump window upper bound is exclusive
Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp().plus(1L));
if (window instanceof IntervalWindow) {
IntervalWindow intervalWindow = (IntervalWindow) window;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
index b23b0c6..511eed1 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -38,8 +37,8 @@ public class CreateGearpumpPCollectionViewTranslatorTest {
CreateGearpumpPCollectionViewTranslator translator =
new CreateGearpumpPCollectionViewTranslator();
- GearpumpPipelineTranslator.CreateGearpumpPCollectionView pCollectionView =
- mock(GearpumpPipelineTranslator.CreateGearpumpPCollectionView.class);
+ CreateStreamingGearpumpView.CreateGearpumpPCollectionView pCollectionView =
+ mock(CreateStreamingGearpumpView.CreateGearpumpPCollectionView.class);
JavaStream javaStream = mock(JavaStream.class);
TranslationContext translationContext = mock(TranslationContext.class);
@@ -49,7 +48,7 @@ public class CreateGearpumpPCollectionViewTranslatorTest {
when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
PCollectionView view = mock(PCollectionView.class);
- when(translationContext.getOutput()).thenReturn(view);
+ when(pCollectionView.getView()).thenReturn(view);
translator.translate(pCollectionView, translationContext);
verify(translationContext, times(1)).setOutputStream(view, javaStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index 4490737..cc4284f 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.gearpump.DefaultMessage;
@@ -42,10 +43,11 @@ import org.junit.Test;
public class GearpumpSourceTest {
private static final List<TimestampedValue<String>> TEST_VALUES =
Lists.newArrayList(
- TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+ TimestampedValue.of("a", BoundedWindow.TIMESTAMP_MIN_VALUE),
TimestampedValue.of("b", new org.joda.time.Instant(0)),
TimestampedValue.of("c", new org.joda.time.Instant(53)),
- TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)));
+ TimestampedValue.of("d", BoundedWindow.TIMESTAMP_MAX_VALUE)
+ );
private static class SourceForTest<T> extends GearpumpSource<T> {
private ValuesSource<T> valuesSource;
@@ -72,10 +74,16 @@ public class GearpumpSourceTest {
new SourceForTest<>(options, valuesSource);
sourceForTest.open(null, Instant.EPOCH);
- for (TimestampedValue<String> value : TEST_VALUES) {
+ for (int i = 0; i < TEST_VALUES.size(); i++) {
+ TimestampedValue<String> value = TEST_VALUES.get(i);
+
// Check the watermark first since the Source will advance when it's opened
- Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
- Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+ if (i < TEST_VALUES.size() - 1) {
+ Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
+ Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+ } else {
+ Assert.assertEquals(Watermark.MAX(), sourceForTest.getWatermark());
+ }
Message expectedMsg =
new DefaultMessage(
http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 524887d..6ebe59b 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.Lists;
-import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -68,7 +67,7 @@ public class TranslatorUtilsTest {
Instant.ofEpochMilli(Long.MAX_VALUE))));
BoundedWindow globalWindow = GlobalWindow.INSTANCE;
assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(globalWindow),
- equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE / 1000),
- Instant.ofEpochMilli(Long.MAX_VALUE / 1000).minus(Duration.ofDays(1)).plusMillis(1))));
+ equalTo(Window.apply(Instant.ofEpochMilli(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()),
+ Instant.ofEpochMilli(globalWindow.maxTimestamp().getMillis() + 1))));
}
}