You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:33 UTC
[08/50] [abbrv] beam git commit: [BEAM-79] Fix gearpump-runner merge
conflicts and test failure
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
deleted file mode 100644
index 81970e2..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
-import org.joda.time.Instant;
-
-/**
- * {@link Window.Bound} is translated to Gearpump flatMap function.
- */
-@SuppressWarnings("unchecked")
-public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
-
- private static final long serialVersionUID = -964887482120489061L;
-
- @Override
- public void translate(Window.Bound<T> transform, TranslationContext context) {
- PCollection<T> input = context.getInput(transform);
- JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
- WindowingStrategy<?, ?> outputStrategy =
- transform.getOutputStrategyInternal(input.getWindowingStrategy());
- WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
- JavaStream<WindowedValue<T>> outputStream =
- inputStream
- .flatMap(new AssignWindows(windowFn), "assign_windows");
-
- context.setOutputStream(context.getOutput(transform), outputStream);
- }
-
- private static class AssignWindows<T> extends
- FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
-
- private static final long serialVersionUID = 7284565861938681360L;
- private final WindowFn<T, BoundedWindow> windowFn;
-
- AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
- this.windowFn = windowFn;
- }
-
- @Override
- public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
- try {
- Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
- @Override
- public T element() {
- return value.getValue();
- }
-
- @Override
- public Instant timestamp() {
- return value.getTimestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(value.getWindows());
- }
- });
- List<WindowedValue<T>> values = new ArrayList<>(windows.size());
- for (BoundedWindow win: windows) {
- values.add(
- WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
- }
- return values.iterator();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index b2c68d6..9941e71 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
@@ -134,12 +134,16 @@ public class DoFnFunction<InputT, OutputT> extends
} else {
// side input
PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag());
- WindowedValue<Iterable<?>> sideInputValue =
- (WindowedValue<Iterable<?>>) unionValue.getValue();
+ WindowedValue<?> sideInputValue =
+ (WindowedValue<?>) unionValue.getValue();
+ Object value = sideInputValue.getValue();
+ if (!(value instanceof Iterable)) {
+ sideInputValue = sideInputValue.withValue(Lists.newArrayList(value));
+ }
if (!sideInputValues.containsKey(sideInput)) {
sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>());
}
- sideInputValues.get(sideInput).add(sideInputValue);
+ sideInputValues.get(sideInput).add((WindowedValue<Iterable<?>>) sideInputValue);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
index dfdecb2..cb912c1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends GearpumpSource<OutputT> {
+ private static final long serialVersionUID = -2453956849834747150L;
private final UnboundedSource<OutputT, CheckpointMarkT> source;
public UnboundedSourceWrapper(UnboundedSource<OutputT, CheckpointMarkT> source,
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 5db8320..bdfc336 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -25,12 +25,12 @@ import java.util.List;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
@@ -75,7 +75,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
ReadyCheckingSideInputReader sideInputReader) {
- DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault(
+ DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
options, fn, sideInputReader, outputManager, mainOutputTag,
sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
index 22ffc4d..3436930 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -21,9 +21,9 @@ package org.apache.beam.runners.gearpump.translators.utils;
import java.io.Serializable;
import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.util.ExecutionContext;
/**
* no-op aggregator factory.
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 45f146b..140df2a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.gearpump.translators.utils;
import java.io.IOException;
import java.io.Serializable;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.TupleTag;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b8a5233..b8f0ccb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -38,8 +38,6 @@ import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.window.impl.Window;
-
-
/**
* Utility methods for translators.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 10976e8..524887d 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.gearpump.streaming.dsl.window.impl.Window;
import org.junit.Test;
-
/**
* Tests for {@link TranslatorUtils}.
*/