You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:33 UTC

[08/50] [abbrv] beam git commit: [BEAM-79] Fix gearpump-runner merge conflicts and test failure

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
deleted file mode 100644
index 81970e2..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
-import org.joda.time.Instant;
-
-/**
- * {@link Window.Bound} is translated to Gearpump flatMap function.
- */
-@SuppressWarnings("unchecked")
-public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bound<T>> {
-
-  private static final long serialVersionUID = -964887482120489061L;
-
-  @Override
-  public void translate(Window.Bound<T> transform, TranslationContext context) {
-    PCollection<T> input = context.getInput(transform);
-    JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
-    WindowingStrategy<?, ?> outputStrategy =
-        transform.getOutputStrategyInternal(input.getWindowingStrategy());
-    WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-    JavaStream<WindowedValue<T>> outputStream =
-        inputStream
-            .flatMap(new AssignWindows(windowFn), "assign_windows");
-
-    context.setOutputStream(context.getOutput(transform), outputStream);
-  }
-
-  private static class AssignWindows<T> extends
-      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
-
-    private static final long serialVersionUID = 7284565861938681360L;
-    private final WindowFn<T, BoundedWindow> windowFn;
-
-    AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
-      this.windowFn = windowFn;
-    }
-
-    @Override
-    public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
-      try {
-        Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
-          @Override
-          public T element() {
-            return value.getValue();
-          }
-
-          @Override
-          public Instant timestamp() {
-            return value.getTimestamp();
-          }
-
-          @Override
-          public BoundedWindow window() {
-            return Iterables.getOnlyElement(value.getWindows());
-          }
-        });
-        List<WindowedValue<T>> values = new ArrayList<>(windows.size());
-        for (BoundedWindow win: windows) {
-          values.add(
-              WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
-        }
-        return values.iterator();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index b2c68d6..9941e71 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
@@ -134,12 +134,16 @@ public class DoFnFunction<InputT, OutputT> extends
       } else {
         // side input
         PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag());
-        WindowedValue<Iterable<?>> sideInputValue =
-            (WindowedValue<Iterable<?>>) unionValue.getValue();
+        WindowedValue<?> sideInputValue =
+            (WindowedValue<?>) unionValue.getValue();
+        Object value = sideInputValue.getValue();
+        if (!(value instanceof Iterable)) {
+          sideInputValue = sideInputValue.withValue(Lists.newArrayList(value));
+        }
         if (!sideInputValues.containsKey(sideInput)) {
           sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>());
         }
-        sideInputValues.get(sideInput).add(sideInputValue);
+        sideInputValues.get(sideInput).add((WindowedValue<Iterable<?>>) sideInputValue);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
index dfdecb2..cb912c1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends GearpumpSource<OutputT> {
 
+  private static final long serialVersionUID = -2453956849834747150L;
   private final UnboundedSource<OutputT, CheckpointMarkT> source;
 
   public UnboundedSourceWrapper(UnboundedSource<OutputT, CheckpointMarkT> source,

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 5db8320..bdfc336 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -25,12 +25,12 @@ import java.util.List;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -75,7 +75,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
   public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
       ReadyCheckingSideInputReader sideInputReader) {
-    DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault(
+    DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
     return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
index 22ffc4d..3436930 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -21,9 +21,9 @@ package org.apache.beam.runners.gearpump.translators.utils;
 import java.io.Serializable;
 
 import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.util.ExecutionContext;
 
 /**
  * no-op aggregator factory.

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 45f146b..140df2a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.gearpump.translators.utils;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b8a5233..b8f0ccb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -38,8 +38,6 @@ import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.window.impl.Window;
 
-
-
 /**
  * Utility methods for translators.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 10976e8..524887d 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.gearpump.streaming.dsl.window.impl.Window;
 import org.junit.Test;
 
-
 /**
  * Tests for {@link TranslatorUtils}.
  */