You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:36 UTC

[11/50] [abbrv] beam git commit: [BEAM-972] Add unit tests to Gearpump runner

[BEAM-972] Add unit tests to Gearpump runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb0d333d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb0d333d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb0d333d

Branch: refs/heads/master
Commit: eb0d333df23624f54aae2abb8d7c7873f8ed2a7a
Parents: 555842a
Author: huafengw <fv...@gmail.com>
Authored: Tue Mar 21 19:45:10 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Mar 23 19:52:11 2017 +0800

----------------------------------------------------------------------
 examples/java/pom.xml                           | 12 +++
 pom.xml                                         |  6 ++
 runners/gearpump/README.md                      | 41 ++++++++-
 runners/gearpump/pom.xml                        |  2 -
 .../gearpump/GearpumpRunnerRegistrar.java       |  4 +-
 .../translators/WindowAssignTranslator.java     |  2 +-
 .../gearpump/translators/io/ValuesSource.java   |  2 -
 .../gearpump/GearpumpRunnerRegistrarTest.java   | 55 ++++++++++++
 .../runners/gearpump/PipelineOptionsTest.java   | 73 ++++++++++++++++
 .../translators/io/GearpumpSourceTest.java      | 90 ++++++++++++++++++++
 .../gearpump/translators/io/ValueSoureTest.java | 82 ++++++++++++++++++
 11 files changed, 362 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ed4a1d4..0a6d8fe 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -87,6 +87,18 @@
       </dependencies>
     </profile>
 
+    <!-- Include the Apache Gearpump (incubating) runner with -P gearpump-runner -->
+    <profile>
+      <id>gearpump-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-gearpump</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <!-- Include the Apache Flink runner with -P flink-runner -->
     <profile>
       <id>flink-runner</id>

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3b8476..2cdb09d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -475,6 +475,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-gearpump</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-examples-java</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/README.md
----------------------------------------------------------------------
diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
index ad043fa..e8ce794 100644
--- a/runners/gearpump/README.md
+++ b/runners/gearpump/README.md
@@ -19,4 +19,43 @@
 
 ## Gearpump Beam Runner
 
-The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine. 
\ No newline at end of file
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
+
+## Getting Started
+
+The following shows how to run the WordCount example that ships with the Beam source code.
+
+### Installing Beam
+
+To get the latest version of Beam with the Gearpump runner, first clone the Beam repository and switch into it:
+
+```
+git clone https://github.com/apache/beam
+cd beam
+```
+
+Then check out the `gearpump-runner` branch and run Maven to build Apache Beam:
+
+```
+git checkout gearpump-runner
+mvn clean install -DskipTests
+```
+
+Now Apache Beam and the Gearpump Runner are installed in your local Maven repository.
+
+### Running the WordCount Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Gearpump runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=TestGearpumpRunner" -Pgearpump-runner
+```
+
+Once the pipeline completes, check the output file `/tmp/wordcounts.txt-00000-of-00001`.

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 9a6a432..a691801 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -99,13 +99,11 @@
       <groupId>org.apache.gearpump</groupId>
       <artifactId>gearpump-streaming_2.11</artifactId>
       <version>${gearpump.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.gearpump</groupId>
       <artifactId>gearpump-core_2.11</artifactId>
       <version>${gearpump.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.typesafe</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
index b77e1e3..3183d45 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
@@ -44,7 +44,10 @@ public class GearpumpRunnerRegistrar {
 
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
+      // Register both runners so either one can be selected with --runner.
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          GearpumpRunner.class,
+          TestGearpumpRunner.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index fe6015a..29d8f02 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -39,7 +39,7 @@ import org.joda.time.Instant;
  * {@link Window.Bound} is translated to Gearpump flatMap function.
  */
 @SuppressWarnings("unchecked")
-public class WindowAssignTranslator<T> implements  TransformTranslator<Window.Assign<T>> {
+public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
 
   private static final long serialVersionUID = -964887482120489061L;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index e0488cd..ccd5cdf 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -110,8 +110,6 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
       this.source = source;
     }
 
-
-
     @Override
     public boolean start() throws IOException {
       if (null == iterator) {

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
new file mode 100644
index 0000000..9a01d20
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpRunnerRegistrar}: runner lookup by name and the registered options.
+ */
+public class GearpumpRunnerRegistrarTest {
+
+  /** The runner can be selected with its fully-qualified class name. */
+  @Test
+  public void testFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", GearpumpRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    // assertEquals takes (expected, actual); the order determines the failure message.
+    assertEquals(GearpumpRunner.class, opts.getRunner());
+  }
+
+  /** The runner can be selected with its simple class name. */
+  @Test
+  public void testClassName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", GearpumpRunner.class.getSimpleName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(GearpumpRunner.class, opts.getRunner());
+  }
+
+  /** The registrar exposes {@link GearpumpPipelineOptions} as its only options interface. */
+  @Test
+  public void testOptions() {
+    assertEquals(
+        ImmutableList.of(GearpumpPipelineOptions.class),
+        new GearpumpRunnerRegistrar.Options().getPipelineOptions());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
new file mode 100644
index 0000000..994856b
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpPipelineOptions}.
+ */
+public class PipelineOptionsTest {
+
+  /** Shared mapper, reused instead of constructing a new ObjectMapper per (de)serialization. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * A JSON round trip drops the embedded cluster and the serializer map, while plain options
+   * such as parallelism and application name survive.
+   */
+  @Test
+  public void testIgnoredFieldSerialization() throws IOException {
+    String appName = "forTest";
+    Map<String, String> serializers = Maps.newHashMap();
+    serializers.put("classA", "SerializerA");
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+        .as(GearpumpPipelineOptions.class);
+    // The cluster is constructed but never started; it only needs to be set on the options.
+    Config config = ClusterConfig.master(null);
+    EmbeddedCluster cluster = new EmbeddedCluster(config);
+    options.setSerializers(serializers);
+    options.setApplicationName(appName);
+    options.setEmbeddedCluster(cluster);
+    options.setParallelism(10);
+
+    // writeValueAsBytes throws JsonProcessingException (an IOException), which fails the test.
+    byte[] serializedOptions = MAPPER.writeValueAsBytes(options);
+    GearpumpPipelineOptions deserializedOptions = MAPPER
+        .readValue(serializedOptions, PipelineOptions.class).as(GearpumpPipelineOptions.class);
+
+    assertNull(deserializedOptions.getEmbeddedCluster());
+    assertNull(deserializedOptions.getSerializers());
+    assertEquals(10, deserializedOptions.getParallelism());
+    assertEquals(appName, deserializedOptions.getApplicationName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
new file mode 100644
index 0000000..af5a1d2
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpSource}.
+ */
+public class GearpumpSourceTest {
+  /**
+   * Inputs in ascending timestamp order, including the near-extreme Joda instants
+   * {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE - 1}. Immutable so the shared fixture
+   * cannot be modified by a test.
+   */
+  private static final List<TimestampedValue<String>> TEST_VALUES = ImmutableList.of(
+      TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+      TimestampedValue.of("b", new org.joda.time.Instant(0)),
+      TimestampedValue.of("c", new org.joda.time.Instant(53)),
+      TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)));
+
+  /** A {@link GearpumpSource} whose reader comes from a fixed {@link ValuesSource}. */
+  private static class SourceForTest<T> extends GearpumpSource<T> {
+    private final ValuesSource<T> valuesSource;
+
+    SourceForTest(PipelineOptions options, ValuesSource<T> valuesSource) {
+      super(options);
+      this.valuesSource = valuesSource;
+    }
+
+    @Override
+    protected Source.Reader<T> createReader(PipelineOptions options) throws IOException {
+      // No checkpoint mark is supplied, so reading starts from the first value.
+      return valuesSource.createReader(options, null);
+    }
+  }
+
+  /**
+   * Reads every value through the source, checking the watermark and the emitted message for
+   * each, then checks that the exhausted source returns {@code null} and the maximum watermark.
+   */
+  @Test
+  public void testGearpumpSource() {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+        .as(GearpumpPipelineOptions.class);
+    ValuesSource<TimestampedValue<String>> valuesSource = new ValuesSource<>(TEST_VALUES,
+        TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
+    SourceForTest<TimestampedValue<String>> sourceForTest =
+        new SourceForTest<>(options, valuesSource);
+    sourceForTest.open(null, Instant.EPOCH);
+
+    for (TimestampedValue<String> value : TEST_VALUES) {
+      // Check the watermark before reading: opening the source already advanced it to the
+      // first value, so the watermark reflects the element about to be read.
+      Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
+      Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+
+      Message expectedMsg = Message.apply(
+          WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
+          value.getTimestamp().getMillis());
+      Message message = sourceForTest.read();
+      Assert.assertEquals(expectedMsg, message);
+    }
+
+    Assert.assertNull(sourceForTest.read());
+    Assert.assertEquals(Watermark.MAX(), sourceForTest.getWatermark());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
new file mode 100644
index 0000000..8c50703
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ValuesSource}.
+ */
+public class ValueSoureTest {
+
+  /**
+   * Runs a pipeline that reads a fixed list through {@link ValuesSource} on an embedded Gearpump
+   * cluster and checks that every value reaches the downstream {@link DoFn}.
+   */
+  @Test
+  public void testValueSource() {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+        .as(GearpumpPipelineOptions.class);
+    // Allow zero application retries so a failed run is not restarted.
+    Config config = ClusterConfig.master(null).withValue(
+        Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
+    EmbeddedCluster cluster = new EmbeddedCluster(config);
+    cluster.start();
+
+    List<String> values = Lists.newArrayList("1", "2", "3", "4", "5");
+    // RESULTS is static: clear it so output from an earlier run in this JVM cannot leak in.
+    ResultCollector.RESULTS.clear();
+    try {
+      options.setEmbeddedCluster(cluster);
+      options.setRunner(GearpumpRunner.class);
+      options.setParallelism(1);
+      Pipeline p = Pipeline.create(options);
+      ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of());
+      p.apply(Read.from(source))
+          .apply(ParDo.of(new ResultCollector()));
+
+      p.run().waitUntilFinish();
+    } finally {
+      // Stop the embedded cluster even when the pipeline throws, so it does not leak.
+      cluster.stop();
+    }
+
+    Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
+  }
+
+  /** Records every element it processes into a shared set for the assertion in the test. */
+  private static class ResultCollector extends DoFn<String, Void> {
+    // Static so results are visible whichever DoFn instance the runner uses; synchronized in
+    // case elements are processed concurrently.
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+}